}

public static Map<String, String> calculateTableCompressionProperties(IcebergFileFormat oldFileFormat, IcebergFileFormat newFileFormat, Map<String, String> existingProperties, Map<String, Object> inputProperties)
{
Nit: no need for a lambda here, just newCompressionCodec.orElse(oldCompressionCodec)
orElse takes a plain value and returns a plain value, but here we are operating on an Optional and want to return an Optional, so we need to use or instead of orElse.

Ah, I missed that. Still, you want to avoid spinning up a lambda for a cheap operation like this. Maybe a cleaner way to write it is newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec, like we do later on for compression level.
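For reference, a minimal standalone sketch of the variants discussed in this thread (variable names follow the snippet above; String stands in for the real codec type):

```java
import java.util.Optional;

public class OptionalFallbackSketch
{
    public static void main(String[] args)
    {
        Optional<String> newCompressionCodec = Optional.empty();
        Optional<String> oldCompressionCodec = Optional.of("ZSTD");

        // orElse unwraps: it takes and returns the value type, so it cannot be
        // used to fall back from one Optional to another Optional.
        String unwrapped = newCompressionCodec.orElse("SNAPPY");

        // Optional.or keeps the Optional type but allocates a Supplier lambda.
        Optional<String> viaOr = newCompressionCodec.or(() -> oldCompressionCodec);

        // The lambda-free form suggested above.
        Optional<String> viaTernary = newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec;

        System.out.println(unwrapped + " / " + viaOr + " / " + viaTernary);
    }
}
```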
{
    return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
            .map(OptionalInt::of)
            .orElseGet(OptionalInt::empty);
Nit: no need for a lambda here, just orElse(OptionalInt.empty())
done
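A sketch of the simplified form, assuming the COMPRESSION_LEVEL key and tableProperties map from the snippet above:

```java
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

public class CompressionLevelSketch
{
    private static final String COMPRESSION_LEVEL = "compression_level";

    // orElse(OptionalInt.empty()) replaces the orElseGet supplier; the two are
    // equivalent here because OptionalInt.empty() is cheap and side-effect free.
    static OptionalInt getCompressionLevel(Map<String, Object> tableProperties)
    {
        return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
                .map(OptionalInt::of)
                .orElse(OptionalInt.empty());
    }

    public static void main(String[] args)
    {
        System.out.println(getCompressionLevel(Map.<String, Object>of(COMPRESSION_LEVEL, 5))); // OptionalInt[5]
        System.out.println(getCompressionLevel(Map.<String, Object>of()));                     // OptionalInt.empty
    }
}
```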
{
    if (compressionCodec.isPresent()) {
        if (!isCompressionCodecSupportedForFormat(fileFormat, compressionCodec.get())) {
            throw new TrinoException(NOT_SUPPORTED, format("Compression codec LZ4 not supported for %s", fileFormat.humanName()));
Error message here mentions LZ4 explicitly, but it seems like this should be a parameter based on compressionCodec.get().

changed to pass compressionCodec.get() instead of LZ4
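A standalone sketch of the parameterized message; the enums are simplified stand-ins for the real IcebergFileFormat and HiveCompressionCodec, and humanName() is omitted:

```java
import static java.lang.String.format;

public class CompressionErrorMessageSketch
{
    enum IcebergFileFormat { PARQUET, ORC, AVRO }
    enum HiveCompressionCodec { NONE, SNAPPY, LZ4, ZSTD, GZIP }

    public static void main(String[] args)
    {
        IcebergFileFormat fileFormat = IcebergFileFormat.PARQUET;
        HiveCompressionCodec codec = HiveCompressionCodec.LZ4;

        // The rejected codec is interpolated instead of hard-coding "LZ4".
        System.out.println(format("Compression codec %s not supported for %s", codec, fileFormat));
    }
}
```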
            throw new TrinoException(INVALID_TABLE_PROPERTY, "write_compression must be set when compression_level is set");
        }
        else {
            if (!(VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY.contains(fileFormat)
I wonder whether this branch would actually be easier to read and understand as an enum switch statement. That way we would fail to compile if a new enum were added and we would know we needed to add new handling at that point.
done
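A possible shape for that, sketched with a stand-in enum; the exhaustive switch expression (no default branch) is what makes a newly added format constant fail to compile. The AVRO-only support reflects the VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY constant shown later in this thread:

```java
public class CompressionLevelValidationSketch
{
    enum IcebergFileFormat { PARQUET, ORC, AVRO }

    // Adding a new IcebergFileFormat constant makes this method fail to compile
    // until the new format is handled explicitly.
    static boolean supportsCompressionLevel(IcebergFileFormat fileFormat)
    {
        return switch (fileFormat) {
            case AVRO -> true;
            case PARQUET, ORC -> false;
        };
    }

    public static void main(String[] args)
    {
        for (IcebergFileFormat format : IcebergFileFormat.values()) {
            System.out.println(format + " supports compression_level: " + supportsCompressionLevel(format));
        }
    }
}
```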
public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
{
    return !((fileFormat.equals(AVRO) || fileFormat.equals(PARQUET)) && codec.equals(HiveCompressionCodec.LZ4));
Let's rewrite this as a switch over the file formats and compression codecs so that it's easier to read and understand the valid combinations.
done
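A sketch of the switch-based rewrite; the enums are stand-ins, and the LZ4 restriction for AVRO and PARQUET is taken from the expression above:

```java
public class CompressionCodecSupportSketch
{
    enum IcebergFileFormat { PARQUET, ORC, AVRO }
    enum HiveCompressionCodec { NONE, SNAPPY, LZ4, ZSTD, GZIP }

    // Equivalent to the negated boolean expression above, but the valid
    // combinations are spelled out per file format.
    static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
    {
        return switch (fileFormat) {
            case AVRO, PARQUET -> codec != HiveCompressionCodec.LZ4;
            case ORC -> true;
        };
    }

    public static void main(String[] args)
    {
        System.out.println(isCompressionCodecSupportedForFormat(IcebergFileFormat.PARQUET, HiveCompressionCodec.LZ4)); // false
        System.out.println(isCompressionCodecSupportedForFormat(IcebergFileFormat.ORC, HiveCompressionCodec.LZ4));     // true
    }
}
```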
{
    ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
    IcebergFileFormat fileFormat = getFileFormat(icebergTable);
    properties.put(FILE_FORMAT_PROPERTY, getFileFormat(icebergTable));
Duplicate call to getFileFormat; you can pass the instance from the previous line here instead.

done
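A trivial standalone sketch of the suggested change (getFileFormat and the builder are stand-ins for the real code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReuseFileFormatSketch
{
    enum IcebergFileFormat { PARQUET, ORC, AVRO }

    static final String FILE_FORMAT_PROPERTY = "format";

    // Stand-in for the real getFileFormat(icebergTable) lookup.
    static IcebergFileFormat getFileFormat()
    {
        return IcebergFileFormat.PARQUET;
    }

    public static void main(String[] args)
    {
        Map<String, Object> properties = new LinkedHashMap<>();

        // Call getFileFormat once and reuse the local instead of calling it again
        // when populating the properties map.
        IcebergFileFormat fileFormat = getFileFormat();
        properties.put(FILE_FORMAT_PROPERTY, fileFormat);

        System.out.println(properties);
    }
}
```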
I would just inline compressionLevelValue != null && !compressionLevelValue.isEmpty() instead of bringing in Strings to do that.
done
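A standalone sketch of the inlined check (the variable name is taken from the comment above):

```java
public class InlineEmptyCheckSketch
{
    // Avoids pulling in com.google.common.base.Strings for a one-line check.
    static boolean hasCompressionLevel(String compressionLevelValue)
    {
        return compressionLevelValue != null && !compressionLevelValue.isEmpty();
    }

    public static void main(String[] args)
    {
        System.out.println(hasCompressionLevel(null)); // false
        System.out.println(hasCompressionLevel(""));   // false
        System.out.println(hasCompressionLevel("5"));  // true
    }
}
```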
Let's move these cleanup / refactoring changes to a separate commit
    txn = catalog.newCreateOrReplaceTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata, allowedExtraProperties));
}
else {
    txn = catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, Optional.ofNullable(tableLocation), createTableProperties(tableMetadata, allowedExtraProperties));
Is this related to the iceberg parquet compression handling bug? If so, we'll want an inline comment explaining what's going on here
done
Could you clarify what bug is being referred to here? I responded at #24851 (comment) to the concern raised there.
If we let Iceberg write the table property write.parquet.compression-codec to zstd by default for us, then this PR introduces non-backward-compatible behavior. Suppose a user never set write.parquet.compression-codec as a table property but configured snappy through the Iceberg config / session property. On a previous version of Trino, new files would be written with snappy. Because this change also reads the compression codec from the table properties when writing a file, the codec used would now be zstd instead of snappy, which is not backward compatible. So we want to stop Iceberg from setting this table property when the user has not set it.
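A minimal runnable sketch of that scenario (the helper and its parameters are hypothetical, used only to model the precedence described above):

```java
import java.util.Map;
import java.util.Optional;

public class CompressionPrecedenceSketch
{
    // Hypothetical model of codec resolution: if the writer consults table
    // properties, an Iceberg-defaulted value wins over the session/config codec.
    static String effectiveCodec(Map<String, String> tableProperties, String sessionOrConfigCodec, boolean readTableProperty)
    {
        if (readTableProperty) {
            return Optional.ofNullable(tableProperties.get("write.parquet.compression-codec"))
                    .orElse(sessionOrConfigCodec);
        }
        return sessionOrConfigCodec;
    }

    public static void main(String[] args)
    {
        // The user never set this; Iceberg defaulted it at table creation.
        Map<String, String> icebergDefaultedProperties = Map.of("write.parquet.compression-codec", "zstd");

        // Old behavior: table properties not consulted, snappy is used.
        System.out.println(effectiveCodec(icebergDefaultedProperties, "snappy", false)); // snappy
        // New behavior without the guard: the silently defaulted zstd would win,
        // which is why the property should only be persisted when the user sets it.
        System.out.println(effectiveCodec(icebergDefaultedProperties, "snappy", true)); // zstd
    }
}
```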
validateCompression(fileFormat, compressionCodec, compressionLevel);

Map<String, String> tableCompressionProperties = calculateTableCompressionProperties(fileFormat, fileFormat, ImmutableMap.of(), tableMetadata.getProperties());
We'll definitely want a test around this quirk
it should be covered by the test implicitly, but I still added a test on it
assertUpdate(format("CREATE TABLE %s WITH (format = 'AVRO') AS SELECT * FROM nation", tableName), "SELECT count(*) FROM nation");
assertQuery(format("SELECT COUNT(*) FROM \"%s$properties\" WHERE key = 'write.parquet.compression-codec'", tableName), "SELECT 0");
assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-codec'", tableName), "VALUES 'ZSTD'");

assertUpdate(format("ALTER TABLE %s SET PROPERTIES compression_level = 5", tableName));
assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-level'", tableName), ("VALUES 5"));
Looks like checkstyle is complaining about this line, but giving the wrong line number in the error.
This PR adds support for specifying the write_compression and compression_level table properties for the Iceberg connector and updates related tests and compression property validations.
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
File | Description |
---|---|
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java | Updated expected values in JSON assertions. |
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java | Modified test expectations for column_sizes and compression properties. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java | Revised table properties creation and transaction handling. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java | Introduced new methods for compression level and codec extraction. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java | Added new table properties and validation for compression settings. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java | Updated table property and compression handling during metadata updates. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java | Adjusted writer creation to incorporate storage properties for compression. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java | Updated Avro writer to accept and use a compression level. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java:897
return txn;
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION_PROPERTY = "format_version";

public static final String WRITE_COMPRESSION = "write_compression";
Property name should be compression_codec to maintain consistency with the equivalent Iceberg table property and the connector session property. The order of precedence should be: session property > table property > catalog config property.

I can change the property name. But I don't understand why the session property should have higher precedence than the table property; if a user sets a table property for a particular table, it should take precedence over the default from the session property, right?
    return (IcebergFileFormat) tableProperties.get(FILE_FORMAT_PROPERTY);
}

public static Optional<HiveCompressionCodec> getHiveCompressionCodec(Map<String, Object> inputProperties)
Rename this to getCompressionCodec.
will update the name
Transaction txn;

if (replace) {
If the user doesn't set anything, our default from the iceberg.compression-codec catalog config property should apply.
Already replied in a previous comment.
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";
public static final Set<IcebergFileFormat> VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY = ImmutableSet.of(IcebergFileFormat.AVRO);
It feels very odd to add a table property which is supported only for the AVRO file format. When we're adding something as a Trino property, the expectation is that Trino actually implements it for the typical use case (ORC/Parquet in this case). I suggest dropping the compression_level changes unless we actually implement it in the Trino ORC and Parquet writers. Trino already adheres to the Iceberg table property write.avro.compression-level without these changes, and you can still use ALTER TABLE tableName SET PROPERTIES extra_properties ... to change this property through Trino, so we don't strictly need this to be a Trino property.

Sure, I can remove support for setting compression_level.
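For reference, a sketch of that workaround; the table name is illustrative, and the MAP syntax assumes extra_properties accepts a map(varchar, varchar):

```sql
-- Set the underlying Iceberg table property directly, without a dedicated
-- compression_level Trino table property.
ALTER TABLE example_avro_table
SET PROPERTIES extra_properties = MAP(ARRAY['write.avro.compression-level'], ARRAY['5']);
```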
propertiesBuilder.put(COMMIT_NUM_RETRIES, Integer.toString(IcebergTableProperties.getMaxCommitRetry(tableMetadata.getProperties())));

- HiveCompressionCodec compressionCodec = toCompressionCodec(getCompressionCodec(session));
- switch (fileFormat) {
-     case PARQUET -> propertiesBuilder.put(PARQUET_COMPRESSION, toParquetCompressionCodecTableProperty(compressionCodec));
-     case ORC -> propertiesBuilder.put(ORC_COMPRESSION, compressionCodec.getOrcCompressionKind().name().toLowerCase(Locale.ENGLISH));
-     case AVRO -> propertiesBuilder.put(AVRO_COMPRESSION, toAvroCompressionCodecTableProperty(compressionCodec));
+ Optional<HiveCompressionCodec> compressionCodec = IcebergTableProperties.getHiveCompressionCodec(tableMetadata.getProperties());
+ OptionalInt compressionLevel = IcebergTableProperties.getCompressionLevel(tableMetadata.getProperties());
The issue you've linked to is closed, we need to either re-open that (with explanation why) or file a new issue and refer to that.
Sure, we may re-open it after we reach agreement on #25755 (comment)
- HiveCompressionCodec compressionCodec = toCompressionCodec(getCompressionCodec(session));
- switch (fileFormat) {
-     case PARQUET -> propertiesBuilder.put(PARQUET_COMPRESSION, toParquetCompressionCodecTableProperty(compressionCodec));
-     case ORC -> propertiesBuilder.put(ORC_COMPRESSION, compressionCodec.getOrcCompressionKind().name().toLowerCase(Locale.ENGLISH));
-     case AVRO -> propertiesBuilder.put(AVRO_COMPRESSION, toAvroCompressionCodecTableProperty(compressionCodec));
+ Optional<HiveCompressionCodec> compressionCodec = IcebergTableProperties.getHiveCompressionCodec(tableMetadata.getProperties());
+ OptionalInt compressionLevel = IcebergTableProperties.getCompressionLevel(tableMetadata.getProperties());
Why? If the user has not set the table property, the catalog config property for the compression codec still applies.
explained in a previous comment
This PR adds support for setting the compression_codec table property for the Iceberg connector.

Users can now run the following command to create a new table with compression_codec set to ZSTD.
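A sketch of such a command (the table name and columns are illustrative):

```sql
CREATE TABLE example_table (id integer, name varchar)
WITH (format = 'PARQUET', compression_codec = 'ZSTD');
```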
Users can also change the compression_codec of an existing table via an ALTER TABLE ... SET PROPERTIES statement. Example:
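A sketch of such a statement (the table name and codec value are illustrative):

```sql
ALTER TABLE example_table
SET PROPERTIES compression_codec = 'SNAPPY';
```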
The write_compression that users specify takes precedence over the iceberg.compression-codec setting. If the user changes file_format without changing write_compression, the table inherits the write_compression that was set for the original file format, provided one was set. If the user tries to set a write_compression that is inconsistent with the file format, the system throws an exception.

The compatibility matrix for write_compression and file format is the following: per the validation logic shown earlier, LZ4 is rejected for AVRO and PARQUET, and all other codec/format combinations are accepted.