trino
Add io.trino.spi.connector.ConnectorSplitSource#getMetrics
#25770
Merged

raunaqmorarka merged 7 commits into master from split-source-metrics
raunaqmorarka commented 12 days ago (edited 9 days ago)

Description

Add a new SPI method, io.trino.spi.connector.ConnectorSplitSource#getMetrics.
This allows connectors to expose detailed metrics from splits generation.
EXPLAIN ANALYZE VERBOSE is modified to print these metrics.
Added an implementation in Iceberg to collect metrics from the Iceberg metadata scan.
Example output

ScanFilterProject[table = iceberg:default.lineitem$data@5213842060806047048, filterPredicate = (l_orderkey = bigint '3423110')]
    connector metrics:
      'ParquetReaderCompressionFormat_ZSTD' = LongCount{total=9808205}
    splits generation metrics:
      'dataFileSizeBytes' = LongCount{total=394528676}
      'dataFiles' = LongCount{total=8}
      'dataManifests' = LongCount{total=1}
      'deleteFileSizeBytes' = LongCount{total=2370}
      'deleteManifests' = LongCount{total=1}
      'equalityDeleteFiles' = LongCount{total=0}
      'positionalDeleteFiles' = LongCount{total=2}
      'scanPlanningDuration' = {duration=7.00ms}

Remove SplitOperatorInfo and ConnectorSplit#getSplitInfo. This
avoids the need to send split info from workers to the coordinator.
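
A minimal sketch of how a connector might count things while generating splits and expose them through a `getMetrics`-style method. `SketchSplitSource` and `FileCountingSplitSource` are hypothetical stand-ins written against the JDK only; the real SPI method returns a Trino `Metrics` object rather than a `Map<String, Long>`, and the sketch ignores thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a split source; NOT the Trino SPI interface.
interface SketchSplitSource
{
    List<String> getNextBatch(int maxSize);

    // Sources that track nothing simply report an empty map.
    default Map<String, Long> getMetrics()
    {
        return Map.of();
    }
}

// Hands out one split (here just the file path) per data file and counts as it goes.
final class FileCountingSplitSource
        implements SketchSplitSource
{
    private final List<Map.Entry<String, Long>> files; // path -> size in bytes
    private int position;
    private long dataFiles;
    private long dataFileSizeBytes;

    FileCountingSplitSource(List<Map.Entry<String, Long>> files)
    {
        this.files = List.copyOf(files);
    }

    @Override
    public List<String> getNextBatch(int maxSize)
    {
        int end = Math.min(files.size(), position + maxSize);
        List<String> splits = new ArrayList<>();
        for (Map.Entry<String, Long> file : files.subList(position, end)) {
            splits.add(file.getKey());
            dataFiles++;
            dataFileSizeBytes += file.getValue();
        }
        position = end;
        return splits;
    }

    @Override
    public Map<String, Long> getMetrics()
    {
        // Metric names borrowed from the example output above.
        return Map.of("dataFiles", dataFiles, "dataFileSizeBytes", dataFileSizeBytes);
    }
}

final class SplitMetricsDemo
{
    public static void main(String[] args)
    {
        FileCountingSplitSource source = new FileCountingSplitSource(List.of(
                Map.entry("a.parquet", 100L),
                Map.entry("b.parquet", 250L)));
        source.getNextBatch(10);
        System.out.println(source.getMetrics());
    }
}
```

The default method mirrors the idea that connectors without anything to report need no changes.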

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

## Iceberg
* Show detailed metrics from splits generation in output of EXPLAIN ANALYZE VERBOSE. ({issue}`25770`)
cla-bot added the cla-signed label
github-actions added labels: hudi, iceberg, delta-lake, hive, bigquery, mongodb, cassandra, blackhole, elasticsearch, google-sheets, kafka, memory, opensearch, pinot, prometheus, redis, redshift
raunaqmorarka requested a review from losipiuk 12 days ago
raunaqmorarka removed review request from losipiuk 12 days ago
raunaqmorarka requested a review from wendigo 12 days ago
raunaqmorarka requested a review from losipiuk 12 days ago
raunaqmorarka requested a review from copilot-pull-request-reviewer 12 days ago
raunaqmorarka requested a review from ebyhr 12 days ago
raunaqmorarka requested a review from lukasz-stec 12 days ago
copilot-pull-request-reviewer commented on 2025-05-12 (12 days ago)

Pull Request Overview

This PR adds a new SPI method, ConnectorSplitSource#getMetrics, to capture detailed metrics during splits generation and integrates these metrics into various scheduling and execution components. Key changes include:

  • Propagation of an optional source identifier (sourceId) in OperatorStats and OperatorContext.
  • Replacement of timing callbacks with a new SplitSourceMetricsRecorder across multiple scheduling/execution stages.
  • Removal of SplitOperatorInfo and a shift from custom formatting of split info to using default toString() methods.

Reviewed Changes

Copilot reviewed 91 out of 91 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java | Updated operator context instantiation to include sourceId. |
| core/trino-main/src/main/java/io/trino/operator/OperatorStats.java | Introduced sourceId, added getter, and implemented addConnectorSplitSourceMetrics for merging metrics. |
| core/trino-main/src/main/java/io/trino/operator/OperatorInfo.java | Removed obsolete SplitOperatorInfo mapping. |
| core/trino-main/src/main/java/io/trino/operator/OperatorContext.java | Updated constructor and field to include sourceId. |
| core/trino-main/src/main/java/io/trino/operator/DriverContext.java | Added overloaded addOperatorContext accepting sourceId. |
| core/trino-main/src/main/java/io/trino/metadata/Split.java | Removed getInfo method. |
| core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/* | Changed split time recorder to use metricsRecorder. |
| core/trino-main/src/main/java/io/trino/execution/* | Updated methods to record and propagate split source metrics. |
| core/trino-main/src/main/java/io/trino/connector/* | Removed or updated split info formatting methods. |

Comments suppressed due to low confidence (1)

core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java:897

  • [nitpick] Consider verifying that the default toString() implementation of the split provides sufficient detail for debugging purposes, or implement a custom formatting method if more clarity is required.
return (partitionedSplit == null) ? "" : partitionedSplit.getSplit().toString();
core/trino-main/src/main/java/io/trino/operator/OperatorStats.java
            blockedReason,
            info);
    }

copilot-pull-request-reviewer 12 days ago

[nitpick] Consider adding JavaDoc for the new method addConnectorSplitSourceMetrics to clarify how it merges connector metrics with existing metrics.

Suggested change
/**
* Creates a new {@code OperatorStats} instance with updated connector metrics.
* <p>
* This method merges the provided {@code splitSourceMetrics} with the existing
* {@code connectorMetrics} using the {@code mergeWith} method.
*
* @param splitSourceMetrics the metrics to merge with the existing connector metrics
* @return a new {@code OperatorStats} instance with the updated connector metrics
*/
raunaqmorarka requested a review 12 days ago
raunaqmorarka force pushed from d006c131 to 2035dc47 12 days ago
raunaqmorarka force pushed from 2035dc47 to ffca7c7b 12 days ago
raunaqmorarka force pushed from ffca7c7b to 1902d673 12 days ago
wendigo commented on 2025-05-14
core/trino-main/src/main/java/io/trino/split/SplitSource.java
    /**
     * Returns the split source's metrics, mapping a metric ID to its latest value.
     * Each call must return an immutable snapshot of available metrics.
     * The same ID metrics are merged across all tasks and exposed via OperatorStats.
wendigo 10 days ago

Id

wendigo commented on 2025-05-14
core/trino-main/src/main/java/io/trino/split/SplitSource.java
     * Returns the split source's metrics, mapping a metric ID to its latest value.
     * Each call must return an immutable snapshot of available metrics.
     * The same ID metrics are merged across all tasks and exposed via OperatorStats.
     * This method can be called after the split source is closed.
wendigo 10 days ago (edited 10 days ago)

This method can be called after the split source is closed and in that case last split metrics are returned.

wendigo commented on 2025-05-14
core/trino-main/src/main/java/io/trino/sql/planner/planprinter/TextRenderer.java
            return;
        }
        Metrics metrics = node.getSplitSourceMetrics();
        printMetrics(output, "connector splits generation metrics:", metrics.getMetrics());
wendigo 10 days ago

just: splits generation metrics:

wendigo commented on 2025-05-14
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitSource.java
     * Returns the connector's metrics, mapping a metric ID to its latest value.
     * Each call must return an immutable snapshot of available metrics.
     * The same ID metrics are merged across all tasks and exposed via OperatorStats.
     * This method can be called after the split source is closed.
wendigo 10 days ago

and in that case last split metrics are returned.
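
The contract discussed in these comments (every call returns an immutable snapshot, and the method stays callable after close) could look roughly like the sketch below. `SnapshottingSplitSource` is a hypothetical class, not Trino code, and it uses a plain `Map<String, Long>` in place of Trino's `Metrics` type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical class for illustration; not part of Trino.
final class SnapshottingSplitSource
        implements AutoCloseable
{
    private final Map<String, Long> counters = new HashMap<>();
    private boolean closed;

    synchronized void recordSplit(long sizeBytes)
    {
        if (closed) {
            throw new IllegalStateException("split source is closed");
        }
        counters.merge("splits", 1L, Long::sum);
        counters.merge("splitSizeBytes", sizeBytes, Long::sum);
    }

    // Every call copies the current state, so callers hold an immutable snapshot.
    synchronized Map<String, Long> getMetrics()
    {
        return Map.copyOf(counters);
    }

    @Override
    public synchronized void close()
    {
        // Counters are deliberately kept so getMetrics() still works after close.
        closed = true;
    }
}

final class SnapshotDemo
{
    public static void main(String[] args)
    {
        SnapshottingSplitSource source = new SnapshottingSplitSource();
        source.recordSplit(128);
        Map<String, Long> early = source.getMetrics();
        source.recordSplit(64);
        source.close();
        System.out.println(early);               // earlier snapshot is unchanged
        System.out.println(source.getMetrics()); // last metrics, read after close
    }
}
```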

wendigo commented on 2025-05-14
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
     }

-    private Scan<?, FileScanTask, CombinedScanTask> getScan(IcebergMetadata icebergMetadata, Table icebergTable, IcebergTableHandle table, ExecutorService executor)
+    private Scan<?, FileScanTask, CombinedScanTask> getScan(IcebergMetadata icebergMetadata, Table icebergTable, IcebergTableHandle table, InMemoryMetricsReporter metricsReporter, ExecutorService executor)
wendigo 10 days ago

MetricsReporter (interface)

wendigo commented on 2025-05-14
core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java
    Map<PlanNodeId, Metrics> splitSourceMetrics = stageInfo.getStageStats().getSplitSourceMetrics();
    for (OperatorStats stats : operatorStats) {
        if (stats.getSourceId().isPresent() &&
                (stats.getOperatorType().equals(TableScanOperator.class.getSimpleName()) ||
wendigo 10 days ago

can we extract this condition to some static method ? (isScanOperator)

wendigo 10 days ago

or can we check this unconditionally? If I understand correctly for non-scan operators, splitSourceMetrics.get(stats.getSourceId().get()) will be null
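
The two options raised here can be compared side by side in a small sketch. Everything below is hypothetical (`ScanOperatorCheck`, string plan node IDs, `Long` values standing in for `Metrics`); only the two operator type names come from the diff.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper for illustration; not part of Trino.
final class ScanOperatorCheck
{
    // Simple class names taken from the diff under review.
    private static final Set<String> SCAN_OPERATOR_TYPES =
            Set.of("TableScanOperator", "ScanFilterAndProjectOperator");

    private ScanOperatorCheck() {}

    static boolean isScanOperator(String operatorType)
    {
        return SCAN_OPERATOR_TYPES.contains(operatorType);
    }

    // Option 1: guard the lookup with the extracted predicate.
    static Optional<Long> guardedLookup(String operatorType, Optional<String> sourceId, Map<String, Long> splitSourceMetrics)
    {
        if (!isScanOperator(operatorType)) {
            return Optional.empty();
        }
        return sourceId.map(splitSourceMetrics::get);
    }

    // Option 2: look up unconditionally; Optional.map turns a missing entry into empty.
    static Optional<Long> unconditionalLookup(Optional<String> sourceId, Map<String, Long> splitSourceMetrics)
    {
        return sourceId.map(splitSourceMetrics::get);
    }

    public static void main(String[] args)
    {
        Map<String, Long> metrics = Map.of("1", 8L);
        System.out.println(guardedLookup("TableScanOperator", Optional.of("1"), metrics));
        System.out.println(unconditionalLookup(Optional.of("2"), metrics));
    }
}
```

The two variants agree only if no non-scan operator ever carries a source ID that also has split source metrics; whether that holds in Trino is not established by this thread.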

wendigo commented on 2025-05-14
core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java
                stats.getOperatorType().equals(ScanFilterAndProjectOperator.class.getSimpleName()))) {
            Metrics metrics = splitSourceMetrics.get(stats.getSourceId().get());
            if (metrics != null) {
                stats = stats.addConnectorSplitSourceMetrics(metrics);
wendigo 10 days ago

nit: withConnectorSplitSourceMetrics

wendigo commented on 2025-05-14
core/trino-main/src/main/java/io/trino/operator/OperatorStats.java
            info);
    }

    public OperatorStats addConnectorSplitSourceMetrics(Metrics splitSourceMetrics)
wendigo 10 days ago

it's not add since you are returning a new instance: withConnectorSplitSourceMetrics is more appropriate
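
To illustrate the naming point: a `with...` method conventionally leaves the receiver untouched and returns a modified copy. The sketch below uses a hypothetical `SketchOperatorStats` class and sums `Long` values as a stand-in for however the real `Metrics` objects are merged.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable value class; a simplified stand-in for OperatorStats.
final class SketchOperatorStats
{
    private final String operatorType;
    private final Map<String, Long> connectorMetrics;

    SketchOperatorStats(String operatorType, Map<String, Long> connectorMetrics)
    {
        this.operatorType = operatorType;
        this.connectorMetrics = Map.copyOf(connectorMetrics);
    }

    Map<String, Long> getConnectorMetrics()
    {
        return connectorMetrics;
    }

    // Returns a new instance; "this" is never modified.
    SketchOperatorStats withConnectorSplitSourceMetrics(Map<String, Long> splitSourceMetrics)
    {
        Map<String, Long> merged = new HashMap<>(connectorMetrics);
        // Assumption: metrics with the same name are combined by summing.
        splitSourceMetrics.forEach((name, value) -> merged.merge(name, value, Long::sum));
        return new SketchOperatorStats(operatorType, merged);
    }

    public static void main(String[] args)
    {
        SketchOperatorStats before = new SketchOperatorStats("TableScanOperator", Map.of("rows", 10L));
        SketchOperatorStats after = before.withConnectorSplitSourceMetrics(Map.of("dataFiles", 8L));
        System.out.println(before.getConnectorMetrics());
        System.out.println(after.getConnectorMetrics());
    }
}
```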

wendigo approved these changes on 2025-05-14 (10 days ago)

Overall LGTM

raunaqmorarka Add io.trino.spi.connector.ConnectorSplitSource#getMetrics
650fe697
raunaqmorarka Implement split source metrics for iceberg
521b2900
raunaqmorarka Propagate split source metrics to QueryStats
e6325c33
raunaqmorarka Use split source metrics in redshift tests
1ebb011d
raunaqmorarka Remove SplitOperatorInfo
bc5e7c4e
raunaqmorarka Remove unused method io.trino.operator.OperatorStats#add
5fffde34
raunaqmorarka Deprecate io.trino.spi.connector.ConnectorSplit#getSplitInfo
732cc7b0
raunaqmorarka force pushed from 1902d673 to 732cc7b0 10 days ago
raunaqmorarka requested a review from copilot-pull-request-reviewer 10 days ago
copilot-pull-request-reviewer commented on 2025-05-15 (10 days ago)

Pull Request Overview

This PR adds a new SPI method, ConnectorSplitSource#getMetrics, to allow connectors to expose detailed metrics from splits generation and update various components to record and display these metrics. Key changes include:

  • Injecting an Optional sourceId into operator contexts and stats to support metric recording.
  • Replacing the existing split time recording with a metrics‐based approach throughout scheduling and execution.
  • Removing SplitOperatorInfo and updating split info formatting.

Reviewed Changes

Copilot reviewed 91 out of 91 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| ScanFilterAndProjectOperator.java | Updated operator context creation to pass Optional sourceId. |
| OperatorStats.java | Added sourceId field and withConnectorSplitSourceMetrics method to merge connector split metrics. |
| OperatorInfo.java | Removed deprecated SplitOperatorInfo registration. |
| OperatorContext.java | Injected Optional sourceId into constructors. |
| DriverContext.java | Added overload to support Optional sourceId when adding an operator context. |
| Split.java | Removed getInfo() method to deprecate split info representation. |
| SplitSourceMetricsRecorder.java | Introduced new interface for recording metrics. |
| EventDrivenTaskSourceFactory.java, EventDrivenTaskSource.java | Replaced BiConsumer time recorder with a metrics recorder, updating corresponding references. |
| StageExecution.java, SourcePartitionedScheduler.java, PipelinedStageExecution.java, StageStats.java, StageStateMachine.java, SqlTaskExecution.java, SqlStage.java, QueryStateMachine.java | Updated methods to record and propagate split source metrics. |
| SystemSplit.java, InformationSchemaSplit.java | Changed the split info formatting to rely on toString(). |

Comments suppressed due to low confidence (1)

core/trino-main/src/main/java/io/trino/execution/scheduler/StageStateMachine.java:753

  • The current implementation overwrites split source metrics for a given node; if multiple updates are expected, consider merging the metrics instead of replacing them.
splitSourceMetrics.put(nodeId, metrics);
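
A small sketch of the difference between replacing and merging per-node values. `PlanNodeMetricsStore` is hypothetical and uses a single `long` per plan node for brevity. If each reported value is already a cumulative snapshot (the Javadoc quoted earlier speaks of a metric's "latest value"), replacing is the consistent choice and summing would double count; summing only fits values reported as deltas.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store for illustration; not part of Trino.
final class PlanNodeMetricsStore
{
    private final Map<String, Long> latestByNode = new HashMap<>();
    private final Map<String, Long> summedByNode = new HashMap<>();

    void report(String planNodeId, long value)
    {
        // Replace: keeps only the most recent report for the node.
        latestByNode.put(planNodeId, value);
        // Merge: accumulates every report for the node.
        summedByNode.merge(planNodeId, value, Long::sum);
    }

    long latest(String planNodeId)
    {
        return latestByNode.getOrDefault(planNodeId, 0L);
    }

    long summed(String planNodeId)
    {
        return summedByNode.getOrDefault(planNodeId, 0L);
    }

    public static void main(String[] args)
    {
        PlanNodeMetricsStore store = new PlanNodeMetricsStore();
        store.report("0", 3); // cumulative snapshot after the first batch
        store.report("0", 5); // cumulative snapshot after the second batch
        System.out.println("latest=" + store.latest("0") + " summed=" + store.summed("0"));
    }
}
```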
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
     public String getInfo()
     {
-        return (partitionedSplit == null) ? "" : formatSplitInfo(partitionedSplit.getSplit());
+        return (partitionedSplit == null) ? "" : partitionedSplit.getSplit().toString();
copilot-pull-request-reviewer 10 days ago

[nitpick] Using toString() for split information may reduce readability compared to the previously formatted output; consider preserving or enhancing formatting for clearer diagnostics.

core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSource.java
     long start = System.nanoTime();
     future = Optional.of(new CallbackProxyFuture<>(Futures.transform(splitSource.getNextBatch(splitBatchSize), batch -> {
-        getSplitTimeRecorder.accept(planNodeId, start);
+        metricsRecorder.record(planNodeId, splitSource.getMetrics(), start);
copilot-pull-request-reviewer 10 days ago

[nitpick] Ensure that the behavior of splitSource.getMetrics() is consistent with the intended metrics recording, given that IdempotentSplitSource.getMetrics() currently returns Metrics.EMPTY.
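
For readers following the change from a time-only callback to a metrics recorder, here is a rough sketch of a recorder with the same call shape as `metricsRecorder.record(planNodeId, metrics, start)`. `SketchMetricsRecorder` and `SketchRecorder` are hypothetical; the actual `SplitSourceMetricsRecorder` signature is not shown in this thread, and the injected clock exists only to make the sketch deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical interface modelled on the call site in the diff; not the Trino type.
@FunctionalInterface
interface SketchMetricsRecorder
{
    void record(String planNodeId, Map<String, Long> metrics, long startNanos);
}

final class SketchRecorder
        implements SketchMetricsRecorder
{
    private final LongSupplier nanoClock;
    private final Map<String, Map<String, Long>> latestMetrics = new HashMap<>();
    private final Map<String, Long> getSplitNanos = new HashMap<>();

    SketchRecorder(LongSupplier nanoClock)
    {
        this.nanoClock = nanoClock;
    }

    @Override
    public void record(String planNodeId, Map<String, Long> metrics, long startNanos)
    {
        // Keep the newest snapshot and accumulate time spent fetching split batches.
        latestMetrics.put(planNodeId, Map.copyOf(metrics));
        getSplitNanos.merge(planNodeId, nanoClock.getAsLong() - startNanos, Long::sum);
    }

    Map<String, Long> metricsFor(String planNodeId)
    {
        return latestMetrics.getOrDefault(planNodeId, Map.of());
    }

    long getSplitNanosFor(String planNodeId)
    {
        return getSplitNanos.getOrDefault(planNodeId, 0L);
    }

    public static void main(String[] args)
    {
        SketchRecorder recorder = new SketchRecorder(System::nanoTime);
        long start = System.nanoTime();
        recorder.record("0", Map.of("dataFiles", 8L), start);
        System.out.println(recorder.metricsFor("0"));
    }
}
```

A split source that reports empty metrics, as the comment above describes, would still have its timing recorded in this sketch; only the metrics map for that plan node would be empty.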

raunaqmorarka merged d45b32e6 into master 9 days ago
raunaqmorarka deleted the split-source-metrics branch 9 days ago
github-actions added this to the 476 milestone 9 days ago
