From fb6f35297b400c6c695a261d0bd50340f4bc4f69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Wed, 3 Jul 2024 16:47:53 +0800 Subject: [PATCH] Polish BanyanDB group and schema creation logic (#12408) --- docs/en/changes/changes.md | 1 + .../banyandb/BanyanDBIndexInstaller.java | 58 +++++++++++++++---- .../banyandb/BanyanDBStorageClient.java | 8 +-- .../plugin/banyandb/MetadataRegistry.java | 44 ++++++++++---- 4 files changed, 85 insertions(+), 26 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 49730287cacb..dd5f0275d92a 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -26,6 +26,7 @@ * Support `labelCount` function in the OAL engine. * Support BanyanDB internal metrics query execution tracing. * BanyanDB client config: rise the default `maxBulkSize` to 10000, add `flushTimeout` and set default to 10s. +* Polish BanyanDB group and schema creation logic to fix the schema creation failure issue in distributed race conditions. #### UI * Highlight search log keywords. diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index 0ea91671b16a..b95737843bcf 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; +import io.grpc.Status; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; @@ -31,8 +32,6 @@ import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.module.ModuleManager; -import java.io.IOException; - @Slf4j public class BanyanDBIndexInstaller extends ModelInstaller { private final BanyanDBStorageConfig config; @@ -48,8 +47,11 @@ public boolean isExists(Model model) throws StorageException { if (!model.isTimeSeries()) { return true; } - final ConfigService configService = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class); - final MetadataRegistry.SchemaMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata(model, config, configService); + final ConfigService configService = moduleManager.find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + final MetadataRegistry.SchemaMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata( + model, config, configService); try { final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client; // first check resource existence and create group if necessary @@ -78,26 +80,60 @@ public boolean isExists(Model model) throws StorageException { @Override public void createTable(Model model) throws StorageException { try { - ConfigService configService = moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class); + ConfigService configService = moduleManager.find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); if (model.isRecord()) { // stream Stream stream = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService); if (stream != null) { log.info("install stream schema {}", model.getName()); - ((BanyanDBStorageClient) client).define(stream); + try { + ((BanyanDBStorageClient) client).define(stream); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info( + "Stream schema {}_{} already created by another OAP node", + model.getName(), + model.getDownsampling() + ); + } else { + throw ex; + } + } } } else { // measure Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService); if (measure != null) { log.info("install measure schema {}", measure.name()); - ((BanyanDBStorageClient) client).define(measure); final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client; - MetadataRegistry.INSTANCE.findMetadata(model).installTopNAggregation(c); + try { + c.define(measure); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("Measure schema {}_{} already created by another OAP node", + model.getName(), + model.getDownsampling()); + } else { + throw ex; + } + } + final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); + try { + schema.installTopNAggregation(c); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("Measure schema {}_{} TopN({}) already created by another OAP node", + model.getName(), + model.getDownsampling(), + schema.getTopNSpec()); + } else { + throw ex; + } + } } } - } catch (IOException ex) { - throw new StorageException("fail to install schema", ex); } catch (BanyanDBException ex) { - throw new StorageException("fail to install TopN schema", ex); + throw new StorageException("fail to create schema " + model.getName(), ex); } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index 25b6cb2c5bef..26069cdc0d73 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -180,23 +180,23 @@ public void define(Property property, PropertyStore.Strategy strategy) throws IO } } - public void define(Stream stream) throws IOException { + public void define(Stream stream) throws BanyanDBException { try { this.client.define(stream); this.healthChecker.health(); } catch (BanyanDBException ex) { healthChecker.unHealth(ex); - throw new IOException("fail to define stream", ex); + throw ex; } } - public void define(Measure measure) throws IOException { + public void define(Measure measure) throws BanyanDBException { try { this.client.define(measure); this.healthChecker.health(); } catch (BanyanDBException ex) { healthChecker.unHealth(ex); - throw new IOException("fail to define stream", ex); + throw ex; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 0f94173f4d37..86795cf4967a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -551,22 +551,44 @@ public boolean checkResourceExistence(BanyanDBClient client) throws BanyanDBExce case STREAM: resourceExist = client.existStream(this.group, this.name()); if (!resourceExist.hasGroup()) { - Group g = client.define(Group.create(this.group, Catalog.STREAM, this.shard, - IntervalRule.create(IntervalRule.Unit.DAY, this.segmentIntervalDays), - IntervalRule.create(IntervalRule.Unit.DAY, this.ttlDays))); - if (g != null) { - log.info("group {} created", g.name()); + try { + Group g = client.define(Group.create(this.group, Catalog.STREAM, this.shard, + IntervalRule.create( + IntervalRule.Unit.DAY, this.segmentIntervalDays), + IntervalRule.create( + IntervalRule.Unit.DAY, this.ttlDays) + )); + if (g != null) { + log.info("group {} created", g.name()); + } + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("group {} already created by another OAP node", this.group); + } else { + throw ex; + } } } return resourceExist.hasResource(); case MEASURE: resourceExist = client.existMeasure(this.group, this.name()); - if (!resourceExist.hasGroup()) { - Group g = client.define(Group.create(this.group, Catalog.MEASURE, this.shard, - IntervalRule.create(IntervalRule.Unit.DAY, this.segmentIntervalDays), - IntervalRule.create(IntervalRule.Unit.DAY, this.ttlDays))); - if (g != null) { - log.info("group {} created", g.name()); + try { + if (!resourceExist.hasGroup()) { + Group g = client.define(Group.create(this.group, Catalog.MEASURE, this.shard, + IntervalRule.create( + IntervalRule.Unit.DAY, this.segmentIntervalDays), + IntervalRule.create( + IntervalRule.Unit.DAY, this.ttlDays) + )); + if (g != null) { + log.info("group {} created", g.name()); + } + } + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("group {} already created by another OAP node", this.group); + } else { + throw ex; } } return resourceExist.hasResource();