Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt BanyanDB Java Client 0.7.0. #12621

Merged
merged 4 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* Fix the previous analysis result missing in the ALS `k8s-mesh` analyzer.
* Fix `findEndpoint` query require `keyword` when using BanyanDB.
* Support to analysis the ztunnel mapped IP address in eBPF Access Log Receiver.
* Adapt BanyanDB Java Client 0.7.0-rc3.

#### UI

Expand Down
2 changes: 1 addition & 1 deletion oap-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<httpcore.version>4.4.13</httpcore.version>
<httpasyncclient.version>4.1.5</httpasyncclient.version>
<commons-compress.version>1.21</commons-compress.version>
<banyandb-java-client.version>0.7.0-rc2</banyandb-java-client.version>
<banyandb-java-client.version>0.7-rc3</banyandb-java-client.version>
<kafka-clients.version>3.4.0</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
<consul.client.version>1.5.3</consul.client.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;

@Slf4j
public class BanyanDBIndexInstaller extends ModelInstaller {
Expand Down Expand Up @@ -58,20 +60,20 @@ public boolean isExists(Model model) throws StorageException {
final boolean resourceExist = metadata.checkResourceExistence(c);
if (!resourceExist) {
return false;
}

// then check entity schema
if (metadata.findRemoteSchema(c).isPresent()) {
// register models only locally but not remotely
} else {
// register models only locally(Schema cache) but not remotely
if (model.isRecord()) { // stream
MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
} else { // measure
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
}
// pre-load remote schema for java client
MetadataCache.EntityMetadata remoteMeta = metadata.updateRemoteSchema(c);
if (remoteMeta == null) {
throw new IllegalStateException("inconsistent state: metadata:" + metadata + ", remoteMeta: null");
}
return true;
}

throw new IllegalStateException("inconsistent state:" + metadata);
} catch (BanyanDBException ex) {
throw new StorageException("fail to check existence", ex);
}
Expand All @@ -84,11 +86,17 @@ public void createTable(Model model) throws StorageException {
.provider()
.getService(ConfigService.class);
if (model.isRecord()) { // stream
Stream stream = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
StreamModel streamModel = MetadataRegistry.INSTANCE.registerStreamModel(model, config, configService);
Stream stream = streamModel.getStream();
if (stream != null) {
log.info("install stream schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
((BanyanDBStorageClient) client).define(stream);
if (CollectionUtils.isNotEmpty(streamModel.getIndexRules())) {
client.define(stream, streamModel.getIndexRules());
} else {
client.define(stream);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info(
Expand All @@ -102,12 +110,17 @@ public void createTable(Model model) throws StorageException {
}
}
} else { // measure
Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
Measure measure = measureModel.getMeasure();
if (measure != null) {
log.info("install measure schema {}", measure.name());
final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
log.info("install measure schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
c.define(measure);
if (CollectionUtils.isNotEmpty(measureModel.getIndexRules())) {
client.define(measure, measureModel.getIndexRules());
} else {
client.define(measure);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} already created by another OAP node",
Expand All @@ -119,7 +132,7 @@ public void createTable(Model model) throws StorageException {
}
final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
try {
schema.installTopNAggregation(c);
schema.installTopNAggregation(client);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} TopN({}) already created by another OAP node",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void insert(Model model, NoneStream noneStream) throws IOException {
if (schema == null) {
throw new IOException(model.getName() + " is not registered");
}
StreamWrite streamWrite = getClient().client.createStreamWrite(
StreamWrite streamWrite = getClient().createStreamWrite(
schema.getMetadata().getGroup(), // group name
schema.getMetadata().name(), // stream-name
noneStream.id().build() // identity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import io.grpc.Status;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
Expand All @@ -30,14 +31,15 @@
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TopNQuery;
import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
import org.apache.skywalking.banyandb.v1.client.metadata.Property;
import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
Expand Down Expand Up @@ -103,9 +105,9 @@ public Property queryProperty(String group, String name, String id) throws IOExc
}
}

public PropertyStore.DeleteResult deleteProperty(String group, String name, String id, String... tags) throws IOException {
public DeleteResponse deleteProperty(String group, String name, String id, String... tags) throws IOException {
try {
PropertyStore.DeleteResult result = this.client.deleteProperty(group, name, id, tags);
DeleteResponse result = this.client.deleteProperty(group, name, id, tags);
this.healthChecker.health();
return result;
} catch (BanyanDBException ex) {
Expand Down Expand Up @@ -158,7 +160,7 @@ public TopNQueryResponse query(TopNQuery q) throws IOException {
}

/**
* PropertyStore.Strategy is default to {@link PropertyStore.Strategy#MERGE}
* PropertyStore.Strategy is default to {@link Strategy#STRATEGY_MERGE}
*/
public void define(Property property) throws IOException {
try {
Expand All @@ -170,7 +172,7 @@ public void define(Property property) throws IOException {
}
}

public void define(Property property, PropertyStore.Strategy strategy) throws IOException {
public void define(Property property, Strategy strategy) throws IOException {
try {
this.client.apply(property, strategy);
this.healthChecker.health();
Expand All @@ -190,6 +192,16 @@ public void define(Stream stream) throws BanyanDBException {
}
}

public void define(Stream stream, List<BanyandbDatabase.IndexRule> indexRules) throws BanyanDBException {
try {
this.client.define(stream, indexRules);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
throw ex;
}
}

public void define(Measure measure) throws BanyanDBException {
try {
this.client.define(measure);
Expand All @@ -200,6 +212,16 @@ public void define(Measure measure) throws BanyanDBException {
}
}

public void define(Measure measure, List<BanyandbDatabase.IndexRule> indexRules) throws BanyanDBException {
try {
this.client.define(measure, indexRules);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
throw ex;
}
}

public void defineIfEmpty(Group group) throws IOException {
try {
try {
Expand All @@ -223,12 +245,20 @@ public void define(TopNAggregation topNAggregation) throws IOException {
}
}

public StreamWrite createStreamWrite(String group, String name, String elementId) {
return this.client.createStreamWrite(group, name, elementId);
public StreamWrite createStreamWrite(String group, String name, String elementId) throws IOException {
try {
return this.client.createStreamWrite(group, name, elementId);
} catch (BanyanDBException e) {
throw new IOException("fail to create stream write", e);
}
}

public MeasureWrite createMeasureWrite(String group, String name, long timestamp) {
return this.client.createMeasureWrite(group, name, timestamp);
public MeasureWrite createMeasureWrite(String group, String name, long timestamp) throws IOException {
try {
return this.client.createMeasureWrite(group, name, timestamp);
} catch (BanyanDBException e) {
throw new IOException("fail to create measure write", e);
}
}

public void write(StreamWrite streamWrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
Expand Down Expand Up @@ -175,7 +175,14 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
this.client.registerChecker(healthChecker);
try {
this.client.connect();
this.client.defineIfEmpty(Group.create(BanyanDBUITemplateManagementDAO.GROUP));
this.client.defineIfEmpty(BanyandbCommon.Group.newBuilder()
.setMetadata(
BanyandbCommon.Metadata.newBuilder()
.setName(
BanyanDBUITemplateManagementDAO.GROUP))
.setCatalog(BanyandbCommon.Catalog.CATALOG_UNSPECIFIED)
.build());
//this.client.defineIfEmpty(Group.create(BanyanDBUITemplateManagementDAO.GROUP));
this.modelInstaller.start();

getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(modelInstaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.metadata.Property;
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenu;
import org.apache.skywalking.oap.server.core.storage.management.UIMenuManagementDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
Expand All @@ -46,17 +49,31 @@ public UIMenu getMenu(String id) throws IOException {

@Override
public void saveMenu(UIMenu menu) throws IOException {
this.getClient().define(Property.create(GROUP, UIMenu.INDEX_NAME, menu.id().build())
.addTag(TagAndValue.newStringTag(UIMenu.CONFIGURATION, menu.getConfigurationJson()))
.addTag(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, menu.getUpdateTime()))
.build());
Property property = Property.newBuilder()
.setMetadata(BanyandbProperty.Metadata.newBuilder().setId(menu.getMenuId())
.setContainer(
BanyandbCommon.Metadata.newBuilder()
.setGroup(GROUP)
.setName(
UIMenu.INDEX_NAME)))

.addTags(TagAndValue.newStringTag(UIMenu.CONFIGURATION, menu.getConfigurationJson())
.build())
.addTags(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, menu.getUpdateTime()).build())
.build();
// this.getClient().define(Property.create(GROUP, UIMenu.INDEX_NAME, menu.id().build())
// .addTag(TagAndValue.newStringTag(UIMenu.CONFIGURATION, menu.getConfigurationJson()))
// .addTag(TagAndValue.newLongTag(UIMenu.UPDATE_TIME, menu.getUpdateTime()))
// .build());
this.getClient().define(property);
}

public UIMenu parse(Property property) {
UIMenu menu = new UIMenu();
menu.setMenuId(property.id());
menu.setMenuId(property.getMetadata().getId());

for (TagAndValue<?> tagAndValue : property.tags()) {
for (BanyandbModel.Tag tag : property.getTagsList()) {
TagAndValue<?> tagAndValue = TagAndValue.fromProtobuf(tag);
if (tagAndValue.getTagName().equals(UIMenu.CONFIGURATION)) {
menu.setConfigurationJson((String) tagAndValue.getValue());
} else if (tagAndValue.getTagName().equals(UIMenu.UPDATE_TIME)) {
Expand Down
Loading
Loading