Add per catalog metastore metrics to QueryStats #26900
Reviewer's Guide
This PR extends the metadata API to collect and expose per-catalog metastore call metrics in QueryStats and QueryInfo. It adds SPI methods for listing active catalogs and fetching connector metrics, wraps Hive metastore calls with timing and failure counting, integrates metrics capture into QueryStateMachine on query completion or failure, and updates connectors and tests to support and validate the new metrics field.
Class diagram for MeasuredHiveMetastore and metastore metrics integration
classDiagram
class MeasuredHiveMetastore {
-HiveMetastore delegate
-MetastoreApiCallStats allApiCallsStats
-Map<String, MetastoreApiCallStats> apiCallStats
-Ticker ticker
+Metrics getMetrics()
+<all HiveMetastore methods> (wrapped)
}
class MetastoreApiCallStats {
-TDigest timeNanosDistribution
-long totalTimeNanos
-long totalFailures
+addTime(long)
+addFailure()
+put(ImmutableMap.Builder<String, Metric<?>>, String)
}
class MeasuredMetastoreFactory {
-HiveMetastoreFactory metastoreFactory
+createMetastore(Optional<ConnectorIdentity>)
+isImpersonationEnabled()
}
MeasuredHiveMetastore --> HiveMetastore : delegates
MeasuredHiveMetastore --> MetastoreApiCallStats : uses
MeasuredMetastoreFactory --> MeasuredHiveMetastore : creates
MeasuredMetastoreFactory --> HiveMetastoreFactory : delegates
class HiveMetastore {
<<interface>>
+getMetrics() : Metrics
+<other methods>
}
MeasuredHiveMetastore ..|> HiveMetastore
class Metrics {
+Map<String, Metric<?>> metrics
}
MeasuredHiveMetastore --> Metrics : returns
MetastoreApiCallStats --> Metric : builds
class Metric {
<<interface>>
}
Class diagram for QueryStats and catalogMetadataMetrics field
classDiagram
class QueryStats {
+Map<String, Metrics> catalogMetadataMetrics
+getCatalogMetadataMetrics()
+<other fields and methods>
}
QueryStats --> Metrics : contains
class Metrics {
+Map<String, Metric<?>> metrics
}
Metrics --> Metric : contains
class Metric {
<<interface>>
}
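The wrap-and-measure pattern described in the Reviewer's Guide (timing each delegated call and counting failures, both per method and across all calls) can be sketched as follows. This is a minimal illustration, not the PR's code: `CallStats` and `CallMeasurer` are hypothetical stand-ins for `MetastoreApiCallStats` and `MeasuredHiveMetastore`, the clock is a plain `LongSupplier` rather than a Guava `Ticker`, and the latency distribution (the `TDigest` in the diagram) is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-in for MetastoreApiCallStats: total time and failure count only.
final class CallStats
{
    private long totalTimeNanos;
    private long totalFailures;

    synchronized void addTime(long nanos) { totalTimeNanos += nanos; }
    synchronized void addFailure() { totalFailures++; }
    synchronized long totalTimeNanos() { return totalTimeNanos; }
    synchronized long totalFailures() { return totalFailures; }
}

// Hypothetical helper: records stats per method name and for all calls together.
final class CallMeasurer
{
    private final CallStats allCalls = new CallStats();
    private final Map<String, CallStats> perMethod = new ConcurrentHashMap<>();
    private final LongSupplier nanoClock;

    CallMeasurer(LongSupplier nanoClock)
    {
        this.nanoClock = nanoClock;
    }

    <T> T measure(String methodName, Supplier<T> call)
    {
        CallStats stats = perMethod.computeIfAbsent(methodName, name -> new CallStats());
        long start = nanoClock.getAsLong();
        try {
            return call.get();
        }
        catch (RuntimeException e) {
            // Count the failure, then let the caller see the original exception
            stats.addFailure();
            allCalls.addFailure();
            throw e;
        }
        finally {
            // Elapsed time is recorded for successful and failed calls alike
            long elapsed = nanoClock.getAsLong() - start;
            stats.addTime(elapsed);
            allCalls.addTime(elapsed);
        }
    }

    CallStats statsFor(String methodName) { return perMethod.get(methodName); }
    CallStats allCalls() { return allCalls; }
}
```

A wrapper method would then look like `measurer.measure("getTable", () -> delegate.getTable(name))`, with `System::nanoTime` as the clock outside of tests.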
Hey there - I've reviewed your changes - here's some feedback:
- Consolidate the duplicated assertCountMetricExists/assertDistributionMetricExists helpers in BaseHiveConnectorTest and BaseIcebergConnectorTest into a shared test utility to reduce code duplication.
- Centralize the collectCatalogMetadataMetrics invocation in QueryStateMachine (rather than calling it separately in both transitionToFinishing and transitionToFailed) to DRY up the code and ensure consistency.
- Consider refactoring the very large MeasuredHiveMetastore class by extracting the stats‐collection logic into smaller, focused components or utility classes to improve readability and maintainability.
## Individual Comments
### Comment 1
<location> `plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java:9225-9226` </location>
<code_context>
assertQuerySucceeds("CALL system.flush_metadata_cache()");
}
+ @Test
+ public void testCatalogMetadataMetrics()
+ {
+ MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(
</code_context>
<issue_to_address>
**suggestion (testing):** Missing test for metrics with multiple catalogs.
Please add a test that runs a query across multiple catalogs to ensure metrics are tracked separately for each.
Suggested implementation:
```java
@Test
public void testCatalogMetadataMetrics()
{
MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(
getSession(),
"SELECT count(*) FROM region r, nation n WHERE r.regionkey = n.regionkey");
Map<String, Metrics> metrics = getCatalogMetadataMetrics(result.queryId());
assertCountMetricExists(metrics, "iceberg", "metastore.all.time.total");
assertDistributionMetricExists(metrics, "iceberg", "metastore.all.time.distribution");
assertCountMetricExists(metrics, "iceberg", "metastore.getTable.time.total");
assertDistributionMetricExists(metrics, "iceberg", "metastore.getTable.time.distribution");
}
@Test
public void testCatalogMetadataMetricsWithMultipleCatalogs()
{
// Assume "iceberg" and "tpch" catalogs are available for testing
MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(
getSession(),
"SELECT count(*) FROM iceberg.region r JOIN tpch.nation n ON r.regionkey = n.regionkey");
Map<String, Metrics> metrics = getCatalogMetadataMetrics(result.queryId());
// Assert metrics for iceberg catalog
assertCountMetricExists(metrics, "iceberg", "metastore.all.time.total");
assertDistributionMetricExists(metrics, "iceberg", "metastore.all.time.distribution");
assertCountMetricExists(metrics, "iceberg", "metastore.getTable.time.total");
assertDistributionMetricExists(metrics, "iceberg", "metastore.getTable.time.distribution");
// Assert metrics for tpch catalog (replace with actual metric names if different)
assertCountMetricExists(metrics, "tpch", "metastore.all.time.total");
assertDistributionMetricExists(metrics, "tpch", "metastore.all.time.distribution");
assertCountMetricExists(metrics, "tpch", "metastore.getTable.time.total");
assertDistributionMetricExists(metrics, "tpch", "metastore.getTable.time.distribution");
}
```
- If the "tpch" catalog does not support the same metrics, adjust the metric names or assertions accordingly.
- Ensure that the catalogs "iceberg" and "tpch" are available and configured in your test environment.
- If you use different catalogs, update the catalog names in the test.
</issue_to_address>
There are related CI failures. Moving to draft until I fix them.
57c65e9
to
2b8cd5c
Compare
2b8cd5c
to
0c78e13
Compare
a120650
to
75ab2b8
Compare
Hey there - I've reviewed your changes - here's some feedback:
- The MeasuredHiveMetastore class manually wraps every HiveMetastore method, which leads to a lot of boilerplate; consider using a dynamic proxy or an abstract base wrapper to automatically instrument all methods and reduce duplication.
- The Hive and Iceberg connector tests duplicate the same metric‐assertion logic; extracting the `assertCountMetricExists` and `assertDistributionMetricExists` helpers into a shared base test would DRY up the code and centralize metric validation.
- Since connectors now rely on the new getMetrics/listActiveCatalogs SPI methods, add a quick check or lint to ensure every connector overrides these (or explicitly opts out) so no catalog is left without metrics by accident.
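The dynamic-proxy idea from the first comment above can be sketched with `java.lang.reflect.Proxy`. This is only an illustration under stated assumptions: `TableStore` is a hypothetical one-method stand-in for `HiveMetastore`, `MeasuringProxy` is an invented name, and plain `LongAdder` counters replace the PR's `MetastoreApiCallStats`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, heavily simplified stand-in for the HiveMetastore interface.
interface TableStore
{
    String getTable(String name);
}

final class MeasuringProxy
{
    private MeasuringProxy() {}

    // Wraps any implementation of an interface so that elapsed nanoseconds and
    // failures are recorded per method name, without hand-written wrappers.
    static <T> T wrap(Class<T> type, T delegate, Map<String, LongAdder> timeNanos, Map<String, LongAdder> failures)
    {
        InvocationHandler handler = (proxy, method, args) -> {
            long start = System.nanoTime();
            try {
                return method.invoke(delegate, args);
            }
            catch (InvocationTargetException e) {
                failures.computeIfAbsent(method.getName(), name -> new LongAdder()).increment();
                // Rethrow the delegate's own exception, not the reflective wrapper
                throw e.getCause();
            }
            finally {
                timeNanos.computeIfAbsent(method.getName(), name -> new LongAdder())
                        .add(System.nanoTime() - start);
            }
        };
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler));
    }
}
```

The trade-off is that reflection hides the per-method wiring: overloaded methods share one metric name here, and each call pays a small reflective overhead, which may or may not matter next to a remote metastore round trip.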
## Individual Comments
### Comment 1
<location> `core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java:392-390` </location>
<code_context>
return queryStateMachine;
}
+ private void collectCatalogMetadataMetrics()
+ {
+ // collect the metrics only once. This avoid issue with transaction being removed
+ // after the check but before the metrics collection
+ if (catalogMetadataMetricsCollected.compareAndSet(false, true)) {
+ if (session.getTransactionId().filter(transactionManager::transactionExists).isEmpty()) {
+ // The metrics collection depends on active transaction as the metrics
+ // are stored in the transactional ConnectorMetadata, but the collection can be
+ // run after the query has failed e.g., via cancel.
+ return;
+ }
+
+ ImmutableMap.Builder<String, Metrics> catalogMetadataMetrics = ImmutableMap.builder();
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider handling exceptions during metrics collection to avoid masking query state transitions.
Unexpected exceptions during metrics collection may interrupt query state transitions. Wrapping this logic in a try-catch and logging errors will improve robustness, especially during transitions like finishing or failure.
Suggested implementation:
```java
private void collectCatalogMetadataMetrics()
{
// collect the metrics only once. This avoid issue with transaction being removed
// after the check but before the metrics collection
if (catalogMetadataMetricsCollected.compareAndSet(false, true)) {
if (session.getTransactionId().filter(transactionManager::transactionExists).isEmpty()) {
// The metrics collection depends on active transaction as the metrics
// are stored in the transactional ConnectorMetadata, but the collection can be
// run after the query has failed e.g., via cancel.
return;
}
try {
ImmutableMap.Builder<String, Metrics> catalogMetadataMetrics = ImmutableMap.builder();
for (CatalogInfo activeCatalog : metadata.listActiveCatalogs(session)) {
Metrics metrics = metadata.getMetrics(session, activeCatalog.catalogName());
if (!metrics.getMetrics().isEmpty()) {
catalogMetadataMetrics.put(activeCatalog.catalogName(), metrics);
}
}
this.catalogMetadataMetrics.set(catalogMetadataMetrics.buildOrThrow());
}
catch (Exception e) {
log.error(e, "Error collecting catalog metadata metrics for query %s", queryId);
}
}
}
```
If the logger (`log`) is not already defined in this class, you should add:
```java
private static final Logger log = Logger.get(QueryStateMachine.class);
```
at the top of the class, with the appropriate import:
```java
import io.airlift.log.Logger;
```
</issue_to_address>
### Comment 2
<location> `lib/trino-metastore/src/main/java/io/trino/metastore/MeasuredHiveMetastore.java:46-54` </location>
<code_context>
+ private final HiveMetastore delegate;
+ private final MetastoreApiCallStats allApiCallsStats = new MetastoreApiCallStats();
+ private final Map<String, MetastoreApiCallStats> apiCallStats = new ConcurrentHashMap<>();
+ private final Ticker ticker = Ticker.systemTicker();
+
+ public MeasuredHiveMetastore(HiveMetastore delegate)
</code_context>
<issue_to_address>
**suggestion:** Consider allowing ticker injection for testability.
Injecting the ticker through the constructor would make it easier to test time-dependent logic.
```suggestion
private final HiveMetastore delegate;
private final MetastoreApiCallStats allApiCallsStats = new MetastoreApiCallStats();
private final Map<String, MetastoreApiCallStats> apiCallStats = new ConcurrentHashMap<>();
private final Ticker ticker;
public MeasuredHiveMetastore(HiveMetastore delegate)
{
this(delegate, Ticker.systemTicker());
}
public MeasuredHiveMetastore(HiveMetastore delegate, Ticker ticker)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.ticker = requireNonNull(ticker, "ticker is null");
}
```
</issue_to_address>
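To illustrate why constructor injection of the ticker helps testing, here is a small self-contained sketch. The names `NanoTicker`, `ManualTicker`, and `TimedCounter` are hypothetical; the PR itself uses Guava's `Ticker`, which is replaced here by a one-method interface so the example has no dependencies.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal ticker: returns a nanosecond reading.
interface NanoTicker
{
    long read();
}

// Test ticker whose time moves only when advance() is called.
final class ManualTicker
        implements NanoTicker
{
    private final AtomicLong nanos = new AtomicLong();

    @Override
    public long read()
    {
        return nanos.get();
    }

    void advance(long deltaNanos)
    {
        nanos.addAndGet(deltaNanos);
    }
}

// Hypothetical timed component that receives its ticker through the constructor.
final class TimedCounter
{
    private final NanoTicker ticker;
    private long totalTimeNanos;

    TimedCounter(NanoTicker ticker)
    {
        this.ticker = Objects.requireNonNull(ticker, "ticker is null");
    }

    void time(Runnable action)
    {
        long start = ticker.read();
        try {
            action.run();
        }
        finally {
            totalTimeNanos += ticker.read() - start;
        }
    }

    long totalTimeNanos()
    {
        return totalTimeNanos;
    }
}
```

With a manual ticker, a test can assert exact durations (for example, two timed actions that advance the ticker by 250 and 750 nanoseconds total exactly 1000) instead of relying on wall-clock sleeps.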
@findepi @raunaqmorarka This is ready for review. There is one CI failure, but it is unrelated.
The `test (plugin/trino-lakehouse)` job hung. It contains a couple of errors like this
and then
Are they related?
75ab2b8
to
15f1657
Compare
Yes, the impl for the |
core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java
// collect the metrics only once. This avoid issue with transaction being removed
// after the check but before the metrics collection
if (catalogMetadataMetricsCollected.compareAndSet(false, true)) {
if (session.getTransactionId().filter(transactionManager::transactionExists).isEmpty()) {
In theory this is racy. A check in the TM for a transaction on this line does not guarantee that the TM still knows about the transaction during `listActiveCatalogs`. For example, the current query may have timed out, with cleanup happening asynchronously.
Perhaps instead we could validate the transaction only once?
List<CatalogInfo> activeCatalogs;
try {
activeCatalogs = metadata.listActiveCatalogs(session);
} catch (NotInTransactionException e) {
// explanation
return;
}
That won't work, unfortunately, as we also need the transaction to access the `ConnectorMetadata` that keeps the metrics.
I moved the invocation of `collectCatalogMetadataMetrics()` to places that only a single thread can reach before the transaction is committed or aborted.
Do we need this `if`?
I realized there is already a `catch (NotInTransactionExcept` below.
I just added the `catch (NotInTransactionExcept` yesterday; it was not there before. Now the `if` is not strictly necessary, but this method may be called without a running transaction, and that is not an exceptional flow (e.g., a `START TRANSACTION` statement does not itself have a transaction, or an async `updateQueryInfo` can happen after the query is finished), so I thought I would leave it in place.
I'm fine dropping it as well.
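The check-then-act concern in this thread can be illustrated with a small sketch. Everything here is hypothetical and greatly simplified: `TxRegistry` stands in for the transaction manager plus transactional metadata, and `NoSuchTransactionException` stands in for `NotInTransactionException`. The point is only that handling the exception at the point of use leaves no window between an existence check and the lookup.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for NotInTransactionException.
final class NoSuchTransactionException
        extends RuntimeException
{
    NoSuchTransactionException(String id)
    {
        super("Unknown transaction: " + id);
    }
}

// Hypothetical, greatly simplified transaction registry.
final class TxRegistry
{
    private final Set<String> active = ConcurrentHashMap.newKeySet();

    void begin(String id) { active.add(id); }
    void remove(String id) { active.remove(id); }

    // Throws when the transaction is gone, as a transactional lookup would.
    String metricsFor(String id)
    {
        if (!active.contains(id)) {
            throw new NoSuchTransactionException(id);
        }
        return "metrics-of-" + id;
    }
}

final class MetricsCollector
{
    private MetricsCollector() {}

    // No up-front existence check: the lookup is the only place that can observe
    // a missing transaction, so a concurrent cleanup cannot slip in between.
    static Optional<String> collect(TxRegistry registry, String transactionId)
    {
        try {
            return Optional.of(registry.metricsFor(transactionId));
        }
        catch (NoSuchTransactionException e) {
            // Expected when the transaction was already cleaned up, e.g. after a cancel
            return Optional.empty();
        }
    }
}
```

Keeping an extra `if` in front of the `try`, as discussed above, does not hurt correctness; it only avoids using an exception for a flow the author considers normal.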
{
queryStateTimer.endQuery();

collectCatalogMetadataMetrics();
Especially in the case of query failure, we may want to be defensive and suppress exceptions coming from here. The easiest way would be to put the metrics collection code inside `cleanupQuery`.
Are there any ordering requirements between `metadata.cleanupQuery` and `metadata.getMetrics`?
There should be no exception thrown here except for a bug. IMO, it is better to fail the query in that case, but since this is connector-specific, I understand the urge to be defensive here. I will add a try-catch.
There is no direct ordering requirement with `cleanupQuery`, but `cleanupQuery` can be executed by multiple threads, so it is not a good place for `collectCatalogMetadataMetrics` unless I move it after the
QueryState oldState = queryState.trySet(FAILED);
if (oldState.isDone()) {
if (log) {
QUERY_STATE_LOG.debug(throwable, "Failure after query %s finished", queryId);
}
return false;
}
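The reasoning above (only the thread that wins the state transition may proceed, so work placed after that check runs once) can be sketched with a compare-and-set. This is an assumption-laden illustration, not `QueryStateMachine`: `OneShotTransition` and its three states are hypothetical, and the "metrics collection" is just a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified state holder; not the real QueryStateMachine.
final class OneShotTransition
{
    enum State { RUNNING, FINISHED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final AtomicInteger collections = new AtomicInteger();

    // Returns true only for the single caller that moves the state out of RUNNING.
    boolean transitionTo(State terminal)
    {
        if (!state.compareAndSet(State.RUNNING, terminal)) {
            // Another thread already finished or failed the query
            return false;
        }
        // Reached by exactly one thread, so no extra "collected" flag is needed here
        collectMetrics();
        return true;
    }

    private void collectMetrics()
    {
        collections.incrementAndGet();
    }

    int collections() { return collections.get(); }
    State state() { return state.get(); }
}
```

Any number of later or concurrent calls to `transitionTo` leave the state and the collection count unchanged, which is the property the comment relies on.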
}

public synchronized void put(ImmutableMap.Builder<String, Metric<?>> metrics, String prefix)
{
nit
DistributionSnapshot distributionSnapshot;
long totalTimeNanos;
long totalFailures;
synchronized (this) {
// DistributionSnapshot does not retain reference to the histogram
distributionSnapshot = new DistributionSnapshot(new TDigestHistogram(timeNanosDistribution));
totalTimeNanos = this.totalTimeNanos;
totalFailures = this.totalFailures;
}
Perhaps this is not needed, because collecting metrics will generally not coincide with metastore calls.
Feel free to dismiss, but then add an explanatory comment.
I'm sorry, I don't get it. What is this change about?
reducing synchronization scope
I will leave it as is: `ImmutableMap.Builder.put` is pretty simple, so the synchronization scope would not be reduced much, and there should not be much contention here anyway, because in normal operation the metrics updates and the metrics collection happen at different times.
I added a comment about `DistributionSnapshot`.
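For readers following the "reducing synchronization scope" exchange, the two shapes differ as sketched below. The class `NarrowLockStats` is hypothetical, a plain `Map<String, Long>` replaces the `ImmutableMap.Builder<String, Metric<?>>`, the distribution is omitted, and the `.failures` metric name is an assumption (only the `.time.total` and `.time.distribution` suffixes appear in the tests quoted earlier).

```java
import java.util.Map;

// Hypothetical stats holder showing the narrower lock suggested in the review.
final class NarrowLockStats
{
    private long totalTimeNanos;
    private long totalFailures;

    synchronized void addTime(long nanos) { totalTimeNanos += nanos; }
    synchronized void addFailure() { totalFailures++; }

    // Copies the fields while holding the lock, then publishes them without it.
    void put(Map<String, Long> metrics, String prefix)
    {
        long time;
        long failures;
        synchronized (this) {
            time = totalTimeNanos;
            failures = totalFailures;
        }
        // The caller-supplied map is only touched outside the critical section
        metrics.put(prefix + ".time.total", time);
        metrics.put(prefix + ".failures", failures); // metric name is an assumption
    }
}
```

As the reply notes, the gain is small when the work outside the lock is just a couple of map insertions, so keeping the whole method `synchronized` is a defensible choice.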
15f1657
to
c3d6dcb
Compare
Metadata and QueryStateMachine must use the same `TransactionManager` instance.
c3d6dcb
to
8be4339
Compare
Thanks for the review @findepi ! I addressed the comments
8be4339
to
879926b
Compare
Missed some comments
879926b
to
19f603d
Compare
a4aa981
to
a5d9fe1
Compare
@findepi @raunaqmorarka There were issues with collecting the metadata metrics concurrently with updating the final query info. The query info update could be triggered asynchronously and thus miss the collected metrics. To fix this, I added a metadata metrics collection before every query stats collection. This also makes the metadata metrics available before the query is done, which is an additional benefit.
this.catalogMetadataMetrics.set(catalogMetadataMetrics.buildOrThrow());
}
catch (NotInTransactionException e) {
// ignore
Explain why this should be ignored, i.e., what the legitimate cases are in which this can happen.
comment added
The goal is to expose in QueryStats, per catalog, connector-specific metrics like metastore api call stats.
a44749e
to
c0fddfe
Compare
c0fddfe
to
10d41ae
Compare
Description
A slow metastore can be a root cause of slow analysis or planning. This adds explicit metrics to `QueryStats` with stats for the remote metastore calls made for a given query. This is what it looks like in the query.json:
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( X) Release notes are required, with the following suggested text:
Summary by Sourcery
Add per-catalog metastore metrics to QueryStats by extending the Metadata API and collecting metrics from each active catalog on query completion or failure, instrument connectors with MeasuredHiveMetastore, and verify behavior with new connector tests.
Summary by Sourcery
Add per-catalog metastore metrics tracking to QueryStats by instrumenting connectors and extending Metadata APIs to gather and report connector-specific metadata call metrics on query finish or failure.