Support BanyanDB internal metrics query execution tracing. (#12400)
* Support BanyanDB internal metrics query execution tracing.
* BanyanDB client config: raise the default `maxBulkSize` to 10000, add `flushTimeout` and set its default to 10s.
wankai123 committed Jul 2, 2024
1 parent 2e8b4f9 commit 8b678be
Showing 15 changed files with 75 additions and 26 deletions.
2 changes: 2 additions & 0 deletions docs/en/changes/changes.md
@@ -24,6 +24,8 @@
 * Fix BanyanDB metrics query: used the wrong `Downsampling` type to find the schema.
 * Support fetch cilium flow to monitoring network traffic between cilium services.
 * Support `labelCount` function in the OAL engine.
+* Support BanyanDB internal metrics query execution tracing.
+* BanyanDB client config: raise the default `maxBulkSize` to 10000, add `flushTimeout` and set its default to 10s.
 
 #### UI
 * Highlight search log keywords.
3 changes: 2 additions & 1 deletion docs/en/setup/backend/configuration-vocabulary.md
@@ -140,8 +140,9 @@ The Configuration Vocabulary lists all available configurations provided by `app
 | - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
 | - | banyandb | - | BanyanDB storage. | - | - |
 | - | - | targets | Hosts with ports of the BanyanDB. | SW_STORAGE_BANYANDB_TARGETS | 127.0.0.1:17912 |
-| - | - | maxBulkSize | The maximum size of write entities in a single batch write call. | SW_STORAGE_BANYANDB_MAX_BULK_SIZE | 5000 |
+| - | - | maxBulkSize | The maximum size of write entities in a single batch write call. | SW_STORAGE_BANYANDB_MAX_BULK_SIZE | 10000 |
 | - | - | flushInterval | Period of flush interval. In the timeunit of seconds. | SW_STORAGE_BANYANDB_FLUSH_INTERVAL | 15 |
+| - | - | flushTimeout | The timeout seconds of a bulk flush. | SW_STORAGE_BANYANDB_FLUSH_TIMEOUT | 10 |
 | - | - | metricsShardsNumber | Shards Number for measure/metrics. | SW_STORAGE_BANYANDB_METRICS_SHARDS_NUMBER | 1 |
 | - | - | recordShardsNumber | Shards Number for a normal record. | SW_STORAGE_BANYANDB_RECORD_SHARDS_NUMBER | 1 |
 | - | - | superDatasetShardsFactor | Shards Factor for a super dataset record, i.e. Shard number of a super dataset is recordShardsNumber*superDatasetShardsFactor. | SW_STORAGE_BANYANDB_SUPERDATASET_SHARDS_FACTOR | 2 |
6 changes: 4 additions & 2 deletions docs/en/setup/backend/storages/banyandb.md
@@ -3,7 +3,7 @@
 [BanyanDB](https://github.com/apache/skywalking-banyandb) is a dedicated storage implementation developed by the SkyWalking Team and the community.
 Activate BanyanDB as the storage, and set storage provider to **banyandb**.
 
-The OAP requires BanyanDB 0.6 server. From this version, BanyanDB provides general compatibility.
+The OAP requires BanyanDB 0.7 server. From this version, BanyanDB provides general compatibility.
 
 ```yaml
 storage:
@@ -14,12 +14,14 @@
 targets: ${SW_STORAGE_BANYANDB_TARGETS:127.0.0.1:17912}
 # The max number of records in a bulk write request.
 # Bigger value can improve the write performance, but also increase the OAP and BanyanDB Server memory usage.
-maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:5000}
+maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:10000}
 # The minimum seconds between two bulk flushes.
 # If the data in a bulk is less than maxBulkSize, the data will be flushed after this period.
 # If the data in a bulk is more than maxBulkSize, the data will be flushed immediately.
 # Bigger value can reduce the write pressure on BanyanDB Server, but also increase the latency of the data.
 flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15}
+# The timeout seconds of a bulk flush.
+flushTimeout: ${SW_STORAGE_BANYANDB_FLUSH_TIMEOUT:10}
 # The shard number of `measure` groups that store the metrics data.
 metricsShardsNumber: ${SW_STORAGE_BANYANDB_METRICS_SHARDS_NUMBER:1}
 # The shard number of `stream` groups that store the trace, log and profile data.
2 changes: 1 addition & 1 deletion oap-server-bom/pom.xml
@@ -73,7 +73,7 @@
 <httpcore.version>4.4.13</httpcore.version>
 <httpasyncclient.version>4.1.5</httpasyncclient.version>
 <commons-compress.version>1.21</commons-compress.version>
-<banyandb-java-client.version>0.6.0</banyandb-java-client.version>
+<banyandb-java-client.version>0.7.0-rc1</banyandb-java-client.version>
 <kafka-clients.version>3.4.0</kafka-clients.version>
 <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
 <consul.client.version>1.5.3</consul.client.version>
@@ -92,8 +92,8 @@ public MetricsValues readMetricsValues(MetricsCondition condition, Duration dura
 DebuggingSpan span = null;
 try {
 if (traceContext != null) {
-span = traceContext.createSpan("Query Service");
-span.setMsg("readMetricsValues, MetricsCondition: " + condition + ", Duration: " + duration);
+span = traceContext.createSpan("Query Service: readMetricsValues");
+span.setMsg("MetricsCondition: " + condition + ", Duration: " + duration);
 }
 return invokeReadMetricsValues(condition, duration);
 } finally {
@@ -125,8 +125,8 @@ public List<MetricsValues> readLabeledMetricsValues(MetricsCondition condition,
 DebuggingSpan span = null;
 try {
 if (traceContext != null) {
-span = traceContext.createSpan("Query Service");
-span.setMsg("readLabeledMetricsValues, MetricsCondition: " + condition + ", Labels: " + labels + ", Duration: " + duration);
+span = traceContext.createSpan("Query Service: readLabeledMetricsValues");
+span.setMsg("MetricsCondition: " + condition + ", Labels: " + labels + ", Duration: " + duration);
 }
 return invokeReadLabeledMetricsValues(condition, labels, duration);
 } finally {
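The query-service hunks above all follow one pattern: create a span whose operation name now carries the method name (so the message holds only the arguments), run the query in a `try` block, and stop the span in `finally`. A minimal self-contained sketch of that pattern, using hypothetical stand-in classes rather than the real SkyWalking `DebuggingSpan`/`DebuggingTraceContext`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SpanPatternSketch {
    // Hypothetical stand-ins for SkyWalking's DebuggingSpan / DebuggingTraceContext.
    static final class Span {
        final String operation;
        String msg;
        Span(String operation) { this.operation = operation; }
    }

    static final class TraceContext {
        private final Deque<Span> stack = new ArrayDeque<>();
        Span createSpan(String operation) {
            Span span = new Span(operation);
            stack.push(span);
            return span;
        }
        void stopSpan(Span span) {
            if (!stack.isEmpty() && stack.peek() == span) {
                stack.pop();
            }
        }
    }

    // Mirrors readMetricsValues: span name carries the method, msg only the arguments.
    static String readMetricsValues(TraceContext traceContext, String condition) {
        Span span = null;
        try {
            if (traceContext != null) {
                span = traceContext.createSpan("Query Service: readMetricsValues");
                span.msg = "MetricsCondition: " + condition;
            }
            return "values-for-" + condition; // stand-in for invokeReadMetricsValues(...)
        } finally {
            if (traceContext != null && span != null) {
                traceContext.stopSpan(span); // always stopped, even if the query throws
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(readMetricsValues(new TraceContext(), "service_cpm"));
    }
}
```

The `traceContext != null` guard means tracing stays zero-cost for normal (non-debug) queries.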
@@ -65,8 +65,8 @@ public List<Record> readRecords(RecordCondition condition, Duration duration) th
 DebuggingSpan span = null;
 try {
 if (traceContext != null) {
-span = traceContext.createSpan("Query Service");
-span.setMsg("readRecords, RecordCondition: " + condition + ", Duration: " + duration);
+span = traceContext.createSpan("Query Service: readRecords");
+span.setMsg("RecordCondition: " + condition + ", Duration: " + duration);
 }
 return invokeReadRecords(condition, duration);
 } finally {
@@ -28,10 +28,13 @@ public class DebuggingSpan {
 private int parentSpanId;
 private final String operation;
 //nano seconds
-private final long startTime;
+@Setter
+private long startTime;
 //nano seconds
+@Setter
 private long endTime;
 //nano seconds
+@Setter
 private long duration;
 @Setter
 private String msg;
@@ -41,11 +44,5 @@
 public DebuggingSpan(int spanId, String operation) {
 this.spanId = spanId;
 this.operation = operation;
-this.startTime = System.nanoTime();
 }
-
-public void stopSpan() {
-this.endTime = System.nanoTime();
-this.duration = endTime - startTime;
-}
 }
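The diff above moves all timing out of `DebuggingSpan`: `startTime` loses its `final`, `stopSpan()` disappears, and the timing fields become plain settable values so an external source (such as a BanyanDB-side trace) can supply them. A tiny sketch of the resulting contract, with hypothetical names in place of the Lombok-generated setters:

```java
public class SpanTimingSketch {
    // Mirrors the reduced DebuggingSpan: bare settable timing fields, no self-timing.
    static final class Span {
        long startTime; // nanoseconds
        long endTime;   // nanoseconds
        long duration;  // nanoseconds
    }

    public static void main(String[] args) {
        Span span = new Span();
        // The owner (e.g. the trace context) now supplies the timestamps...
        span.startTime = 1_000L;
        span.endTime = 4_500L;
        // ...and derives the duration, instead of a Span.stopSpan() method doing it.
        span.duration = span.endTime - span.startTime;
        System.out.println(span.duration); // prints 3500
    }
}
```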
@@ -39,8 +39,11 @@ public DebuggingTraceContext(String condition, boolean debug, boolean dumpStorag
 public DebuggingSpan createSpan(String operation) {
 DebuggingSpan span = new DebuggingSpan(spanIdGenerator++, operation);
 if (debug) {
+//default start time, could be overwritten by setStartTime (BanyanDB Trace)
+span.setStartTime(System.nanoTime());
 DebuggingSpan parentSpan = spanStack.isEmpty() ? null : spanStack.peek();
 if (parentSpan != null) {
+//default parent span id, could be overwritten by setParentSpanId (BanyanDB Trace)
 span.setParentSpanId(parentSpan.getSpanId());
 } else {
 span.setParentSpanId(-1);
@@ -51,9 +54,17 @@
 return span;
 }
 
+public DebuggingSpan getParentSpan() {
+if (spanStack.isEmpty()) {
+return null;
+}
+return spanStack.peek();
+}
+
 public void stopSpan(DebuggingSpan span) {
 if (debug) {
-span.stopSpan();
+span.setEndTime(System.nanoTime());
+span.setDuration(span.getEndTime() - span.getStartTime());
 if (spanStack.isEmpty()) {
 return;
 }
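In `createSpan` above, the parent of a new span is whatever span sits on top of the stack, with `-1` marking a root span; both values are defaults that a BanyanDB-side trace may later overwrite. A self-contained sketch of that bookkeeping (hypothetical names, not the real `DebuggingTraceContext`):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ParentSpanSketch {
    static final class Span {
        final int spanId;
        int parentSpanId;
        Span(int spanId) { this.spanId = spanId; }
    }

    private int spanIdGenerator = 0;
    private final Deque<Span> spanStack = new ArrayDeque<>();

    Span createSpan() {
        Span span = new Span(spanIdGenerator++);
        Span parent = spanStack.peek(); // null when the stack is empty
        // Top of the stack becomes the parent; -1 marks a root span.
        span.parentSpanId = parent != null ? parent.spanId : -1;
        spanStack.push(span);
        return span;
    }

    void stopSpan(Span span) {
        if (!spanStack.isEmpty() && spanStack.peek() == span) {
            spanStack.pop();
        }
    }

    public static void main(String[] args) {
        ParentSpanSketch ctx = new ParentSpanSketch();
        Span root = ctx.createSpan();
        Span child = ctx.createSpan();
        System.out.println(root.parentSpanId + " " + child.parentSpanId); // prints "-1 0"
    }
}
```

The new `getParentSpan()` accessor in the diff exposes exactly this stack top, so storage-layer code can attach BanyanDB-internal trace spans under the current query span.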
4 changes: 3 additions & 1 deletion oap-server/server-starter/src/main/resources/application.yml
@@ -222,12 +222,14 @@ storage:
 targets: ${SW_STORAGE_BANYANDB_TARGETS:127.0.0.1:17912}
 # The max number of records in a bulk write request.
 # Bigger value can improve the write performance, but also increase the OAP and BanyanDB Server memory usage.
-maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:5000}
+maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:10000}
 # The minimum seconds between two bulk flushes.
 # If the data in a bulk is less than maxBulkSize, the data will be flushed after this period.
 # If the data in a bulk is more than maxBulkSize, the data will be flushed immediately.
 # Bigger value can reduce the write pressure on BanyanDB Server, but also increase the latency of the data.
 flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15}
+# The timeout seconds of a bulk flush.
+flushTimeout: ${SW_STORAGE_BANYANDB_FLUSH_TIMEOUT:10}
 # The shard number of `measure` groups that store the metrics data.
 metricsShardsNumber: ${SW_STORAGE_BANYANDB_METRICS_SHARDS_NUMBER:1}
 # The shard number of `stream` groups that store the trace, log and profile data.
@@ -54,9 +54,11 @@
 public class BanyanDBStorageClient implements Client, HealthCheckable {
 final BanyanDBClient client;
 private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
+private final int flushTimeout;
 
-public BanyanDBStorageClient(String... targets) {
+public BanyanDBStorageClient(int flushTimeout, String... targets) {
 this.client = new BanyanDBClient(targets);
+this.flushTimeout = flushTimeout;
 }
 
 @Override
@@ -234,11 +236,11 @@ public void write(StreamWrite streamWrite) {
 }
 
 public StreamBulkWriteProcessor createStreamBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
-return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
+return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency, flushTimeout);
 }
 
 public MeasureBulkWriteProcessor createMeasureBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
-return this.client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency);
+return this.client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency, flushTimeout);
 }
 
 @Override
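`flushTimeout` bounds how long a single bulk flush may block, while `maxBulkSize` and `flushInterval` decide when a flush happens. The real logic lives inside the banyandb-java-client bulk processors; the sketch below is a hypothetical illustration of combining a size trigger with a time-bounded flush, not the actual implementation (and it omits the periodic `flushInterval` timer):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BulkFlushSketch {
    private final int maxBulkSize;
    private final long flushTimeoutSeconds;
    private final List<String> buffer = new ArrayList<>();
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();
    volatile int flushedEntries = 0;

    BulkFlushSketch(int maxBulkSize, long flushTimeoutSeconds) {
        this.maxBulkSize = maxBulkSize;
        this.flushTimeoutSeconds = flushTimeoutSeconds;
    }

    // Size trigger: flush as soon as maxBulkSize entries accumulate.
    synchronized void add(String entry) {
        buffer.add(entry);
        if (buffer.size() >= maxBulkSize) {
            flush();
        }
    }

    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        // Pretend network write; in reality this would be a gRPC bulk call.
        Future<?> write = flusher.submit(() -> { flushedEntries += batch.size(); });
        try {
            // flushTimeout bounds the wait instead of blocking forever on a stuck server.
            write.get(flushTimeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            write.cancel(true); // abandon a stuck flush
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        flusher.shutdown();
    }

    public static void main(String[] args) {
        BulkFlushSketch processor = new BulkFlushSketch(3, 10);
        processor.add("a");
        processor.add("b");                           // still buffered: below maxBulkSize
        processor.add("c");                           // hits maxBulkSize: flushed
        System.out.println(processor.flushedEntries); // prints 3
        processor.shutdown();
    }
}
```

This also shows why the timeout is passed into the constructor here: the processors are created later with only size/interval/concurrency arguments, so the client holds `flushTimeout` and appends it when building them.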
@@ -41,6 +41,10 @@ public class BanyanDBStorageConfig extends ModuleConfig {
 * Period of flush interval. In the timeunit of seconds.
 */
 private int flushInterval = 15;
+/**
+ * Timeout of flush. In the timeunit of seconds.
+ */
+private int flushTimeout = 10;
 /**
 * Concurrent consumer threads for batch writing.
 */
@@ -117,7 +117,7 @@ public void onInitialized(final BanyanDBStorageConfig initialized) {
 public void prepare() throws ServiceNotProvidedException, ModuleStartException {
 this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
 
-this.client = new BanyanDBStorageClient(config.getTargetArray());
+this.client = new BanyanDBStorageClient(config.getFlushTimeout(), config.getTargetArray());
 this.modelInstaller = new BanyanDBIndexInstaller(client, getManager(), this.config);
 
 // Stream