```
* - Property name
  - Description
  - Default value
* - ``druid.count-distinct-strategy``
  - Defines Druid behavior for pushed-down ``DISTINCT`` aggregations by setting (or leaving unset) ``useApproximateCountDistinct``. Must be one of ``DEFAULT``, ``APPROXIMATE``, or ``EXACT``. When ``DEFAULT`` is set, the behavior is left up to the Druid cluster. ``APPROXIMATE`` sets ``useApproximateCountDistinct`` explicitly to ``true``, while ``EXACT`` sets it to ``false``. Note that Druid will reject pushed-down ``DISTINCT`` aggregations unless ``useApproximateCountDistinct=false``. This can either be done on the Trino side by setting this property to ``EXACT``, or it can be done on the Druid cluster.
  - ``DEFAULT``
```
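For reference, the documented property would be set in the catalog properties file, roughly like this (the connection URL is a placeholder, not from the PR):

```properties
# etc/catalog/druid.properties (sketch)
connector.name=druid
connection-url=jdbc:avatica:remote:url=http://example.net:8082/druid/v2/sql/avatica/
# One of DEFAULT, APPROXIMATE, EXACT
druid.count-distinct-strategy=EXACT
```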
So if we set this property to DEFAULT, and the Druid cluster has `useApproximateCountDistinct` set to true, does it use approximate computation?

If we are planning to push down aggregation, then I think we need to forcefully perform exact calculation irrespective of Druid's configuration. Can we add a single config property and session property to toggle whether to perform the count distinct operation or not?
So if we set this property to DEFAULT, and the Druid cluster has `useApproximateCountDistinct` set to true, does it use approximate computation?
Yes. And it would fail on queries that use, for example, `sum(distinct)` (though this is the same behavior Druid would exhibit natively).
If we are planning to push down aggregation, then I think we need to forcefully perform exact calculation irrespective of Druid's configuration
We can do that, essentially by always setting `useApproximateCountDistinct=false` in the connection properties. The reason I worried about this is that someone might actually want to use approximate count when using the `query` table function. This would block them from doing so.
Can we add a single config property and session property to toggle whether to perform the count distinct operation or not
Can you explain this more? Do you essentially mean a boolean property to replace the ternary property that is currently in the PR, essentially `druid.use-approx-count-distinct=true/false`, rather than having a "default" option?
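Side by side, the two shapes under discussion would look something like this (the boolean name is hypothetical, taken from the comment above, and not part of the PR):

```properties
# Ternary property currently in the PR (one of three values):
druid.count-distinct-strategy=DEFAULT

# Proposed boolean alternative (hypothetical name), with no "defer to cluster" option:
druid.use-approx-count-distinct=false
```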
Can we have a test class for this config?
```java
    return Optional.of(new JdbcTypeHandle(Types.NUMERIC, Optional.of("decimal"), Optional.of(decimalType.getPrecision()), Optional.of(decimalType.getScale()), Optional.empty(), Optional.empty()));
}

@Override
public Optional<ParameterizedExpression> convertPredicate(ConnectorSession session, ConnectorExpression expression, Map<String, ColumnHandle> assignments)
{
    return connectorExpressionRewriter.rewrite(session, expression, assignments);
}
```
This PR focuses only on aggregation pushdown, right? So we could implement this as a follow-up
Correct, this PR is focusing on aggregation pushdown. A `connectorExpressionRewriter` must be defined to pass into the new `AggregateFunctionRewriter()` constructor. My understanding is that the `connectorExpressionRewriter` is actually used as part of the aggregation pushdown, when calls are made to `aggregateFunctionRewriter.rewrite()`, for example here in `ImplementAvgDecimal`.

So if I didn't include this `convertPredicate` as well, this PR would essentially make it so that expressions are rewritten when aggregation pushdown happens but not at other times. I figured it would be more consistent to rewrite expressions in more general scenarios too.

Let me know your thoughts. Another approach might be to make the `connectorExpressionRewriter` have no rules attached (not even standard rules) so that the behavior is not changing at all; then more rules could be added in the future.
```java
assertThat(query("SELECT DISTINCT regionkey, min(nationkey) FROM nation GROUP BY regionkey")).isFullyPushedDown();

Session withMarkDistinct = Session.builder(getSession())
        .setSystemProperty(DISTINCT_AGGREGATIONS_STRATEGY, "mark_distinct")
```
Any reason for it to work on this configuration?
I wanted to avoid potential unknown behavior with AUTOMATIC, which is the default distinct aggregations strategy. I noticed this pattern of setting MARK_DISTINCT in other similar tests, so I did the same. I thought perhaps SPLIT_TO_SUBQUERIES in particular could change the plan if Trino decides to use this strategy on the multiple-distinct queries, which might fail the test. However, I just tested this to be sure and it seems it's not an issue, so I can remove this.
```diff
 private static final int DRUID_MIDDLE_MANAGER_PORT = 8091;

-private static final String DRUID_DOCKER_IMAGE = "apache/druid:0.18.0";
+private static final String DRUID_DOCKER_IMAGE = "apache/druid:0.19.0";
```
Updating Druid should be a prep commit
You mean just as an isolated commit at the start of the branch (not a separate PR)? Sure, I can do that
```java
@Test
@Override
public void testAggregationPushdown()
```
Why override the test?
The test in `BaseJdbcConnectorTest` will fail because it requires `create table` support. Druid cannot support table creation in this manner.
```diff
 To connect to Druid, you need:

-- Druid version 0.18.0 or higher.
+- Druid version 0.19.0 or higher.
```
Do we need to fully update this? I am wondering whether some users may still use a lower version.
Maybe we could extract `BaseDruidConnectorTest`, see the `BaseMysqlConnectorTest`
Do we need to fully update this
To support certain types of aggregation pushdown, yes, because of issue apache/druid#9949, which is fixed in apache/druid#9959 and included in 0.19.0. Without it, queries like `SELECT agg(*) FROM (SELECT * FROM <table> LIMIT N)` would fail due to a bug in Druid's planner. Note that this is pretty much the only type of query affected; regular aggregation pushdown would still work on 0.18.0.

Druid 0.19.0 was released in July 2020 and 0.18.0 in April 2020, so these are relatively old versions. I suppose we could add some kind of flag to disable pushdown selectively so that users can still use the older version without this one type of query breaking. But I don't feel that we should be beholden to working around 5-year-old bugs in Druid's query planner.
Maybe we could extract `BaseDruidConnectorTest`, see the `BaseMysqlConnectorTest`
`TestDruidConnectorTest` already extends `BaseJdbcConnectorTest`, just like `BaseMysqlConnectorTest` does. What is it about `BaseMysqlConnectorTest` that you wish to be replicated in the Druid tests? And how does this relate to your previous comment about updating the Druid version?
```java
{
    // count()
    assertThat(query("SELECT count(*) FROM nation")).isFullyPushedDown();
    assertThat(query("SELECT count(nationkey) FROM nation")).isFullyPushedDown();
```
Do those aggregations work the same as Trino's on an empty or one-row table now? I saw this mentioned in #4313
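For context on the question above: a pushed-down aggregation must match Trino's SQL semantics on edge cases such as empty tables, where `count(*)` returns 0 but `sum()` returns NULL. A minimal Java sketch of those semantics (illustrative only, not connector code):

```java
import java.util.List;
import java.util.Optional;

public class EmptyTableSemantics {
    // SQL semantics: count over an empty relation is 0.
    static long count(List<Long> rows) {
        return rows.size();
    }

    // SQL semantics: sum over an empty relation is NULL (empty Optional here).
    static Optional<Long> sum(List<Long> rows) {
        return rows.isEmpty()
                ? Optional.empty()
                : Optional.of(rows.stream().mapToLong(Long::longValue).sum());
    }

    public static void main(String[] args) {
        assert count(List.of()) == 0;
        assert sum(List.of()).isEmpty();
        assert sum(List.of(7L)).equals(Optional.of(7L));
    }
}
```

If Druid's pushed-down results diverged from these semantics, the connector would silently return wrong answers, which is why the test coverage matters.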
```java
assertThat(query("SELECT count(1) FROM nation")).isFullyPushedDown();
assertThat(query("SELECT count() FROM nation")).isFullyPushedDown();
assertThat(query("SELECT regionkey, count(1) FROM nation GROUP BY regionkey")).isFullyPushedDown();
assertThat(query("SELECT regionkey, count(*) FROM nation GROUP BY regionkey")).isFullyPushedDown();
```
Can we verify the behavior covering all options of `druid.count-distinct-strategy`?
Yes, will do. First I want to resolve some of the other conversations about how to actually approach this config property, since it may change the implementation.
```
* - Property name
  - Description
  - Default value
* - ``druid.count-distinct-strategy``
```
Does it just control the computing strategy in Druid, or will it also influence the result?
It can influence the result. If Druid uses an approximate distinct strategy, the results could vary from query to query. This would happen if you set APPROXIMATE or if you accepted the DEFAULT and the Druid cluster was configured to use approximate.
I think because of this we should probably force EXACT, as mentioned above. My worry there is that it could hypothetically affect users of the `query` table function, if they wanted to use this approximate behavior. I could make EXACT the default and allow users to unset it if they wish?
```
  - Description
  - Default value
* - ``druid.count-distinct-strategy``
  - Defines Druid behavior for pushed-down ``DISTINCT`` aggregations by setting (or leaving unset) ``useApproximateCountDistinct``. Must be one of ``DEFAULT``, ``APPROXIMATE``, or ``EXACT``. When ``DEFAULT`` is set, the behavior is left up to the Druid cluster. ``APPROXIMATE`` sets ``useApproximateCountDistinct`` explicitly to ``true``, while ``EXACT`` sets it to ``false``. Note that Druid will reject pushed-down ``DISTINCT`` aggregations unless ``useApproximateCountDistinct=false``. This can either be done on the Trino side by setting this property to ``EXACT``, or it can be done on the Druid cluster.
```
Can we have a test for the "reject case"?
```java
        .build();
try {
    copyAndIngestTpchData(rows, druidServer, caseSensitiveTable);
}
catch (Exception e) {
```
Why catch it then fail?
`copyAndIngestTpchData` throws `IOException` and `InterruptedException`. If these are not caught, they would need to be added to the method signature. But the overridden method, from `BaseJdbcConnectorTest`, does not throw these exceptions, so we wouldn't be able to override.
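To make the language constraint concrete: Java forbids an override from declaring checked exceptions the overridden method does not declare, so the checked exception must be caught inside the override. A minimal sketch (class and method names are illustrative, not the PR's actual code):

```java
import java.io.IOException;

public class OverrideCheckedException {
    static class Base {
        // The base method declares no checked exceptions...
        void ingest() { }
    }

    static class Impl extends Base {
        @Override
        void ingest() {
            // ...so this override cannot add "throws IOException"; the checked
            // exception must be caught here (and, e.g., rethrown unchecked or failed on).
            try {
                copyData();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        void copyData() throws IOException {
            throw new IOException("simulated ingest failure");
        }
    }

    public static void main(String[] args) {
        try {
            new Impl().ingest();
            assert false : "expected RuntimeException";
        }
        catch (RuntimeException e) {
            assert e.getCause() instanceof IOException;
        }
    }
}
```

In the test being reviewed, the catch block calls `fail()` rather than rethrowing, but the reason the catch exists is the same.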
Description
This change adds support for aggregation pushdown in the Druid connector for some common aggregate functions, namely min/max, avg, sum, count, and variations of these. Actually adding this support requires little more than boilerplate, as the standard `Implement*` `AggregateFunctionRule`s can be used for most functions. One special case, `count(distinct)`, requires a new property for best support.

New tests are created for these changes in `TestDruidConnectorTest`. They are largely similar to tests in `BaseJdbcConnectorTest`, but those tests can't always be reused for Druid due to Druid's lack of support for table create statements.

See #4313 for a previous attempt. This fixes #4109.
Additional context and related issues
For efficient queries, Trino should "push down" aggregation functions into Druid when running queries on the Trino-Druid connector. This means that, for a query like `SELECT sum(<col>) FROM druid.druid_table`, Druid will actually perform the sum itself. There are generic classes already available to take care of this for JDBC connectors within Trino, see for example the `io.trino.plugin.base.aggregation` package. This PR largely just makes use of those existing classes to indicate which functions can be pushed down. Druid does not have functions that correspond to Trino's functions for covariance, correlation, and regression, and Druid's standard deviation and variance functions require an extension, so these functions are not supported for pushdown.

One special case is aggregations containing `distinct`, such as `count(distinct)`. Druid has a property `useApproximateCountDistinct` that controls its behavior when running such functions. Essentially, if `useApproximateCountDistinct=true`, Druid will use an approximate cardinality algorithm to calculate `count(distinct)`, presumably similar to the `approx_distinct()` function in Trino, whereas if `useApproximateCountDistinct=false` Druid will use an exact count. Druid rejects queries like `sum(distinct)` when `useApproximateCountDistinct=true`. To handle all this, this PR has added a property `druid.count-distinct-strategy`, with options `DEFAULT`, `APPROXIMATE`, and `EXACT`. When `DEFAULT` is selected, Trino does not set the property, leaving it up to Druid (Druid clusters can enforce this on their own end). `APPROXIMATE` corresponds to `useApproximateCountDistinct=true`, and `EXACT` to `useApproximateCountDistinct=false`. Using `EXACT` will produce technically fully correct aggregations.

A battery of Druid-specific aggregation pushdown tests is included in this PR. These are similar to the tests in `BaseJdbcConnectorTest`, but the `BaseJdbcConnectorTest` tests cannot be directly inherited by Druid because Druid does not support table create statements.

The Druid Docker image version had to be updated from 0.18.0 due to this issue, which is fixed in apache/druid#9959 and included in 0.19.0. Without this, queries like `SELECT agg(*) FROM (SELECT * FROM <table> LIMIT N)` would fail due to a bug in Druid's planner.
would fail due to a bug in Druid's planner.Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: