feat: add native Parquet VARIANT column support for JSON and semi-structured fields #917
Open
gaurav7261 wants to merge 1 commit into confluentinc:master from
feat: add native Parquet VARIANT column support for JSON and semi-structured fields

Write Kafka Connect JSON fields (Debezium CDC, Confluent Protobuf Struct, custom messages, maps, arrays) as native Parquet VARIANT columns instead of plain STRING, improving storage efficiency and query performance.

- Upgrade parquet-java to 1.17.0 for VARIANT logical type support: https://parquet.apache.org/blog/2026/02/27/variant-type-in-apache-parquet-for-semi-structured-data/
- Add configs: `parquet.variant.enabled`, `parquet.variant.connect.names`, `parquet.variant.field.names`
- Auto-detect recursive schemas (`google.protobuf.Struct`) as VARIANT
- Streaming JSON-to-Variant conversion ported from Apache Spark (a PR has also been raised in parquet-java; once approved, the code in this repo will simplify: apache/parquet-java#3415)
- Unwrap Protobuf Struct/Value/ListValue/map-as-array to clean JSON
- Graceful fallback for non-JSON values (e.g. `__debezium_unavailable_value`)
- Feature is fully opt-in (disabled by default); zero impact on existing connectors
🎉 All Contributor License Agreements have been signed. Ready to merge.
Problem
When the S3 sink connector writes Parquet files, JSON and semi-structured fields (Debezium CDC `io.debezium.data.Json`, Confluent Protobuf `google.protobuf.Struct`, custom Protobuf messages, maps, arrays) are stored as plain `STRING` columns. This has several drawbacks: storage is inefficient, queries must parse the full JSON document to access any field, and recursive schemas such as `google.protobuf.Struct` (which is self-referencing) cause a `StackOverflowError` during Avro schema conversion.

Solution
Add native Parquet VARIANT column support to the S3 sink connector, leveraging the Variant logical type introduced in Apache Parquet
1.17.0. VARIANT stores semi-structured data in a compact binary format with field-name deduplication
and type preservation, enabling offset-based field access without full document parsing.
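For illustration, a sink configuration enabling the feature might look like the following sketch. The three `parquet.variant.*` options are the ones introduced by this PR; the remaining properties and the example field names are illustrative assumptions, not part of this change:

```properties
# Standard S3 sink + Parquet format (assumed example values)
connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat

# New in this PR: master gate for VARIANT support (default false)
parquet.variant.enabled=true
# Connect schema names to write as VARIANT
parquet.variant.connect.names=io.debezium.data.Json
# Field names to force to VARIANT regardless of schema type (example names)
parquet.variant.field.names=payload,metadata
```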
Key changes:
- Upgrade `parquet-java` to 1.17.0 for VARIANT logical type support.
- `parquet.variant.enabled` (default `false`): master gate for the feature.
- `parquet.variant.connect.names`: comma-separated Connect schema names to write as VARIANT (e.g. `io.debezium.data.Json`).
- `parquet.variant.field.names`: comma-separated field names to explicitly write as VARIANT regardless of schema type.
- Auto-detect recursive schemas (e.g. `google.protobuf.Struct`) using `IdentityHashMap`-based cycle detection; these are automatically written as VARIANT without requiring explicit configuration, preventing `StackOverflowError`.
- Streaming JSON-to-Variant conversion via `VariantBuilder` for single-pass, memory-efficient encoding with decimal-first number handling and integer overflow fallback. A PR to upstream this directly into `parquet-java` has been raised (GH-3414: Add parseJson to VariantBuilder for JSON-to-Variant conversion, apache/parquet-java#3415); once merged, the converter in this repo will delegate to the library method.
- Unwrap `google.protobuf.Struct`, `Value`, `ListValue`, map-as-array, and single-field wrapper messages into clean JSON before VARIANT encoding.
- Non-JSON values (e.g. the `__debezium_unavailable_value` placeholder) are caught, wrapped as a VARIANT-encoded string via fallback, and logged as a warning: no data loss, no crashes.
- `VariantAwareWriteSupport` wraps the existing `AvroWriteSupport`, transforming schemas and pre-processing records before delegating all Parquet writing to the native library. No reimplementation of Avro-to-Parquet logic.
- When `parquet.variant.enabled=false` (default), none of the variant code paths are instantiated or executed. The original `AvroParquetWriter` flow runs exactly as before.

New files:
| File | Purpose |
|---|---|
| `JsonFieldDetector` | Identifies which fields to write as VARIANT based on config + recursive schema detection |
| `JsonToVariantConverter` | Streaming JSON → Variant binary conversion (Spark port) |
| `AvroValueToJsonConverter` | Avro values → clean JSON string (Protobuf unwrapping) |
| `VariantAwareWriteSupport` | WriteSupport wrapper: schema transform + record pre-processing |
| `VariantSchemaBuilder` | Parquet MessageType transform: replaces target fields with VARIANT group |

Does this solution apply anywhere else?
If yes, where?
The streaming JSON-to-Variant conversion logic has been contributed upstream to `parquet-java` itself (apache/parquet-java#3415) as `VariantBuilder.parseJson()`, so all Parquet consumers (not just Kafka Connect) can benefit from native JSON-to-Variant parsing.
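The `IdentityHashMap`-based cycle detection mentioned under Key changes can be sketched in isolation. This is a minimal, self-contained illustration of the technique, not the PR's actual code: `Node` is a stand-in for a Connect/Avro schema object, and identity (not structural) equality is what distinguishes a genuine self-reference from two structurally equal schemas:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class RecursiveSchemaDetector {

    /** Minimal stand-in for a schema node with child fields (hypothetical). */
    static final class Node {
        final String name;
        final List<Node> fields = new ArrayList<>();
        Node(String name) { this.name = name; }
    }

    /** Returns true if the schema graph reachable from root contains a cycle. */
    static boolean isRecursive(Node root) {
        // Identity semantics matter: two structurally equal schemas are not a
        // cycle, but re-visiting the *same* object on the current path is.
        Set<Node> onPath = Collections.newSetFromMap(new IdentityHashMap<>());
        return walk(root, onPath);
    }

    private static boolean walk(Node node, Set<Node> onPath) {
        if (!onPath.add(node)) {
            return true;            // same instance seen on this path: cycle
        }
        for (Node child : node.fields) {
            if (walk(child, onPath)) {
                return true;
            }
        }
        onPath.remove(node);        // backtrack: siblings may share subtrees
        return false;
    }

    public static void main(String[] args) {
        // google.protobuf.Struct-like shape: Struct -> Value -> Struct
        Node struct = new Node("google.protobuf.Struct");
        Node value = new Node("google.protobuf.Value");
        struct.fields.add(value);
        value.fields.add(struct);   // back-edge makes the schema recursive

        Node flat = new Node("flat_record");
        flat.fields.add(new Node("leaf"));

        System.out.println("struct recursive: " + isRecursive(struct)); // true
        System.out.println("flat recursive:   " + isRecursive(flat));   // false
    }
}
```

Removing the node on backtrack keeps shared (diamond-shaped) subtrees from being misreported as cycles, which is why a path set is used rather than a global visited set.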
Test Strategy
56 unit tests covering all new components:
- `JsonFieldDetectorTest`
- `JsonToVariantConverterTest`
- `AvroValueToJsonConverterTest`
- `VariantAwareWriteSupportTest` (including the `__debezium_unavailable_value` fallback)
- `VariantSchemaBuilderTest`
- `google.protobuf.Struct` fields (recursive + non-recursive)
- `io.debezium.data.Json` columns
- `variant_get` / path notation queries

Testing done:
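One behavior the converter tests exercise is the decimal-first number handling with integer overflow fallback described under Key changes. The sketch below illustrates the general idea under stated assumptions; it is not the PR's exact code, and `parseNumber` is a hypothetical name. Numbers are parsed as `BigDecimal` to preserve precision, then narrowed to a `long` only when that is lossless:

```java
import java.math.BigDecimal;

public class DecimalFirstNumbers {

    /** Parses a JSON number token, preferring an exact decimal representation. */
    static Object parseNumber(String token) {
        BigDecimal dec = new BigDecimal(token);      // exact parse, never overflows
        if (dec.scale() <= 0) {                      // integral value
            try {
                return dec.longValueExact();         // narrow only when lossless
            } catch (ArithmeticException overflow) {
                return dec;                          // too big for long: keep decimal
            }
        }
        return dec;                                  // fractional: keep decimal
    }

    public static void main(String[] args) {
        System.out.println(parseNumber("42"));                    // fits in long
        System.out.println(parseNumber("3.14"));                  // stays decimal
        System.out.println(parseNumber("99999999999999999999"));  // overflow fallback
    }
}
```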
Release Plan
Feature is fully opt-in behind the `parquet.variant.enabled=false` default, so there is no behavioral change for existing connectors. The `parquet-java` 1.17.0 upgrade is backward compatible for all existing Parquet read/write functionality.