Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,18 +323,42 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS)
private int metricAnalysisTimeout;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_URI,
DefaultValue = EMPTY_STRING)
private String metricUri;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_NAME,
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_ACCOUNT_NAME,
DefaultValue = EMPTY_STRING)
private String metricAccount;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_KEY,
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_ACCOUNT_KEY,
DefaultValue = EMPTY_STRING)
private String metricAccountKey;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_COLLECTION_ENABLED,
DefaultValue = DEFAULT_METRICS_COLLECTION_ENABLED)
private boolean metricsCollectionEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_SHOULD_EMIT_ON_IDLE_TIME,
DefaultValue = DEFAULT_METRICS_SHOULD_EMIT_ON_IDLE_TIME)
private boolean shouldEmitMetricsOnIdleTime;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_EMIT_THRESHOLD,
DefaultValue = DEFAULT_METRICS_EMIT_THRESHOLD)
private long metricsEmitThreshold;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_EMIT_THRESHOLD_INTERVAL_SECS,
DefaultValue = DEFAULT_METRICS_EMIT_THRESHOLD_INTERVAL_SECS)
private long metricsEmitThresholdIntervalInSecs;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_EMIT_INTERVAL_MINS,
DefaultValue = DEFAULT_METRICS_EMIT_INTERVAL_MINS)
private long metricsEmitIntervalInMins;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_MAX_CALLS_PER_SECOND,
DefaultValue = DEFAULT_METRICS_MAX_CALLS_PER_SECOND)
private int maxMetricsCallsPerSecond;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRICS_BACKOFF_RETRY_ENABLED,
DefaultValue = DEFAULT_METRICS_BACKOFF_RETRY_ENABLED)
private boolean backoffRetryMetricsEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;
Expand Down Expand Up @@ -1294,10 +1318,6 @@ public int getMetricAnalysisTimeout() {
return this.metricAnalysisTimeout;
}

public String getMetricUri() {
return metricUri;
}

public String getMetricAccount() {
return metricAccount;
}
Expand All @@ -1306,6 +1326,34 @@ public String getMetricAccountKey() {
return metricAccountKey;
}

public boolean isMetricsCollectionEnabled() {
return metricsCollectionEnabled;
}

public boolean shouldEmitMetricsOnIdleTime() {
return shouldEmitMetricsOnIdleTime;
}

public long getMetricsEmitThreshold() {
return metricsEmitThreshold;
}

public long getMetricsEmitIntervalInMins() {
return metricsEmitIntervalInMins;
}

public long getMetricsEmitThresholdIntervalInSecs() {
return metricsEmitThresholdIntervalInSecs;
}

public int getMaxMetricsCallsPerSecond() {
return maxMetricsCallsPerSecond;
}

public boolean isBackoffRetryMetricsEnabled() {
return backoffRetryMetricsEnabled;
}

public int getAccountOperationIdleTimeout() {
return accountOperationIdleTimeout;
}
Expand Down Expand Up @@ -1394,7 +1442,7 @@ public TracingHeaderFormat getTracingHeaderFormat() {
}

public MetricFormat getMetricFormat() {
return getEnum(FS_AZURE_METRIC_FORMAT, MetricFormat.EMPTY);
return getEnum(FS_AZURE_METRICS_FORMAT, MetricFormat.INTERNAL_METRIC_FORMAT);
}

public AuthType getAuthType(String accountName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SERVER_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.WRITE_THROTTLES;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.util.Time.now;
Expand Down Expand Up @@ -201,23 +202,39 @@ public void initializeWriteResourceUtilizationMetrics() {


@Override
public void initializeMetrics(MetricFormat metricFormat) {
public void initializeMetrics(final MetricFormat metricFormat,
final AbfsConfiguration abfsConfiguration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing AbfsConfiguration here, better to only pass isRetryMetricsEnabled.
Whole object is not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for sending the entire AbfsConfiguration was that we may need additional configuration values in the future; in that case, we can fetch them directly without having to change this again.

switch (metricFormat) {
case INTERNAL_BACKOFF_METRIC_FORMAT:
abfsBackoffMetrics = new AbfsBackoffMetrics();
break;
case INTERNAL_FOOTER_METRIC_FORMAT:
abfsReadFooterMetrics = new AbfsReadFooterMetrics();
break;
case INTERNAL_METRIC_FORMAT:
abfsBackoffMetrics = new AbfsBackoffMetrics();
abfsReadFooterMetrics = new AbfsReadFooterMetrics();
break;
default:
break;
case INTERNAL_BACKOFF_METRIC_FORMAT:
abfsBackoffMetrics = new AbfsBackoffMetrics(
abfsConfiguration.isBackoffRetryMetricsEnabled());
break;
case INTERNAL_FOOTER_METRIC_FORMAT:
initializeReadFooterMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break missing here

break;
case INTERNAL_METRIC_FORMAT:
abfsBackoffMetrics = new AbfsBackoffMetrics(
abfsConfiguration.isBackoffRetryMetricsEnabled());
initializeReadFooterMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do sm like
abfsReadFooterMetrics = new AbfsReadFooterMetrics(
abfsReadFooterMetrics == null ? null : abfsReadFooterMetrics.getFileTypeMetricsMap()
);

we would have a single constructor for AbfsReadFooterMetrics then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even then we may need two constructors as it is getting used in some test cases. But as you suggest I can simplify this if else statement in initializeReadFooterMetrics method.

break;
default:
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break is redundant

Copy link
Contributor Author

@bhattmanish98 bhattmanish98 Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is as per best practice to keep break statement with default case.

}
}

/**
* Initialize the read footer metrics.
* In case the metrics are already initialized,
* create a new instance with the existing map.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create a new instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the collected metrics are pushed to the backend, the counters are refreshed by creating new instances of the backoff and read-footer metrics rather than clearing each counter individually. The old instances are automatically garbage-collected.

*/
private void initializeReadFooterMetrics() {
abfsReadFooterMetrics = new AbfsReadFooterMetrics(
abfsReadFooterMetrics == null
? null
: abfsReadFooterMetrics.getFileTypeMetricsMap()
);
}

/**
* Look up a Metric from registered set.
*
Expand Down Expand Up @@ -373,10 +390,9 @@ public DurationTracker trackDuration(String key) {

@Override
public String toString() {
String metric = "";
String metric = EMPTY_STRING;
if (abfsBackoffMetrics != null) {
long totalNoRequests = getAbfsBackoffMetrics().getMetricValue(TOTAL_NUMBER_OF_REQUESTS);
if (totalNoRequests > 0) {
if (getAbfsBackoffMetrics().getMetricValue(TOTAL_NUMBER_OF_REQUESTS) > 0) {
metric += "#BO:" + getAbfsBackoffMetrics().toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public void initialize(URI uri, Configuration configuration)
.withBlockFactory(blockFactory)
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withBackReference(new BackReference(this))
.withFileSystemId(this.fileSystemId)
.build();

this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
Expand Down Expand Up @@ -841,18 +842,6 @@ public synchronized void close() throws IOException {
if (isClosed()) {
return;
}
if (getAbfsStore().getClient().isMetricCollectionEnabled()) {
TracingContext tracingMetricContext = new TracingContext(
clientCorrelationId,
fileSystemId, FSOperationType.GET_ATTR, true,
tracingHeaderFormat,
listener, abfsCounters.toString());
try {
getAbfsClient().getMetricCall(tracingMetricContext);
} catch (IOException e) {
throw new IOException(e);
}
}
// does all the delete-on-exit calls, and may be slow.
super.close();
LOG.debug("AzureBlobFileSystem.close");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private final IdentityTransformerInterface identityTransformer;
private final AbfsPerfTracker abfsPerfTracker;
private final AbfsCounters abfsCounters;
private final String fileSystemId;

/**
* The set of directories where we should store files as append blobs.
Expand Down Expand Up @@ -258,6 +259,7 @@ public AzureBlobFileSystemStore(
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme;
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
this.abfsCounters = abfsStoreBuilder.abfsCounters;
this.fileSystemId = abfsStoreBuilder.fileSystemId;
initializeClient(uri, fileSystemName, accountName, useHttps);
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
Expand Down Expand Up @@ -1828,6 +1830,7 @@ private AbfsClientContext populateAbfsClientContext() {
new TailLatencyRequestTimeoutRetryPolicy(abfsConfiguration))
.withAbfsCounters(abfsCounters)
.withAbfsPerfTracker(abfsPerfTracker)
.withFileSystemId(fileSystemId)
.build();
}

Expand Down Expand Up @@ -1979,6 +1982,7 @@ public static final class AzureBlobFileSystemStoreBuilder {
private DataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
private BackReference fsBackRef;
private String fileSystemId;

public AzureBlobFileSystemStoreBuilder withUri(URI value) {
this.uri = value;
Expand Down Expand Up @@ -2020,6 +2024,11 @@ public AzureBlobFileSystemStoreBuilder withBackReference(
return this;
}

public AzureBlobFileSystemStoreBuilder withFileSystemId(String fileSystemId) {
this.fileSystemId = fileSystemId;
return this;
}

public AzureBlobFileSystemStoreBuilder build() {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public final class AbfsHttpConstants {
public static final String HASH = "#";
public static final String TRUE = "true";
public static final String ZERO = "0";
public static final String UNDERSCORE = "_";
public static final String OPENING_SQUARE_BRACKET = "[";
public static final String CLOSING_SQUARE_BRACKET = "]";

public static final String PLUS_ENCODE = "%20";
public static final String FORWARD_SLASH_ENCODE = "%2F";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.azurebfs.constants;

import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -69,9 +70,59 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_METRIC_ACCOUNT_NAME = "fs.azure.metric.account.name";
public static final String FS_AZURE_METRIC_ACCOUNT_KEY = "fs.azure.metric.account.key";
public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";

/**
* Config to set separate metrics account in case user don't want to use
* existing storage account for metrics collection.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_ACCOUNT_NAME = "fs.azure.metrics.account.name";
/**
* Config to set metrics account key for @FS_AZURE_METRICS_ACCOUNT_NAME.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_ACCOUNT_KEY = "fs.azure.metrics.account.key";
/**
* Config to set metrics format. Possible values are {@link MetricFormat}
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_FORMAT = "fs.azure.metrics.format";
/**
* Config to enable or disable metrics collection.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_COLLECTION_ENABLED = "fs.azure.metrics.collection.enabled";
/**
* Config to enable or disable emitting metrics when idle time exceeds threshold.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_SHOULD_EMIT_ON_IDLE_TIME = "fs.azure.metrics.should.emit.on.idle.time";
/**
* Config to set threshold for emitting metrics when number of operations exceeds threshold.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_EMIT_THRESHOLD = "fs.azure.metrics.emit.threshold";
/**
* Config to set interval in seconds to check for threshold breach for emitting metrics.
* If the number of operations exceed threshold within this interval, metrics will be emitted.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_EMIT_THRESHOLD_INTERVAL_SECS = "fs.azure.metrics.emit.threshold.interval.secs";
/**
* Config to set interval in minutes for emitting metrics in regular time intervals.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_EMIT_INTERVAL_MINS = "fs.azure.metrics.emit.interval.mins";
/**
* Config to set maximum metrics calls per second.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_MAX_CALLS_PER_SECOND = "fs.azure.metrics.max.calls.per.second";
/**
* Config to enable or disable backoff retry metrics collection.
* Value: {@value}.
*/
public static final String FS_AZURE_METRICS_BACKOFF_RETRY_ENABLED = "fs.azure.metrics.backoff.retry.enabled";

public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
Expand Down Expand Up @@ -231,7 +282,6 @@ public final class ConfigurationKeys {
* character constraints are not satisfied. **/
public static final String FS_AZURE_CLIENT_CORRELATIONID = "fs.azure.client.correlationid";
public static final String FS_AZURE_TRACINGHEADER_FORMAT = "fs.azure.tracingheader.format";
public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format";
public static final String FS_AZURE_CLUSTER_NAME = "fs.azure.cluster.name";
public static final String FS_AZURE_CLUSTER_TYPE = "fs.azure.cluster.type";
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = false;
public static final int DEFAULT_METRIC_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS = 60_000;
public static final boolean DEFAULT_METRICS_COLLECTION_ENABLED = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to keep it enabled by default for all Cxs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this feature will be enabled by default.

public static final boolean DEFAULT_METRICS_SHOULD_EMIT_ON_IDLE_TIME = false;
public static final long DEFAULT_METRICS_EMIT_THRESHOLD = 100_000L;
public static final long DEFAULT_METRICS_EMIT_THRESHOLD_INTERVAL_SECS = 60;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Genrally names here also contains FS. Format was exact config variable name prefixed with DEFAULT_

I know this isn't followed everywhere but let's try to resolve this here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, earlier as well we keep it DEFAULT_. We can discuss it offline as it will make the variable name bigger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, no changes required here.

public static final long DEFAULT_METRICS_EMIT_INTERVAL_MINS = 60;
public static final int DEFAULT_METRICS_MAX_CALLS_PER_SECOND = 3;
public static final boolean DEFAULT_METRICS_BACKOFF_RETRY_ENABLED = false;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
Expand Down
Loading