* Refactor metadata object to original protocol.
* Complemented the Schema management API.
* Enhance the MetadataCache.
* Add more IT tests.
wankai123 committed Sep 13, 2024
1 parent 24ca171 commit b17c47e
Showing 44 changed files with 1,794 additions and 3,825 deletions.
13 changes: 13 additions & 0 deletions CHANGES.md
@@ -2,6 +2,19 @@ Changes by Version
==================
Release Notes.

0.7.0-rc3
------------------

### Features

* Refactor metadata object to original protocol.
* Complemented the Schema management API.
* Enhance the MetadataCache.
* Add more IT tests.

### Bugs


0.7.0-rc0
------------------

185 changes: 147 additions & 38 deletions README.md
@@ -50,53 +50,162 @@ options are listed below,

### Stream and index rules

Then we may define a stream with customized configurations. The following example uses `SegmentRecord` in SkyWalking OAP
as an illustration,

```java
// build a stream default(group)/sw(name) with 2 shards and ttl equals to 30 days
Stream s = Stream.create("default", "sw")
        // set entities
        .setEntityRelativeTags("service_id", "service_instance_id", "is_error")
        // add a tag family "data"
        .addTagFamily(TagFamilySpec.create("data")
                .addTagSpec(TagFamilySpec.TagSpec.newBinaryTag("data_binary"))
                .build())
        // add a tag family "searchable"
        .addTagFamily(TagFamilySpec.create("searchable")
                // create a string tag "trace_id"
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("trace_id"))
                .addTagSpec(TagFamilySpec.TagSpec.newIntTag("is_error"))
                // service_id is not stored, but can be searched through the index
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("service_id").indexedOnly())
                .build())
        .build();
#### Define a Group
```java
// build a group sw_record for Stream with 2 shards and a TTL of 3 days
Group g = Group.newBuilder()
        .setMetadata(Metadata.newBuilder().setName("sw_record"))
        .setCatalog(Catalog.CATALOG_STREAM)
        .setResourceOpts(ResourceOpts.newBuilder()
                .setShardNum(2)
                .setSegmentInterval(IntervalRule.newBuilder()
                        .setUnit(IntervalRule.Unit.UNIT_DAY)
                        .setNum(1))
                .setTtl(IntervalRule.newBuilder()
                        .setUnit(IntervalRule.Unit.UNIT_DAY)
                        .setNum(3)))
        .build();
client.define(g);
```

Then we may define a stream with customized configurations.

#### Define a Stream
```java
// build a stream named "trace" in the group defined above
Stream s = Stream.newBuilder()
        .setMetadata(Metadata.newBuilder()
                .setGroup("sw_record")
                .setName("trace"))
        .setEntity(Entity.newBuilder().addAllTagNames(
                Arrays.asList("service_id", "service_instance_id", "is_error")))
        .addTagFamilies(TagFamilySpec.newBuilder()
                .setName("data")
                .addTags(TagSpec.newBuilder()
                        .setName("data_binary")
                        .setType(TagType.TAG_TYPE_DATA_BINARY)))
        .addTagFamilies(TagFamilySpec.newBuilder()
                .setName("searchable")
                .addTags(TagSpec.newBuilder()
                        .setName("trace_id")
                        .setType(TagType.TAG_TYPE_STRING))
                .addTags(TagSpec.newBuilder()
                        .setName("is_error")
                        .setType(TagType.TAG_TYPE_INT))
                .addTags(TagSpec.newBuilder()
                        .setName("service_id")
                        .setType(TagType.TAG_TYPE_STRING)
                        .setIndexedOnly(true)))
        .build();
client.define(s);
```

#### Define an IndexRule
```java
IndexRule.Builder ir = IndexRule.newBuilder()
        .setMetadata(Metadata.newBuilder()
                .setGroup("sw_record")
                .setName("trace_id"))
        .addTags("trace_id")
        .setType(IndexRule.Type.TYPE_INVERTED)
        .setAnalyzer(IndexRule.Analyzer.ANALYZER_UNSPECIFIED);
client.define(ir.build());
```
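The `service_id` tag in the stream above is declared `indexedOnly`, so it is not stored and can only be searched through an index. As an illustrative sketch (not part of this commit), an additional inverted index rule for that tag would follow the same builder pattern; it would then also need to be listed in the rules of the binding shown next:

```java
// illustrative only: an extra inverted index rule for the indexed-only "service_id" tag
IndexRule.Builder serviceIdRule = IndexRule.newBuilder()
        .setMetadata(Metadata.newBuilder()
                .setGroup("sw_record")
                .setName("service_id"))
        .addTags("service_id")
        .setType(IndexRule.Type.TYPE_INVERTED)
        .setAnalyzer(IndexRule.Analyzer.ANALYZER_UNSPECIFIED);
client.define(serviceIdRule.build());
```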

#### Define an IndexRuleBinding
```java
IndexRuleBinding.Builder irb = IndexRuleBinding.newBuilder()
        .setMetadata(BanyandbCommon.Metadata.newBuilder()
                .setGroup("sw_record")
                .setName("trace_binding"))
        .setSubject(BanyandbDatabase.Subject.newBuilder()
                .setCatalog(BanyandbCommon.Catalog.CATALOG_STREAM)
                .setName("trace"))
        .addAllRules(Arrays.asList("trace_id"))
        .setBeginAt(TimeUtils.buildTimestamp(ZonedDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)))
        .setExpireAt(TimeUtils.buildTimestamp(DEFAULT_EXPIRE_AT));
client.define(irb.build());
```
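`DEFAULT_EXPIRE_AT` is referenced but not defined in the snippet above. One plausible definition (an assumption for illustration, not necessarily the project's actual constant) is a far-future timestamp so the binding never expires in practice:

```java
// assumption: a far-future expiry, so the binding effectively never expires
static final ZonedDateTime DEFAULT_EXPIRE_AT =
        ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
```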

The last line of each block above calls a simple API (e.g. `BanyanDBClient.define(Stream)`) to register the schema with the server. The same works for `Measure`, which is demonstrated below.
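For quick reference, every schema object in this README is registered through the same overloaded `client.define(...)` entry point. The sketch below only collects the calls already shown in the surrounding snippets, reusing the `g`, `s`, `ir`, `irb`, and `m` objects built there:

```java
// condensed recap of the define calls used throughout this README
client.define(g);            // Group
client.define(s);            // Stream
client.define(ir.build());   // IndexRule
client.define(irb.build());  // IndexRuleBinding
client.define(m);            // Measure (defined in the next section)
```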

### Measure and index rules

`Measure` can also be defined directly with `BanyanDBClient`,

#### Define a Group
```java
// build a group sw_metric for Measure with 2 shards and a TTL of 7 days
Group g = Group.newBuilder()
        .setMetadata(Metadata.newBuilder().setName("sw_metric"))
        .setCatalog(Catalog.CATALOG_MEASURE)
        .setResourceOpts(ResourceOpts.newBuilder()
                .setShardNum(2)
                .setSegmentInterval(IntervalRule.newBuilder()
                        .setUnit(IntervalRule.Unit.UNIT_DAY)
                        .setNum(1))
                .setTtl(IntervalRule.newBuilder()
                        .setUnit(IntervalRule.Unit.UNIT_DAY)
                        .setNum(7)))
        .build();
client.define(g);
```

#### Define a Measure
```java
// create a new measure schema with an additional interval
// the interval is used to specify how frequently to send a data point
Measure m = Measure.create("sw_metric", "service_cpm_minute", Duration.ofHours(1))
        // set entity
        .setEntityRelativeTags("entity_id")
        // define a tag family "default"
        .addTagFamily(TagFamilySpec.create("default")
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("id"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id"))
                .build())
        // define field specs
        // compressMethod and encodingMethod can be specified
        .addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build())
        .addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build())
        .build();
Measure m = Measure.newBuilder()
        .setMetadata(Metadata.newBuilder()
                .setGroup("sw_metric")
                .setName("service_cpm_minute"))
        .setInterval(Duration.ofMinutes(1).format())
        .setEntity(Entity.newBuilder().addTagNames("entity_id"))
        .addTagFamilies(TagFamilySpec.newBuilder()
                .setName("default")
                .addTags(TagSpec.newBuilder()
                        .setName("entity_id")
                        .setType(TagType.TAG_TYPE_STRING))
                .addTags(TagSpec.newBuilder()
                        .setName("scope")
                        .setType(TagType.TAG_TYPE_STRING)))
        .addFields(FieldSpec.newBuilder()
                .setName("total")
                .setFieldType(FieldType.FIELD_TYPE_INT)
                .setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
                .setEncodingMethod(EncodingMethod.ENCODING_METHOD_GORILLA))
        .addFields(FieldSpec.newBuilder()
                .setName("value")
                .setFieldType(FieldType.FIELD_TYPE_INT)
                .setCompressionMethod(CompressionMethod.COMPRESSION_METHOD_ZSTD)
                .setEncodingMethod(EncodingMethod.ENCODING_METHOD_GORILLA))
        .build();
// define a measure, as we've mentioned above
client.define(m);
```
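The stream section above binds its index rules to the stream through an `IndexRuleBinding`; a measure can be bound the same way. The following sketch is illustrative only (it is not part of this commit) and assumes an index rule named `entity_id` has already been defined in the `sw_metric` group:

```java
// illustrative only: bind an assumed "entity_id" index rule to the measure defined above
IndexRuleBinding.Builder measureBinding = IndexRuleBinding.newBuilder()
        .setMetadata(BanyandbCommon.Metadata.newBuilder()
                .setGroup("sw_metric")
                .setName("service_cpm_minute_binding"))
        .setSubject(BanyandbDatabase.Subject.newBuilder()
                .setCatalog(BanyandbCommon.Catalog.CATALOG_MEASURE)
                .setName("service_cpm_minute"))
        .addAllRules(Arrays.asList("entity_id"))
        .setBeginAt(TimeUtils.buildTimestamp(ZonedDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)))
        .setExpireAt(TimeUtils.buildTimestamp(DEFAULT_EXPIRE_AT));
client.define(measureBinding.build());
```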
@@ -118,14 +227,14 @@ For example,
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with stream schema, group=sw_record, name=trace
StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw",
StreamQuery query = new StreamQuery(Lists.newArrayList("sw_record"), "trace",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
// projection tags which are indexed
ImmutableSet.of("state", "start_time", "duration", "trace_id"));
// search for all states
query.and(PairQueryCondition.LongQueryCondition.eq("searchable", "state" , 0L));
query.and(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id" , "1a60e0846817447eac4cd498eefd3743.1.17261060724190003"));
// set order by condition
query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC));
// set projection for un-indexed tags
query.setDataProjections(ImmutableSet.of("data_binary"));
// send the query request
@@ -424,4 +533,4 @@ Please follow the [REPORTING GUIDELINES](https://www.apache.org/foundation/polic
* [bilibili B站 视频](https://space.bilibili.com/390683219)

# License
[Apache 2.0 License.](LICENSE)