trino
Support setting compression_codec table properties for Iceberg Connector #25755
Open

SongChujun 16 days ago (edited 2 days ago)

This PR adds support for setting the compression_codec table property for the Iceberg connector.

Users can now run the following command to create a new table with compression_codec set to ZSTD.

CREATE TABLE example (
    c1 INTEGER,
    c2 DATE,
    c3 DOUBLE
)
WITH (
    format = 'AVRO',
    partitioning = ARRAY['c1', 'c2'],
    sorted_by = ARRAY['c3'],
    compression_codec = 'ZSTD'
);

Users are also able to change the compression_codec via the ALTER TABLE ... SET PROPERTIES statement.

Example

ALTER TABLE example SET PROPERTIES compression_codec = 'GZIP';

The compression_codec users specify takes precedence over the iceberg.compression-codec session variable. If the user changes the file format without changing compression_codec, the table inherits the compression_codec set for the original file format, if one was set.

If the user tries to set a compression_codec that is incompatible with the file format, the system throws an exception.
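For example (a sketch based on the behavior described above, continuing from the example table created earlier; the exact error text may differ):

ALTER TABLE example SET PROPERTIES format = 'PARQUET';
-- the table keeps the previously set compression_codec = 'ZSTD'

ALTER TABLE example SET PROPERTIES format = 'PARQUET', compression_codec = 'LZ4';
-- rejected: compression codec LZ4 is not supported for Parquet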

The compatibility matrix for compression_codec and file format is the following (LZ4 is the only codec rejected for Parquet and Avro):

| | ORC | Parquet | Avro |
| --- | --- | --- | --- |
| NONE | ✓ | ✓ | ✓ |
| SNAPPY | ✓ | ✓ | ✓ |
| LZ4 | ✓ | ✗ | ✗ |
| ZSTD | ✓ | ✓ | ✓ |
| GZIP | ✓ | ✓ | ✓ |

Description

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`20401`)
cla-bot added cla-signed
github-actions added iceberg
SongChujun force-pushed from 8a89d1f5 to 53995bfc 16 days ago
SongChujun force-pushed from 194fec62 to 36c996af 16 days ago
SongChujun force-pushed from 36c996af to b5e8d553 16 days ago
SongChujun added enhancement
SongChujun requested a review from pettyjamesm 16 days ago
pettyjamesm commented on 2025-05-09
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

public static Map<String, String> calculateTableCompressionProperties(IcebergFileFormat oldFileFormat, IcebergFileFormat newFileFormat, Map<String, String> existingProperties, Map<String, Object> inputProperties)
{
pettyjamesm 15 days ago

Nit: no need for a lambda here, just newCompressionCodec.orElse(oldCompressionCodec)

SongChujun 8 days ago (edited 8 days ago)

orElse requires the parameter to be of type T and returns T, but we are operating on an Optional and want to return an Optional, so I want to use or instead of orElse.

pettyjamesm 8 days ago 👍 1

Ah, I missed that. Still, you want to avoid spinning up a lambda for a cheap operation like this. Maybe a cleaner way to write it is newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec, like we do later on for compression level.
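A minimal sketch of the variants being discussed, using plain java.util types (the names are stand-ins for the fields in calculateTableCompressionProperties):

import java.util.Optional;

class CodecMerge
{
    // Merge a newly requested codec with the codec already stored on the table
    static Optional<String> resolve(Optional<String> newCompressionCodec, Optional<String> oldCompressionCodec)
    {
        // newCompressionCodec.orElse(oldCompressionCodec) does not compile: orElse takes and returns the value type, not an Optional
        // newCompressionCodec.or(() -> oldCompressionCodec) works but allocates a Supplier lambda
        // The ternary form avoids the lambda while keeping the Optional return type:
        return newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec;
    }
}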

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java

{
    return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
            .map(OptionalInt::of)
            .orElseGet(OptionalInt::empty);
pettyjamesm 15 days ago

Nit: no need for a lambda here, just orElse(OptionalInt.empty())

SongChujun 8 days ago

done
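A sketch of the simplified getter under that suggestion (the COMPRESSION_LEVEL key name is assumed from the diff above):

import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

class CompressionLevelProperty
{
    private static final String COMPRESSION_LEVEL = "compression_level"; // assumed key name

    static OptionalInt getCompressionLevel(Map<String, Object> tableProperties)
    {
        // orElse is enough here because OptionalInt.empty() is a cheap constant, so no deferred Supplier is needed
        return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
                .map(OptionalInt::of)
                .orElse(OptionalInt.empty());
    }
}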

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java

{
    if (compressionCodec.isPresent()) {
        if (!isCompressionCodecSupportedForFormat(fileFormat, compressionCodec.get())) {
            throw new TrinoException(NOT_SUPPORTED, format("Compression codec LZ4 not supported for %s", fileFormat.humanName()));
pettyjamesm 15 days ago

Error message here mentions LZ4 explicitly, but it seems like this should be a parameter based on compressionCodec.get()

SongChujun 8 days ago (edited 8 days ago)

Changed to pass compressionCodec.get() instead of LZ4.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java

    throw new TrinoException(INVALID_TABLE_PROPERTY, "write_compression must be set when compression_level is set");
}
else {
    if (!(VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY.contains(fileFormat)
pettyjamesm 15 days ago

I wonder whether this branch would actually be easier to read and understand as an enum switch statement. That way we would fail to compile if a new enum value were added, and we would know we needed to add new handling at that point.

SongChujun 8 days ago

done
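A sketch of what the switch-based check might look like, reusing the Trino types and static imports already visible in the diffs above; compression_level is assumed to apply only to Avro, per VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY. The exact code in the PR may differ:

// Illustrative sketch, not the exact PR code
private static void validateCompressionLevel(IcebergFileFormat fileFormat, OptionalInt compressionLevel)
{
    if (compressionLevel.isEmpty()) {
        return;
    }
    switch (fileFormat) {
        case AVRO -> {
            // compression_level is honored by the Avro writer
        }
        case ORC, PARQUET -> throw new TrinoException(
                INVALID_TABLE_PROPERTY,
                format("compression_level is not supported for %s", fileFormat.humanName()));
    }
}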

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java

public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
{
    return !((fileFormat.equals(AVRO) || fileFormat.equals(PARQUET)) && codec.equals(HiveCompressionCodec.LZ4));
pettyjamesm 15 days ago

Let's rewrite this as a switch over the file formats and compression codecs so that it's easier to read and understand the valid combinations.

SongChujun 8 days ago

done
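A sketch of the switch-based rewrite using the enum values that appear elsewhere in the diff (ORC, PARQUET, AVRO); illustrative only:

public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
{
    return switch (fileFormat) {
        // ORC accepts every codec exposed through this property
        case ORC -> true;
        // LZ4 is the only rejected combination for Parquet and Avro
        case PARQUET, AVRO -> codec != HiveCompressionCodec.LZ4;
    };
}

With the codec threaded through, the NOT_SUPPORTED message above can also interpolate compressionCodec.get() rather than hard-coding LZ4.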

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java

{
    ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
    IcebergFileFormat fileFormat = getFileFormat(icebergTable);
    properties.put(FILE_FORMAT_PROPERTY, getFileFormat(icebergTable));
pettyjamesm 15 days ago

duplicate call to getFileFormat, you can pass the instance from the previous line here instead

SongChujun 8 days ago

done

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
pettyjamesm 15 days ago

I would just inline compressionLevelValue != null && !compressionLevelValue.isEmpty() instead of bringing in Strings to do that

SongChujun 8 days ago

done

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
pettyjamesm 15 days ago

Let's move these cleanup / refactoring changes to a separate commit

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java

    txn = catalog.newCreateOrReplaceTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, tableLocation, createTableProperties(tableMetadata, allowedExtraProperties));
}
else {
    txn = catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, Optional.ofNullable(tableLocation), createTableProperties(tableMetadata, allowedExtraProperties));
pettyjamesm 15 days ago

Is this related to the iceberg parquet compression handling bug? If so, we'll want an inline comment explaining what's going on here

SongChujun 8 days ago

done

raunaqmorarka 4 days ago

Could you clarify what bug is being referred to here? I responded at #24851 (comment) to the concern raised there.

SongChujun 3 days ago

If we let Iceberg write the table property write.parquet.compression-codec as zstd by default, this PR would introduce behavior that is not backward compatible. Suppose a user never set write.parquet.compression-codec as a table property but sets the compression codec to snappy in their Iceberg catalog config or session property. On a previous version of Trino, a new file written for that table would use snappy. Because this change also reads the compression codec from table properties when writing a file, the codec used would now be zstd instead of snappy, which is not backward compatible. So we want to stop Iceberg from setting this table property when the user has not set it.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java

    validateCompression(fileFormat, compressionCodec, compressionLevel);

    Map<String, String> tableCompressionProperties = calculateTableCompressionProperties(fileFormat, fileFormat, ImmutableMap.of(), tableMetadata.getProperties());
pettyjamesm 15 days ago

We'll definitely want a test around this quirk

SongChujun 8 days ago

It should be covered implicitly by the existing tests, but I still added an explicit test for it.

SongChujun 8 days ago

assertUpdate(format("CREATE TABLE %s WITH (format = 'AVRO') AS SELECT * FROM nation", tableName), "SELECT count(*) FROM nation");

assertQuery(format("SELECT COUNT(*) FROM \"%s$properties\" WHERE key = 'write.parquet.compression-codec'", tableName), "SELECT 0");
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

            .orElseGet(() -> catalog.defaultTableLocation(session, tableMetadata.getTable()));
    }
    transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace, tableLocation, allowedExtraProperties);
    (new blank line added in the diff)
pettyjamesm 15 days ago

Unintentional new line insertion?

SongChujun 8 days ago

removed

pettyjamesm requested a review from raunaqmorarka 15 days ago
pettyjamesm commented on 2025-05-09
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

    assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-codec'", tableName), "VALUES 'ZSTD'");

    assertUpdate(format("ALTER TABLE %s SET PROPERTIES compression_level = 5", tableName));
    assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-level'", tableName), ("VALUES 5"));
pettyjamesm 15 days ago

Looks like checkstyle is complaining about this line, but giving the wrong line number in the error.

SongChujun marked this pull request as ready for review 15 days ago
SongChujun assigned SongChujun 15 days ago
raunaqmorarka requested a review from ebyhr 12 days ago
raunaqmorarka requested a review from findinpath 12 days ago
raunaqmorarka requested a review from pajaks 12 days ago
raunaqmorarka requested a review from SemionPar 12 days ago
raunaqmorarka requested a review from copilot-pull-request-reviewer 12 days ago
copilot-pull-request-reviewer commented on 2025-05-12

Pull Request Overview

This PR adds support for specifying the write_compression and compression_level table properties for the Iceberg connector and updates related tests and compression property validations.

  • Adds new DDL support and property validations for write_compression and compression_level.
  • Updates tests to verify new expected compression property values.
  • Adjusts compression property handling across table creation, metadata, and file writer implementations.

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.

Summary per file:

| File | Description |
| --- | --- |
| plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java | Updated expected values in JSON assertions. |
| plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java | Modified test expectations for column_sizes and compression properties. |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java | Revised table properties creation and transaction handling. |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java | Introduced new methods for compression level and codec extraction. |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java | Added new table properties and validation for compression settings. |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java | Updated table property and compression handling during metadata updates. |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java | Adjusted writer creation to incorporate storage properties for compression. |
| plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java | Updated Avro writer to accept and use a compression level. |
Comments suppressed due to low confidence (1)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java:897

  • The method 'createTableProperties' is expected to return a Map<String, String>, but it is returning a Transaction object. Please update the return value to the actual table properties map or adjust the method signature accordingly to ensure type consistency.
return txn;
SongChujun force-pushed from b5e8d553 to ffaf8b7f 8 days ago
SongChujun force-pushed from ffaf8b7f to 4d27522a 8 days ago
raunaqmorarka commented on 2025-05-20
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION_PROPERTY = "format_version";

public static final String WRITE_COMPRESSION = "write_compression";
raunaqmorarka 4 days ago

Property name should be compression_codec to maintain consistency with the equivalent Iceberg table property and the connector session property.

raunaqmorarka 4 days ago

The order of precedence should be: session property > table property > catalog config property

SongChujun 3 days ago

I can change the property name. But I don't understand why the session property should take precedence over the table property; if a user sets a table property for a particular table, shouldn't it take precedence over the default coming from the session property?
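A minimal sketch of the table-property-over-session precedence described in the PR description (helper names are taken from the diffs in this review; illustrative only, not the exact PR code):

// Illustrative only: prefer the codec stored as a table property, fall back to the session/catalog default
HiveCompressionCodec codec = IcebergTableProperties.getHiveCompressionCodec(tableMetadata.getProperties())
        .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));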

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
    return (IcebergFileFormat) tableProperties.get(FILE_FORMAT_PROPERTY);
}

public static Optional<HiveCompressionCodec> getHiveCompressionCodec(Map<String, Object> inputProperties)
raunaqmorarka 4 days ago

getCompressionCodec

SongChujun 3 days ago

will update the name

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
Transaction txn;

if (replace) {
raunaqmorarka 4 days ago

If the user doesn't set anything, our default from the iceberg.compression-codec catalog config property should apply.

SongChujun 3 days ago

Already replied in a previous comment.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";
public static final Set<IcebergFileFormat> VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY = ImmutableSet.of(IcebergFileFormat.AVRO);
raunaqmorarka 4 days ago

It feels very odd to add a table property which is supported only for the AVRO file format. When we're adding something as a Trino property, the expectation is that Trino is able to actually implement it for the typical use case (ORC/Parquet in this case). I suggest dropping the "compression_level" changes unless we actually implement it in the Trino ORC and Parquet writers. Trino already adheres to the Iceberg table property write.avro.compression-level without these changes, and you can still use ALTER TABLE tableName SET PROPERTIES extra_properties ... to change this property through Trino. So we don't strictly need this to be a Trino property.

SongChujun 3 days ago

Sure, I can remove support for setting compression_level.
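For reference, the extra_properties route mentioned above might look like the following (a sketch; the level value shown is an assumed example, and write.avro.compression-level is the underlying Iceberg table property):

ALTER TABLE example SET PROPERTIES extra_properties = MAP(ARRAY['write.avro.compression-level'], ARRAY['5']);
-- the Iceberg table property write.avro.compression-level then applies on subsequent writes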

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
     propertiesBuilder.put(COMMIT_NUM_RETRIES, Integer.toString(IcebergTableProperties.getMaxCommitRetry(tableMetadata.getProperties())));

-    HiveCompressionCodec compressionCodec = toCompressionCodec(getCompressionCodec(session));
-    switch (fileFormat) {
-        case PARQUET -> propertiesBuilder.put(PARQUET_COMPRESSION, toParquetCompressionCodecTableProperty(compressionCodec));
-        case ORC -> propertiesBuilder.put(ORC_COMPRESSION, compressionCodec.getOrcCompressionKind().name().toLowerCase(Locale.ENGLISH));
-        case AVRO -> propertiesBuilder.put(AVRO_COMPRESSION, toAvroCompressionCodecTableProperty(compressionCodec));
+    Optional<HiveCompressionCodec> compressionCodec = IcebergTableProperties.getHiveCompressionCodec(tableMetadata.getProperties());
+    OptionalInt compressionLevel = IcebergTableProperties.getCompressionLevel(tableMetadata.getProperties());
raunaqmorarka 4 days ago

The issue you've linked to is closed; we need to either re-open it (with an explanation why) or file a new issue and refer to that.

SongChujun 3 days ago

Sure, we may re-open it after we reach agreement on #25755 (comment)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
-    HiveCompressionCodec compressionCodec = toCompressionCodec(getCompressionCodec(session));
-    switch (fileFormat) {
-        case PARQUET -> propertiesBuilder.put(PARQUET_COMPRESSION, toParquetCompressionCodecTableProperty(compressionCodec));
-        case ORC -> propertiesBuilder.put(ORC_COMPRESSION, compressionCodec.getOrcCompressionKind().name().toLowerCase(Locale.ENGLISH));
-        case AVRO -> propertiesBuilder.put(AVRO_COMPRESSION, toAvroCompressionCodecTableProperty(compressionCodec));
+    Optional<HiveCompressionCodec> compressionCodec = IcebergTableProperties.getHiveCompressionCodec(tableMetadata.getProperties());
+    OptionalInt compressionLevel = IcebergTableProperties.getCompressionLevel(tableMetadata.getProperties());
raunaqmorarka 4 days ago

Why? If the user has not set the table property, the catalog config property for the compression codec still applies.

SongChujun 3 days ago

Explained in a previous comment.

SongChujun: Support setting compression-codec table property for Iceberg Connector (a8410e89)
SongChujun force-pushed from 4d27522a to 4c0b43fe 2 days ago
github-actions added docs
SongChujun changed the title from "Support setting write_compression and compression_level table properties for Iceberg Connector" to "Support setting compression_codec table properties for Iceberg Connector" 2 days ago
SongChujun force-pushed from 4c0b43fe to 5d80c4bf 2 days ago
SongChujun: Remove redundant type qualifications for NestedField and PrimitiveType (e53d72ea)
SongChujun force-pushed from 5d80c4bf to e53d72ea 2 days ago
SongChujun requested a review from raunaqmorarka 1 day ago
