trino
Add support for aggregation pushdown in Druid connector
#25706
Open

Add support for aggregation pushdown in Druid connector #25706

conor-mcavoy
conor-mcavoy24 days ago (edited 22 days ago)

Description

This change adds support for aggregation pushdown in the Druid connector for some common aggregate functions, namely min/max, avg, sum, count, and variations of these. Actually adding this support requires little more than boilerplate, as the standard "Implements*" AggregateFunctionRules can be used for most functions. One special case, count(distinct), requires a new property for best support.

New tests are created for these changes in TestDruidConnectorTest. They are largely similar to tests in BaseJdbcConnectorTest, but those tests can't always be reused for Druid due to Druid's lack of support for table create statements.

See #4313 for a previous attempt. This fixes #4109.

Additional context and related issues

For efficient queries, Trino should "push down" aggregation functions into Druid when running queries on the Trino-Druid connector. This means that, for a query like SELECT sum(<col>) FROM druid.druid_table, Druid will actually perform the sum itself. There are generic classes already available to take care of this for JDBC connectors within Trino, see for example the io.trino.plugin.base.aggregation package. This PR largely just makes use of those existing classes to indicate which functions can be pushed down. Druid does not have functions that correspond to Trino's functions for covariance, correlation, and regression, and Druid's standard deviation and variance functions require an extension, so these functions are not supported for pushdown.

One special case is aggregations containing distinct, such as count(distinct). Druid has a property useApproximateCountDistinct that controls its behavior when running such functions. Essentially, if useApproximateCountDistinct=true, Druid will use an approximate cardinality algorithm to calculate count(distinct), presumably similar to the approx_distinct() function in Trino, whereas if useApproximateCountDistinct=false Druid will use an exact count. Druid rejects queries like sum(distinct) when useApproximateCountDistinct=true. To handle all this, this PR has added a property druid.count-distinct-strategy, with options DEFAULT, APPROXIMATE, and EXACT. When DEFAULT is selected, Trino does not set the property, leaving it up to Druid (Druid clusters can enforce this on their own end). APPROXIMATE corresponds to useApproximateCountDistinct=true, and EXACT to useApproximateCountDistinct=false. Using EXACT will produce technically fully correct aggregations.

A battery of Druid-specific aggregation pushdown tests is included in this PR. These are similar to the tests in BaseJdbcConnectorTest, but the BaseJdbcConnectorTest tests cannot be directly inherited by Druid because Druid does not support table create statements.

The Druid Docker image version had to be updated from 0.18.0 due to this issue, which is fixed in apache/druid#9959 and included in 0.19.0. Without this, queries like SELECT agg(*) FROM (SELECT * FROM <table> LIMIT N) would fail due to a bug in Druid's planner.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

## Druid
* Support aggregation pushdown for the Druid plugin. ({https://github.com/trinodb/trino/issues/4109}`4109`)
cla-bot cla-bot added cla-signed
github-actions github-actions added druid
conor-mcavoy conor-mcavoy force pushed from 1aecb2a6 to 03485366 24 days ago
conor-mcavoy conor-mcavoy force pushed from 03485366 to 80423a25 23 days ago
conor-mcavoy conor-mcavoy force pushed from 80423a25 to db413629 23 days ago
conor-mcavoy conor-mcavoy force pushed from db413629 to a1de50f4 23 days ago
conor-mcavoy conor-mcavoy force pushed from a1de50f4 to e8ca33fa 23 days ago
conor-mcavoy Add support for aggregation pushdown in Druid connector
7a051fb2
conor-mcavoy conor-mcavoy force pushed from e8ca33fa to 7a051fb2 23 days ago
conor-mcavoy Add Druid config property for controlling approx count distinct behavior
366f131f
conor-mcavoy conor-mcavoy force pushed from 51ba15cf to 6eb0b3e8 22 days ago
github-actions github-actions added docs
conor-mcavoy conor-mcavoy force pushed from 6eb0b3e8 to 719262b1 22 days ago
conor-mcavoy Add docs explaining new Druid aggregate pushdown and related config
084ef228
conor-mcavoy conor-mcavoy force pushed from 719262b1 to 084ef228 22 days ago
conor-mcavoy conor-mcavoy changed the title [WIP] Add support for aggregation pushdown in Druid connector Add support for aggregation pushdown in Druid connector 22 days ago
conor-mcavoy conor-mcavoy marked this pull request as ready for review 22 days ago
Praveen2112
Praveen2112 commented on 2025-05-02
docs/src/main/sphinx/connector/druid.md
78* - Property name
79 - Description
80 - Default value
81
* - ``druid.count-distinct-strategy``
82
- Defines Druid behavior for pushed-down ``DISTINCT`` aggregations by setting (or leaving unset) ``useApproximateCountDistinct``. Must be one of ``DEFAULT``, ``APPROXIMATE``, or ``EXACT``. When ``DEFAULT`` is set, the behavior is left up to the Druid cluster. ``APPROXIMATE`` sets ``useApproximateCountDistinct`` explicitly to ``true``, while ``EXACT`` sets it to ``false``. Note that Druid will reject pushed-down ``DISTINCT`` aggregations unless ``useApproximateCountDistinct=false``. This can either be done on the Trino side by setting this property to ``EXACT``, or it can be done on the Druid cluster.
83
- ``DEFAULT``
Praveen211221 days ago

So if we set the this property to DEFAULT and if the Druid cluster has DEFAULT has useApproximateCountDistinct set to true does it uses approximate computation ?

Praveen211221 days ago

If we are planning to pushdown aggregation then I think we need to forcefully perform exact calculation irrespective of Druid's configuration. Can we add a single config property and session property to toggle whether do we need to perform the count distinct operation or not.

conor-mcavoy21 days ago

So if we set the this property to DEFAULT and if the Druid cluster has DEFAULT has useApproximateCountDistinct set to true does it uses approximate computation ?

Yes. And it would fail on queries that use, for example, sum(distinct) (though this is the same behavior Druid would exhibit natively).

If we are planning to pushdown aggregation then I think we need to forcefully perform exact calculation irrespective of Druid's configuration

We can do that, essentially by always setting useApproximateCountDistinct=false in the connection properties. The reason I worried about this is if someone actually wanted to use approximate count when using the query table function. This would block them from doing so.

Can we add a single config property and session property to toggle whether do we need to perform the count distinct operation or not

Can you explain this more? Do you essentially mean a boolean property to replace the ternary property that is currently in the PR, essentially druid.use-approx-count-distinct=true/false rather than having a "default" option?

plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidConfig.java
Praveen211221 days ago👍 1

Can we have a Test class for this config ?

plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java
678 return Optional.of(new JdbcTypeHandle(Types.NUMERIC, Optional.of("decimal"), Optional.of(decimalType.getPrecision()), Optional.of(decimalType.getScale()), Optional.empty(), Optional.empty()));
679 }
680
681
@Override
682
public Optional<ParameterizedExpression> convertPredicate(ConnectorSession session, ConnectorExpression expression, Map<String, ColumnHandle> assignments)
683
{
684
return connectorExpressionRewriter.rewrite(session, expression, assignments);
685
}
Praveen211221 days ago

This PR focuses only on aggregation pushdown right ? So we could implement this as a follow up

conor-mcavoy21 days ago

Correct, this PR is focusing on aggregation pushdown. A connectorExpressionRewriter must be defined to pass into the new AggregateFunctionRewriter() constructor. My understanding is that the connectorExpressionRewriter is actually used as part of the aggregation pushdown, when calls are made to aggregateFunctionRewriter.rewrite(), for example here in ImplementAvgDecimal.

So if I didn't include this convertPredicate as well, this PR would essentially make it so that expressions are rewritten when aggregation pushdown happens but not at other times. I figured it would be more consistent to rewrite expressions in more general scenarios too.

Let me know your thoughts. Another approach might be to make the connectorExpressionRewriter actually have no rules attached (not even standard rules) so that the behavior is not changing at all - then more rules could be added in the future.

plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidConnectorTest.java
673 assertThat(query("SELECT DISTINCT regionkey, min(nationkey) FROM nation GROUP BY regionkey")).isFullyPushedDown();
674
675 Session withMarkDistinct = Session.builder(getSession())
676
.setSystemProperty(DISTINCT_AGGREGATIONS_STRATEGY, "mark_distinct")
Praveen211221 days ago

Any reason for it to work on this configuration ?

conor-mcavoy21 days ago

I wanted to avoid potential unknown behavior with AUTOMATIC, which is the default distinct aggregations strategy. I noticed this pattern of setting MARK_DISTINCT in other similar tests, so I did the same. I thought perhaps SPLIT_TO_SUBQUERIES in particular could change the plan if Trino decides to use this strategy on the multiple distinct queries, which might fail the test. However I just tested this to be sure and seems like it's not an issue, so I can remove this.

plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestingDruidServer.java
6767 private static final int DRUID_MIDDLE_MANAGER_PORT = 8091;
6868
69 private static final String DRUID_DOCKER_IMAGE = "apache/druid:0.18.0";
69
private static final String DRUID_DOCKER_IMAGE = "apache/druid:0.19.0";
Praveen211221 days ago

Updating druid should be as a prep commit

conor-mcavoy21 days ago

You mean just as an isolated commit at the start of the branch (not a separate PR)? Sure I can do that

chenjian2664
chenjian2664 commented on 2025-05-07
plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidConnectorTest.java
601
602 @Test
603 @Override
604
public void testAggregationPushdown()
chenjian266417 days ago

Why override the test

conor-mcavoy16 days ago

The test in BaseJdbcConnectorTest will fail because it requires create table support. Druid cannot support table creation in this manner.

docs/src/main/sphinx/connector/druid.md
1818To connect to Druid, you need:
1919
20- Druid version 0.18.0 or higher.
20
- Druid version 0.19.0 or higher.
chenjian266417 days ago

Do we need to fully update this, I am wondering that some users may still use lower version?

chenjian266417 days ago

Maybe we could extract BaseDruidConnectorTest, see the BaseMysqlConnectorTest

conor-mcavoy16 days ago

Do we need to fully update this

To support certain types of aggregation pushdown, yes. Because of this issue apache/druid#9949, which is fixed in apache/druid#9959 and included in 0.19.0. Without this, queries like SELECT agg(*) FROM (SELECT * FROM <table> LIMIT N) would fail due to a bug in Druid's planner. Note that this is pretty much the only type of query affected, regular aggregation pushdown would still work on 0.18.0.

Druid 0.19.0 was released in July 2020, 0.18.0 in April of 2020, so these are relatively old versions. I suppose we could add some kind of flag to disable pushdown selectively so that users can still use the older version without this one type of query breaking. But I don't feel that we should be beholden to work around 5-year-old bugs in Druid's query planner.

Maybe we could extract BaseDruidConnectorTest, see the BaseMysqlConnectorTest

TestDruidConnectorTest already extends BaseJdbcConnectorTest, just like BaseMysqlConnectorTest does. What is it about BaseMysqlConnectorTest that you wish to be replicated in the Druid tests? And how does this relate to your previous comment about updating the Druid version?

plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidConnectorTest.java
605 {
606 // count()
607 assertThat(query("SELECT count(*) FROM nation")).isFullyPushedDown();
608
assertThat(query("SELECT count(nationkey) FROM nation")).isFullyPushedDown();
chenjian266417 days ago

Does those aggregation works same with Trino on empty or one row table now?
I saw this is mentioned in #4313

plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidConnectorTest.java
609 assertThat(query("SELECT count(1) FROM nation")).isFullyPushedDown();
610 assertThat(query("SELECT count() FROM nation")).isFullyPushedDown();
611 assertThat(query("SELECT regionkey, count(1) FROM nation GROUP BY regionkey")).isFullyPushedDown();
612
assertThat(query("SELECT regionkey, count(*) FROM nation GROUP BY regionkey")).isFullyPushedDown();
chenjian266417 days ago👍 1

Can we verify the behaviors that covers all types about the druid.count-distinct-strategy?

conor-mcavoy16 days ago

Yes will do. First want to resolve some of the other conversations about how to actually approach this config property, since it may change the implementation.

docs/src/main/sphinx/connector/druid.md
78* - Property name
79 - Description
80 - Default value
81
* - ``druid.count-distinct-strategy``
chenjian266417 days ago

It's just control the computing strategy in Druid or it will also influence the result?

conor-mcavoy16 days ago (edited 16 days ago)

It can influence the result. If Druid uses an approximate distinct strategy, the results could vary from query to query. This would happen if you set APPROXIMATE or if you accepted the DEFAULT and the Druid cluster was configured to use approximate.

I think because of this actually we should probably force EXACT as mentioned above. My worry there is that it could hypothetically affect users of the query table function, if they wanted to use this approximate behavior. I could make EXACT the default and allow users to unset it if they wish?

docs/src/main/sphinx/connector/druid.md
79 - Description
80 - Default value
81* - ``druid.count-distinct-strategy``
82
- Defines Druid behavior for pushed-down ``DISTINCT`` aggregations by setting (or leaving unset) ``useApproximateCountDistinct``. Must be one of ``DEFAULT``, ``APPROXIMATE``, or ``EXACT``. When ``DEFAULT`` is set, the behavior is left up to the Druid cluster. ``APPROXIMATE`` sets ``useApproximateCountDistinct`` explicitly to ``true``, while ``EXACT`` sets it to ``false``. Note that Druid will reject pushed-down ``DISTINCT`` aggregations unless ``useApproximateCountDistinct=false``. This can either be done on the Trino side by setting this property to ``EXACT``, or it can be done on the Druid cluster.
chenjian266417 days ago👍 1

Can we have a test for the "reject case"

plugin/trino-druid/src/test/java/io/trino/plugin/druid/TestDruidConnectorTest.java
699 .build();
700 try {
701 copyAndIngestTpchData(rows, druidServer, caseSensitiveTable);
702
} catch (Exception e) {
chenjian266417 days ago

Why catch it then fail?

conor-mcavoy16 days ago

copyAndIngestTpchData throws IOException and InterruptedException. If these are not caught, they would need to be added to the method signature. But the overridden method, from BaseJdbcConnectorTest, does not throw these exceptions, so we wouldn't be able to override.

Login to write a write a comment.

Login via GitHub

Assignees
No one assigned
Labels
Milestone