Skip to content

Commit

Permalink
Polish BanyanDB group and schema creation logic (#12408)
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-sheng committed Jul 3, 2024
1 parent 8b678be commit fb6f352
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* Support `labelCount` function in the OAL engine.
* Support BanyanDB internal metrics query execution tracing.
* BanyanDB client config: rise the default `maxBulkSize` to 10000, add `flushTimeout` and set default to 10s.
* Polish BanyanDB group and schema creation logic to fix the schema creation failure issue in distributed race conditions.

#### UI
* Highlight search log keywords.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import io.grpc.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
Expand All @@ -31,8 +32,6 @@
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;

import java.io.IOException;

@Slf4j
public class BanyanDBIndexInstaller extends ModelInstaller {
private final BanyanDBStorageConfig config;
Expand All @@ -48,8 +47,11 @@ public boolean isExists(Model model) throws StorageException {
if (!model.isTimeSeries()) {
return true;
}
final ConfigService configService = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class);
final MetadataRegistry.SchemaMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata(model, config, configService);
final ConfigService configService = moduleManager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
final MetadataRegistry.SchemaMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata(
model, config, configService);
try {
final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
// first check resource existence and create group if necessary
Expand Down Expand Up @@ -78,26 +80,60 @@ public boolean isExists(Model model) throws StorageException {
@Override
public void createTable(Model model) throws StorageException {
try {
ConfigService configService = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class);
ConfigService configService = moduleManager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
if (model.isRecord()) { // stream
Stream stream = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
if (stream != null) {
log.info("install stream schema {}", model.getName());
((BanyanDBStorageClient) client).define(stream);
try {
((BanyanDBStorageClient) client).define(stream);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info(
"Stream schema {}_{} already created by another OAP node",
model.getName(),
model.getDownsampling()
);
} else {
throw ex;
}
}
}
} else { // measure
Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
if (measure != null) {
log.info("install measure schema {}", measure.name());
((BanyanDBStorageClient) client).define(measure);
final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
MetadataRegistry.INSTANCE.findMetadata(model).installTopNAggregation(c);
try {
c.define(measure);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} already created by another OAP node",
model.getName(),
model.getDownsampling());
} else {
throw ex;
}
}
final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
try {
schema.installTopNAggregation(c);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} TopN({}) already created by another OAP node",
model.getName(),
model.getDownsampling(),
schema.getTopNSpec());
} else {
throw ex;
}
}
}
}
} catch (IOException ex) {
throw new StorageException("fail to install schema", ex);
} catch (BanyanDBException ex) {
throw new StorageException("fail to install TopN schema", ex);
throw new StorageException("fail to create schema " + model.getName(), ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,23 @@ public void define(Property property, PropertyStore.Strategy strategy) throws IO
}
}

public void define(Stream stream) throws IOException {
public void define(Stream stream) throws BanyanDBException {
try {
this.client.define(stream);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
throw new IOException("fail to define stream", ex);
throw ex;
}
}

public void define(Measure measure) throws IOException {
public void define(Measure measure) throws BanyanDBException {
try {
this.client.define(measure);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
throw new IOException("fail to define stream", ex);
throw ex;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,22 +551,44 @@ public boolean checkResourceExistence(BanyanDBClient client) throws BanyanDBExce
case STREAM:
resourceExist = client.existStream(this.group, this.name());
if (!resourceExist.hasGroup()) {
Group g = client.define(Group.create(this.group, Catalog.STREAM, this.shard,
IntervalRule.create(IntervalRule.Unit.DAY, this.segmentIntervalDays),
IntervalRule.create(IntervalRule.Unit.DAY, this.ttlDays)));
if (g != null) {
log.info("group {} created", g.name());
try {
Group g = client.define(Group.create(this.group, Catalog.STREAM, this.shard,
IntervalRule.create(
IntervalRule.Unit.DAY, this.segmentIntervalDays),
IntervalRule.create(
IntervalRule.Unit.DAY, this.ttlDays)
));
if (g != null) {
log.info("group {} created", g.name());
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("group {} already created by another OAP node", this.group);
} else {
throw ex;
}
}
}
return resourceExist.hasResource();
case MEASURE:
resourceExist = client.existMeasure(this.group, this.name());
if (!resourceExist.hasGroup()) {
Group g = client.define(Group.create(this.group, Catalog.MEASURE, this.shard,
IntervalRule.create(IntervalRule.Unit.DAY, this.segmentIntervalDays),
IntervalRule.create(IntervalRule.Unit.DAY, this.ttlDays)));
if (g != null) {
log.info("group {} created", g.name());
try {
if (!resourceExist.hasGroup()) {
Group g = client.define(Group.create(this.group, Catalog.MEASURE, this.shard,
IntervalRule.create(
IntervalRule.Unit.DAY, this.segmentIntervalDays),
IntervalRule.create(
IntervalRule.Unit.DAY, this.ttlDays)
));
if (g != null) {
log.info("group {} created", g.name());
}
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("group {} already created by another OAP node", this.group);
} else {
throw ex;
}
}
return resourceExist.hasResource();
Expand Down

0 comments on commit fb6f352

Please sign in to comment.