Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 119 additions & 69 deletions parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ public final class Variant {
*/
final ByteBuffer metadata;

/**
 * Number of entries in the metadata dictionary. It is either read once from the metadata
 * header when the Variant is constructed (0 when the metadata buffer holds nothing beyond
 * its header byte) or copied from the parent Variant. Used to size {@link #metadataCache}
 * and to range-check dictionary ids before they index into it.
 */
private final int dictSize;

/**
 * Lazily allocated cache of decoded metadata dictionary strings, indexed by dictionary id.
 * Stays {@code null} until the first in-range key lookup; individual slots are filled on
 * demand. The same array reference is handed to child Variants created from this instance.
 */
private volatile String[] metadataCache;

/**
 * Result of {@code VariantUtil.getObjectInfo(value)}, stored on first access so the object
 * header is not parsed again. {@code null} until then.
 */
private volatile VariantUtil.ObjectInfo cachedObjectInfo;

/**
 * Result of {@code VariantUtil.getArrayInfo(value)}, stored on first access so the array
 * header is not parsed again. {@code null} until then.
 */
private volatile VariantUtil.ArrayInfo cachedArrayInfo;

/**
* The threshold to switch from linear search to binary search when looking up a field by key in
* an object. This is a performance optimization to avoid the overhead of binary search for a
Expand Down Expand Up @@ -67,6 +87,26 @@ public Variant(ByteBuffer value, ByteBuffer metadata) {
"Unsupported variant metadata version: %d",
metadata.get(metadata.position()) & VariantUtil.VERSION_MASK));
}

// Pre-compute dictionary size for lazy metadata cache allocation.
int pos = this.metadata.position();
int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1;
if (this.metadata.remaining() > 1) {
this.dictSize = VariantUtil.readUnsigned(this.metadata, pos + 1, metaOffsetSize);
} else {
this.dictSize = 0;
Comment thread
nssalian marked this conversation as resolved.
}
this.metadataCache = null;
}

/**
 * Package-private constructor for values derived from an existing Variant. It wraps the
 * given buffers as read-only views and adopts the caller's dictionary size and key cache
 * as-is; no header parsing or validation is performed here.
 *
 * @param value encoded bytes of this variant value
 * @param metadata metadata buffer the value's dictionary ids refer to
 * @param metadataCache key cache to share, or {@code null} if none has been allocated yet
 * @param dictSize number of entries in the metadata dictionary
 */
Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) {
  this.dictSize = dictSize;
  this.metadataCache = metadataCache;
  this.metadata = metadata.asReadOnlyBuffer();
  this.value = value.asReadOnlyBuffer();
}

public ByteBuffer getValueBuffer() {
Expand Down Expand Up @@ -194,7 +234,7 @@ public Type getType() {
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
*/
public int numObjectElements() {
  // Go through the per-instance cached header so repeated calls do not re-parse the value.
  return objectInfo().numElements;
}

/**
Expand All @@ -206,22 +246,18 @@ public int numObjectElements() {
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
*/
public Variant getFieldByKey(String key) {
VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
// Use linear search for a short list. Switch to binary search when the length reaches
// `BINARY_SEARCH_THRESHOLD`.
VariantUtil.ObjectInfo info = objectInfo();
int idStart = value.position() + info.idStartOffset;
int offsetStart = value.position() + info.offsetStartOffset;
int dataStart = value.position() + info.dataStartOffset;

if (info.numElements < BINARY_SEARCH_THRESHOLD) {
for (int i = 0; i < info.numElements; ++i) {
ObjectField field = getFieldAtIndex(
i,
value,
metadata,
info.idSize,
info.offsetSize,
value.position() + info.idStartOffset,
value.position() + info.offsetStartOffset,
value.position() + info.dataStartOffset);
if (field.key.equals(key)) {
return field.value;
int id = VariantUtil.readUnsigned(value, idStart + info.idSize * i, info.idSize);
Comment thread
nssalian marked this conversation as resolved.
String fieldKey = getMetadataKeyCached(id);
if (fieldKey.equals(key)) {
int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * i, info.offsetSize);
return childVariant(VariantUtil.slice(value, dataStart + offset));
}
}
} else {
Expand All @@ -232,22 +268,16 @@ public Variant getFieldByKey(String key) {
// performance optimization, because it can properly handle the case where `low + high`
// overflows int.
int mid = (low + high) >>> 1;
ObjectField field = getFieldAtIndex(
mid,
value,
metadata,
info.idSize,
info.offsetSize,
value.position() + info.idStartOffset,
value.position() + info.offsetStartOffset,
value.position() + info.dataStartOffset);
int cmp = field.key.compareTo(key);
int midId = VariantUtil.readUnsigned(value, idStart + info.idSize * mid, info.idSize);
String midKey = getMetadataKeyCached(midId);
int cmp = midKey.compareTo(key);
if (cmp < 0) {
low = mid + 1;
} else if (cmp > 0) {
high = mid - 1;
} else {
return field.value;
int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * mid, info.offsetSize);
return childVariant(VariantUtil.slice(value, dataStart + offset));
}
}
}
Expand Down Expand Up @@ -275,35 +305,14 @@ public ObjectField(String key, Variant value) {
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
*/
public ObjectField getFieldAtIndex(int idx) {
  VariantUtil.ObjectInfo info = objectInfo();
  // Header offsets are relative to the start of this value; turn them into absolute
  // positions in the `value` buffer before reading.
  int base = value.position();
  int idStart = base + info.idStartOffset;
  int offsetStart = base + info.offsetStartOffset;
  int dataStart = base + info.dataStartOffset;
  int id = VariantUtil.readUnsigned(value, idStart + info.idSize * idx, info.idSize);
  int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * idx, info.offsetSize);
  // Key strings come from the shared dictionary cache; the child reuses that cache too.
  String key = getMetadataKeyCached(id);
  Variant v = childVariant(VariantUtil.slice(value, dataStart + offset));
  return new ObjectField(key, v);
}

/**
 * Stateless form of the field lookup: reads the field at {@code index} directly from the
 * supplied buffers without using any per-instance cache.
 *
 * @param index position of the field within the object
 * @param value buffer holding the encoded object
 * @param metadata metadata buffer used to resolve the field's dictionary id
 * @param idSize width in bytes of each field id entry
 * @param offsetSize width in bytes of each offset entry
 * @param idStart absolute position in {@code value} of the field id list
 * @param offsetStart absolute position in {@code value} of the offset list
 * @param dataStart absolute position in {@code value} of the field data
 * @return the key and value of the requested field
 */
static ObjectField getFieldAtIndex(
    int index,
    ByteBuffer value,
    ByteBuffer metadata,
    int idSize,
    int offsetSize,
    int idStart,
    int offsetStart,
    int dataStart) {
  // idStart, offsetStart, and dataStart are absolute positions in the `value` buffer.
  int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize);
  int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
  String key = VariantUtil.getMetadataKey(metadata, id);
  Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
  return new ObjectField(key, v);
}

Expand All @@ -312,7 +321,7 @@ static ObjectField getFieldAtIndex(
* @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY`
*/
public int numArrayElements() {
  // Go through the per-instance cached header so repeated calls do not re-parse the value.
  return arrayInfo().numElements;
}

/**
Expand All @@ -324,23 +333,64 @@ public int numArrayElements() {
* @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY`
*/
public Variant getElementAtIndex(int index) {
  VariantUtil.ArrayInfo info = arrayInfo();
  // Out-of-range indexes yield null rather than an exception.
  if (index < 0 || index >= info.numElements) {
    return null;
  }
  // Header offsets are relative to the start of this value; convert to absolute positions.
  int offsetStart = value.position() + info.offsetStartOffset;
  int dataStart = value.position() + info.dataStartOffset;
  int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * index, info.offsetSize);
  return childVariant(VariantUtil.slice(value, dataStart + offset));
}

/**
 * Wraps {@code childValue} in a new Variant that uses this instance's metadata buffer,
 * dictionary size and current key-cache reference. If the cache has not been allocated
 * yet, the child receives {@code null} and will allocate its own on its first key lookup.
 *
 * @param childValue buffer positioned at the child's encoded value
 * @return the child Variant
 */
private Variant childVariant(ByteBuffer childValue) {
  final String[] sharedCache = metadataCache;
  return new Variant(childValue, metadata, sharedCache, dictSize);
}

private static Variant getElementAtIndex(
int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) {
// offsetStart and dataStart are absolute positions in the `value` buffer.
int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
return new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
/**
* Returns the metadata dictionary string for the given ID, caching the result.
*/
String getMetadataKeyCached(int id) {
// Fall back to uncached lookup for out-of-range IDs
if (id < 0 || id >= dictSize) {
Comment thread
nssalian marked this conversation as resolved.
return VariantUtil.getMetadataKey(metadata, id);
}
// Demand-create shared dictionary cache
String[] cache = metadataCache;
if (cache == null) {
cache = new String[dictSize];
metadataCache = cache;
}
if (cache[id] == null) {
cache[id] = VariantUtil.getMetadataKey(metadata, id);
}
return cache[id];
}

/**
 * Gives the parsed object header for this value. The header is parsed via
 * {@code VariantUtil.getObjectInfo} the first time it is needed and remembered afterwards.
 *
 * @return the object header information
 */
private VariantUtil.ObjectInfo objectInfo() {
  VariantUtil.ObjectInfo parsed = cachedObjectInfo;
  if (parsed != null) {
    return parsed;
  }
  parsed = VariantUtil.getObjectInfo(value);
  cachedObjectInfo = parsed;
  return parsed;
}

/**
 * Gives the parsed array header for this value. The header is parsed via
 * {@code VariantUtil.getArrayInfo} the first time it is needed and remembered afterwards.
 *
 * @return the array header information
 */
private VariantUtil.ArrayInfo arrayInfo() {
  VariantUtil.ArrayInfo parsed = cachedArrayInfo;
  if (parsed != null) {
    return parsed;
  }
  parsed = VariantUtil.getArrayInfo(value);
  cachedArrayInfo = parsed;
  return parsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import org.apache.parquet.io.api.Binary;

/**
* Builder for creating Variant value and metadata.
Expand Down Expand Up @@ -109,7 +110,14 @@ public void appendEncodedValue(ByteBuffer value) {
*/
public void appendString(String str) {
  onAppend();
  // Encode exactly once; the shared helper chooses the short- or long-string layout
  // based on the encoded length.
  writeUTF8bytes(str.getBytes(StandardCharsets.UTF_8));
}

/**
* Write bytes as a UTF8 string.
* @param data data to write; this is not modified.
*/
private void writeUTF8bytes(final byte[] data) {
boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE;
checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length);
if (longStr) {
Expand All @@ -125,6 +133,16 @@ public void appendString(String str) {
writePos += data.length;
}

/**
 * Appends the contents of a Binary to the variant as a string value.
 * Skips the intermediate String that would otherwise be created when unmarshalling
 * from shredded string columns.
 *
 * <p>NOTE(review): the bytes are written as-is; presumably the caller guarantees they are
 * valid UTF-8 - confirm.
 *
 * @param binary source data.
 */
void appendAsString(Binary binary) {
  onAppend();
  final byte[] utf8 = binary.getBytesUnsafe();
  writeUTF8bytes(utf8);
}

/**
* Appends a null value to the Variant builder.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter {
PartiallyShreddedFieldsConverter(GroupType fieldsType, ParentConverter<VariantBuilder> parent) {
  this.converters = new Converter[fieldsType.getFieldCount()];
  this.parent = parent;
  // The adapter refers only to objectBuilder, not to anything that varies per field, so a
  // single instance is created up front and shared by every field converter.
  ParentConverter<VariantObjectBuilder> newParent = converter -> converter.accept(objectBuilder);

  for (int index = 0; index < fieldsType.getFieldCount(); index += 1) {
    Type field = fieldsType.getType(index);
    // Every shredded field must itself be a group.
    Preconditions.checkArgument(!field.isPrimitive(), "Invalid field group: " + field);

    String name = field.getName();
    shreddedFieldNames.add(name);
    converters[index] = new FieldValueConverter(name, field.asGroupType(), newParent);
  }
}
Expand Down Expand Up @@ -501,7 +501,7 @@ static class VariantStringConverter extends ShreddedScalarConverter {

@Override
public void addBinary(Binary value) {
  // Append once, handing the Binary to the builder directly instead of first
  // materializing it as a String.
  parent.build(builder -> builder.appendAsString(value));
}
}

Expand Down