
feat: add native Parquet VARIANT column support for JSON and semi structured fields#917

Open
gaurav7261 wants to merge 1 commit into confluentinc:master from gaurav7261:feat/parquet-variant-json-columns

Conversation

@gaurav7261

Problem

When the S3 sink connector writes Parquet files, JSON and semi-structured fields (Debezium CDC io.debezium.data.Json, Confluent Protobuf google.protobuf.Struct, custom Protobuf messages, maps,
arrays) are stored as plain STRING columns. This has several drawbacks:

  • Storage inefficiency: JSON text is verbose, with field names redundantly repeated in every row.
  • Query performance: Downstream engines (Spark, Trino, DuckDB) must fully parse each JSON string to access any nested field — no predicate pushdown, no column pruning.
  • Type loss: All values inside JSON strings are untyped text (integers, booleans, timestamps all become strings), requiring manual casting and increasing the risk of incorrect type coercion.
  • Recursive schema crash: Protobuf schemas using google.protobuf.Struct (which is self-referencing) cause StackOverflowError during Avro schema conversion.

Solution

Add native Parquet VARIANT column support to the S3 sink connector, leveraging the Variant logical type introduced in Apache Parquet 1.17.0. VARIANT stores semi-structured data in a compact binary format with field-name deduplication and type preservation, enabling offset-based field access without full document parsing.
Key changes:

  • Upgrade parquet-java to 1.17.0 for VARIANT logical type support.
  • Three new connector configs (all opt-in, feature disabled by default):
    • parquet.variant.enabled (default false) — master gate for the feature.
    • parquet.variant.connect.names — comma-separated Connect schema names to write as VARIANT (e.g. io.debezium.data.Json).
    • parquet.variant.field.names — comma-separated field names to explicitly write as VARIANT regardless of schema type.
  • Auto-detection of recursive schemas (google.protobuf.Struct) using IdentityHashMap-based cycle detection — these are automatically written as VARIANT without requiring explicit configuration,
    preventing StackOverflowError.
  • Streaming JSON-to-Variant conversion ported from Apache Spark's production VariantBuilder for single-pass, memory-efficient encoding with decimal-first number handling and integer overflow fallback. A PR to upstream this directly into parquet-java has been raised (GH-3414: Add parseJson to VariantBuilder for JSON-to-Variant conversion, apache/parquet-java#3415); once merged, the converter in this repo will delegate to the library method.
  • Protobuf unwrapping: Converts verbose Avro representations of google.protobuf.Struct, Value, ListValue, map-as-array, and single-field wrapper messages into clean JSON before VARIANT
    encoding.
  • Graceful fallback: Non-JSON values (e.g. Debezium's __debezium_unavailable_value placeholder) are caught, wrapped as a VARIANT-encoded string via fallback, and logged as a warning — no data loss,
    no crashes.
  • Thin wrapper architecture: VariantAwareWriteSupport wraps the existing AvroWriteSupport, transforming schemas and pre-processing records before delegating all Parquet writing to the native
    library. No reimplementation of Avro-to-Parquet logic.
  • Zero impact when disabled: When parquet.variant.enabled=false (the default), none of the variant code paths are instantiated or executed; the original AvroParquetWriter flow runs exactly as before.

New files:
| File | Purpose |
|---|---|
| JsonFieldDetector | Identifies which fields to write as VARIANT based on config + recursive schema detection |
| JsonToVariantConverter | Streaming JSON → Variant binary conversion (Spark port) |
| AvroValueToJsonConverter | Avro values → clean JSON string (Protobuf unwrapping) |
| VariantAwareWriteSupport | WriteSupport wrapper: schema transform + record pre-processing |
| VariantSchemaBuilder | Parquet MessageType transform: replaces target fields with VARIANT group |
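The IdentityHashMap-based cycle detection mentioned above can be sketched as follows. This is an illustrative stand-in, not the connector's actual code: `SchemaNode` is a toy substitute for the real Avro `Schema` type, and the class names are hypothetical. The key idea is tracking node *instances* (identity, not equality) along the current root-to-leaf path, so a schema that reaches the same instance twice on one path — like `google.protobuf.Struct → Value → Struct` — is flagged before any recursive conversion can overflow the stack.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;

// Toy stand-in for a schema node; the real detector walks Avro Schema objects.
final class SchemaNode {
    final String name;
    final List<SchemaNode> children = new ArrayList<>();
    SchemaNode(String name) { this.name = name; }
}

public class RecursiveSchemaDetector {
    // True if the schema graph reaches the same node instance twice on a
    // single root-to-leaf path, i.e. it is self-referencing.
    public static boolean isRecursive(SchemaNode root) {
        return isRecursive(root, new IdentityHashMap<>());
    }

    private static boolean isRecursive(SchemaNode node,
                                       IdentityHashMap<SchemaNode, Boolean> onPath) {
        if (onPath.containsKey(node)) {
            return true;              // same instance seen again on this path: cycle
        }
        onPath.put(node, Boolean.TRUE);
        for (SchemaNode child : node.children) {
            if (isRecursive(child, onPath)) {
                return true;
            }
        }
        onPath.remove(node);          // backtrack: shared (diamond) nodes are not cycles
        return false;
    }

    public static void main(String[] args) {
        SchemaNode struct = new SchemaNode("Struct");
        SchemaNode value = new SchemaNode("Value");
        struct.children.add(value);
        value.children.add(struct);   // Struct -> Value -> Struct cycle
        System.out.println(isRecursive(struct));   // true

        SchemaNode flat = new SchemaNode("Flat");
        flat.children.add(new SchemaNode("leaf"));
        System.out.println(isRecursive(flat));     // false
    }
}
```

Using identity rather than `equals` matters here: two structurally identical but distinct sub-schemas must not be mistaken for a cycle, while the same instance reappearing must be.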
Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

The streaming JSON-to-Variant conversion logic has been contributed upstream to parquet-java itself (apache/parquet-java#3415) as VariantBuilder.parseJson(), so all Parquet
consumers (not just Kafka Connect) can benefit from native JSON-to-Variant parsing.

Test Strategy

56 unit tests covering all new components:

| Test class | Tests | Coverage |
|---|---|---|
| JsonFieldDetectorTest | 15 | Schema detection by connect name, field name, recursive schemas, nested structs, edge cases |
| JsonToVariantConverterTest | 16 | Primitives, objects, arrays, nested structures, unicode, large numbers, scientific notation, integer overflow to decimal, malformed JSON |
| AvroValueToJsonConverterTest | 8 | Protobuf Struct/Value/ListValue unwrapping, map-as-array, single-field wrapper, nested complex types |
| VariantAwareWriteSupportTest | 11 | End-to-end Parquet write+read: simple/nested/complex variant fields, Protobuf maps, recursive Struct (no StackOverflow), __debezium_unavailable_value fallback, null handling |
| VariantSchemaBuilderTest | 6 | Parquet schema transformation, VARIANT annotation, nested field replacement |

Manually tested in a QA environment with:
  • Confluent Protobuf topics with google.protobuf.Struct fields (recursive + non-recursive)
  • Debezium CDC topics with io.debezium.data.Json columns
  • Verified Parquet output readable in Spark 4.0 with variant_get / path notation queries
Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests
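The "integer overflow to decimal" cases in JsonToVariantConverterTest exercise number handling along the lines of the following sketch. This is a hypothetical simplification (the class and method names are illustrative, not the connector's actual API): parse the JSON number exactly as a BigDecimal first, then narrow to a 64-bit integer only when that conversion is exact, falling back to the decimal otherwise.

```java
import java.math.BigDecimal;

// Illustrative decimal-first number handling with integer overflow fallback.
public class NumberHandling {
    public static Object toVariantNumber(String jsonNumber) {
        // Exact parse: no float rounding, arbitrary precision.
        BigDecimal dec = new BigDecimal(jsonNumber);
        try {
            // Narrow to long only when the value is an exact 64-bit integer.
            return dec.longValueExact();
        } catch (ArithmeticException overflowOrFraction) {
            // Overflow or fractional part: keep full precision as a decimal.
            return dec;
        }
    }

    public static void main(String[] args) {
        System.out.println(toVariantNumber("42"));                  // Long
        System.out.println(toVariantNumber("3.14"));                // BigDecimal
        System.out.println(toVariantNumber("9223372036854775808")); // > Long.MAX_VALUE -> BigDecimal
    }
}
```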

Release Plan

The feature is fully opt-in behind parquet.variant.enabled (default false), so there is no behavioral change for existing connectors. The parquet-java 1.17.0 upgrade is backward compatible for all existing Parquet read/write functionality.
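For reference, opting in on an S3 sink connector might look like the snippet below (a sketch: the field names are placeholders, and all other required connector settings are omitted):

```properties
# Parquet output format for the S3 sink connector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat

# Master gate for VARIANT encoding (off by default)
parquet.variant.enabled=true

# Write fields with these Connect schema names as VARIANT
parquet.variant.connect.names=io.debezium.data.Json

# Also force these fields to VARIANT regardless of schema type
# (placeholder field names)
parquet.variant.field.names=payload,metadata
```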

…uctured fields

Write Kafka Connect JSON fields (Debezium CDC, Confluent Protobuf Struct,
custom messages, maps, arrays) as native Parquet VARIANT columns instead of
plain STRING, improving storage efficiency and query performance.

- Upgrade parquet-java to 1.17.0 for VARIANT logical type support - https://parquet.apache.org/blog/2026/02/27/variant-type-in-apache-parquet-for-semi-structured-data/
- Add config: parquet.variant.enabled, parquet.variant.connect.names,
  parquet.variant.field.names
- Auto-detect recursive schemas (google.protobuf.Struct) as VARIANT
- Streaming JSON-to-Variant conversion ported from Apache Spark (also raised upstream as apache/parquet-java#3415; once merged, this repo will delegate to the library)
- Unwrap Protobuf Struct/Value/ListValue/map-as-array to clean JSON
- Graceful fallback for non-JSON values (e.g. __debezium_unavailable_value)
- Feature is fully opt-in (disabled by default), zero impact on existing connectors
@gaurav7261 gaurav7261 requested a review from a team as a code owner March 8, 2026 09:13
@confluent-cla-assistant

confluent-cla-assistant bot commented Mar 8, 2026

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ gaurav7261
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

