From b7d9bc6f8dce91a87e4e49883dd44687e09ad97d Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 2 Oct 2024 23:28:09 -0400 Subject: [PATCH 1/3] Time Series Metadata --- src/storage/redis_metadata.h | 7 +- src/storage/storage.h | 12 ++- src/storage/time_encoding.h | 183 +++++++++++++++++++++++++++++++++++ 3 files changed, 196 insertions(+), 6 deletions(-) create mode 100644 src/storage/time_encoding.h diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 5590609be37..8ee98fed02d 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -50,6 +50,7 @@ enum RedisType : uint8_t { kRedisBloomFilter = 9, kRedisJson = 10, kRedisHyperLogLog = 11, + kRedisTimeSeries = 12, }; struct RedisTypes { @@ -91,9 +92,9 @@ enum RedisCommand { kRedisCmdLMove, }; -const std::vector RedisTypeNames = {"none", "string", "hash", "list", - "set", "zset", "bitmap", "sortedint", - "stream", "MBbloom--", "ReJSON-RL", "hyperloglog"}; +const std::vector RedisTypeNames = {"none", "string", "hash", "list", "set", + "zset", "bitmap", "sortedint", "stream", "MBbloom--", + "ReJSON-RL", "hyperloglog", "timeseries"}; constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value"; constexpr const char *kErrMsgKeyExpired = "the key was expired"; diff --git a/src/storage/storage.h b/src/storage/storage.h index 5c3baa0b06c..1c3dcf84ef9 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -68,6 +68,7 @@ enum class ColumnFamilyID : uint32_t { Propagate, Stream, Search, + TimeSeries, }; constexpr uint32_t kMaxColumnFamilyID = static_cast(ColumnFamilyID::Search); @@ -140,6 +141,7 @@ constexpr const std::string_view kPubSubColumnFamilyName = "pubsub"; constexpr const std::string_view kPropagateColumnFamilyName = "propagate"; constexpr const std::string_view kStreamColumnFamilyName = "stream"; constexpr const std::string_view kSearchColumnFamilyName = "search"; +constexpr const std::string_view kTimeSeriesColumnFamilyName = "time_series"; class ColumnFamilyConfigs { public: @@ -178,6 +180,10 @@ class ColumnFamilyConfigs { return {ColumnFamilyID::Search, kSearchColumnFamilyName, /*is_minor=*/true}; } + static ColumnFamilyConfig TimeSeriesColumnFamily() { + return {ColumnFamilyID::TimeSeries, kTimeSeriesColumnFamilyName, /*is_minor=*/true}; + } + /// ListAllColumnFamilies returns all column families in kvrocks. static const std::vector &ListAllColumnFamilies() { return AllCfs; } @@ -189,11 +195,11 @@ class ColumnFamilyConfigs { // Caution: don't change the order of column family, or the handle will be mismatched inline const static std::vector AllCfs = { PrimarySubkeyColumnFamily(), MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), - PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), + PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), TimeSeriesColumnFamily(), }; inline const static std::vector AllCfsWithoutDefault = { - MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), - PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), + MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), PropagateColumnFamily(), + StreamColumnFamily(), SearchColumnFamily(), TimeSeriesColumnFamily(), }; }; diff --git a/src/storage/time_encoding.h b/src/storage/time_encoding.h new file mode 100644 index 00000000000..74edf4a8d3c --- /dev/null +++ b/src/storage/time_encoding.h @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include + +#include + +namespace redis { + +enum class TimeSeriesSubkeyType : uint8_t { + METADATA = 0, + DATA_POINT = 1, + LABEL_INDEX = 2, + AGGREGATION_RULE = 3, +}; + +enum class DuplicatePolicy : uint8_t { + BLOCK = 0, + FIRST = 1, + LAST = 2, + MIN = 3, + MAX = 4, + SUM = 5, +}; + +class TimeSeriesMetadata : public Metadata { + public: + uint64_t num_samples = 0; + uint64_t total_memory = 0; + uint64_t first_ts = 0; + uint64_t last_ts = 0; + uint64_t retention_time = 0; // In milliseconds + uint32_t chunk_count = 0; + uint32_t chunk_size = 4096; // Default chunk size + uint8_t chunk_type = 1; // Compressed = 1, Decompressed = 0 + DuplicatePolicy duplicate_policy = DuplicatePolicy::BLOCK; + std::string source_key; // For aggregations + + explicit TimeSeriesMetadata(bool generate_version = true) : Metadata(kRedisTimeSeries, generate_version) {} + + void Encode(std::string *dst) const override { + Metadata::Encode(dst); + PutFixed64(dst, num_samples); + PutFixed64(dst, total_memory); + PutFixed64(dst, first_ts); + PutFixed64(dst, last_ts); + PutFixed64(dst, retention_time); + PutFixed32(dst, chunk_count); + PutFixed32(dst, chunk_size); + PutFixed8(dst, chunk_type); + PutFixed8(dst, uint8_t(duplicate_policy)); + PutSizedString(dst, source_key); + } + + rocksdb::Status Decode(Slice *input) override { + rocksdb::Status s = Metadata::Decode(input); + GetFixed64(input, &num_samples); + GetFixed64(input, &total_memory); + GetFixed64(input, &first_ts); + GetFixed64(input, &last_ts); + GetFixed64(input, &retention_time); + GetFixed32(input, &chunk_count); + GetFixed32(input, &chunk_size); + GetFixed8(input, &chunk_type); + uint8_t dp{}; + GetFixed8(input, &dp); + duplicate_policy = static_cast(dp); + + rocksdb::Slice source_key_slice; + if (!GetSizedString(input, &source_key_slice)) { + return rocksdb::Status::Corruption("Failed to decode source_key"); + } + source_key = std::string(source_key_slice.data(), source_key_slice.size()); + return rocksdb::Status::OK(); + } +}; + +struct TimeSeriesKey { + std::string_view ns; + std::string_view key; + uint64_t version = 0; + uint64_t timestamp = 0; + TimeSeriesSubkeyType subkey_type; + + TimeSeriesKey(std::string_view ns, std::string_view key, TimeSeriesSubkeyType subkey_type) + : ns(ns), key(key), subkey_type(subkey_type) {} + + TimeSeriesKey(std::string_view ns, std::string_view key, uint64_t version, uint64_t timestamp) + : ns(ns), key(key), version(version), timestamp(timestamp), subkey_type(TimeSeriesSubkeyType::DATA_POINT) {} + + void PutNamespace(std::string *dst) const { + PutFixed8(dst, ns.size()); + dst->append(ns); + } + + void PutType(std::string *dst) const { PutFixed8(dst, uint8_t(subkey_type)); } + + void PutKey(std::string *dst) const { PutSizedString(dst, key); } + + std::string ConstructMetadataKey() const { + std::string dst; + PutNamespace(&dst); + PutType(&dst); + PutKey(&dst); + return dst; + } + + std::string ConstructDataPointKey() const { + std::string dst; + PutNamespace(&dst); + PutType(&dst); + PutKey(&dst); + PutFixed64(&dst, version); + PutFixed64(&dst, timestamp); + return dst; + } + + std::string ConstructLabelIndexKey(const std::string &label_key, const std::string &label_value) const { + std::string dst; + PutNamespace(&dst); + PutFixed8(&dst, uint8_t(TimeSeriesSubkeyType::LABEL_INDEX)); + PutSizedString(&dst, label_key); + PutSizedString(&dst, label_value); + PutKey(&dst); + return dst; + } + + std::string ConstructAggregationRuleKey(const std::string &dest_key) const { + std::string dst; + PutNamespace(&dst); + PutFixed8(&dst, uint8_t(TimeSeriesSubkeyType::AGGREGATION_RULE)); + PutKey(&dst); + PutFixed64(&dst, version); + PutSizedString(&dst, dest_key); + return dst; + } +}; + +enum class AggregationRules : uint8_t { + avg = 0, + sum, + min, + max, +}; + +struct AggregationIndex { + uint64_t bucket_duration; + AggregationRules aggregator; + + void Encode(std::string *dst) const { + PutFixed64(dst, bucket_duration); + PutFixed8(dst, uint8_t(aggregator)); + } + + rocksdb::Status Decode(Slice *input) { + GetFixed64(input, &bucket_duration); + uint8_t aggregator = 0; + GetFixed8(input, &aggregator); + this->aggregator = static_cast(aggregator); + return rocksdb::Status::OK(); + } +}; +} // namespace redis From da3bdb38ded79db8eadcfbc74acd9a548441e3bd Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Thu, 3 Oct 2024 12:13:15 -0400 Subject: [PATCH 2/3] Take out column --- src/storage/storage.h | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/storage/storage.h b/src/storage/storage.h index 1c3dcf84ef9..5c3baa0b06c 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -68,7 +68,6 @@ enum class ColumnFamilyID : uint32_t { Propagate, Stream, Search, - TimeSeries, }; constexpr uint32_t kMaxColumnFamilyID = static_cast(ColumnFamilyID::Search); @@ -141,7 +140,6 @@ constexpr const std::string_view kPubSubColumnFamilyName = "pubsub"; constexpr const std::string_view kPropagateColumnFamilyName = "propagate"; constexpr const std::string_view kStreamColumnFamilyName = "stream"; constexpr const std::string_view kSearchColumnFamilyName = "search"; -constexpr const std::string_view kTimeSeriesColumnFamilyName = "time_series"; class ColumnFamilyConfigs { public: @@ -180,10 +178,6 @@ class ColumnFamilyConfigs { return {ColumnFamilyID::Search, kSearchColumnFamilyName, /*is_minor=*/true}; } - static ColumnFamilyConfig TimeSeriesColumnFamily() { - return {ColumnFamilyID::TimeSeries, kTimeSeriesColumnFamilyName, /*is_minor=*/true}; - } - /// ListAllColumnFamilies returns all column families in kvrocks. static const std::vector &ListAllColumnFamilies() { return AllCfs; } @@ -195,11 +189,11 @@ class ColumnFamilyConfigs { // Caution: don't change the order of column family, or the handle will be mismatched inline const static std::vector AllCfs = { PrimarySubkeyColumnFamily(), MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), - PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), TimeSeriesColumnFamily(), + PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), }; inline const static std::vector AllCfsWithoutDefault = { - MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), PropagateColumnFamily(), - StreamColumnFamily(), SearchColumnFamily(), TimeSeriesColumnFamily(), + MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), + PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), }; }; From 9e5386e8cb6513f8f1fd599b2df365b51b8b16fe Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 4 Oct 2024 09:55:43 -0400 Subject: [PATCH 3/3] Removed some code --- src/storage/time_encoding.h | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/storage/time_encoding.h b/src/storage/time_encoding.h index 74edf4a8d3c..7c12b08553e 100644 --- a/src/storage/time_encoding.h +++ b/src/storage/time_encoding.h @@ -117,14 +117,6 @@ struct TimeSeriesKey { void PutKey(std::string *dst) const { PutSizedString(dst, key); } - std::string ConstructMetadataKey() const { - std::string dst; - PutNamespace(&dst); - PutType(&dst); - PutKey(&dst); - return dst; - } - std::string ConstructDataPointKey() const { std::string dst; PutNamespace(&dst);