diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/CachingSinglePointIndex.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/CachingSinglePointIndex.java index 8b544a02f..c3a2ce2cb 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/CachingSinglePointIndex.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/CachingSinglePointIndex.java @@ -23,7 +23,6 @@ import io.pixelsdb.pixels.common.exception.SinglePointIndexException; import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.index.IndexProto; -import io.pixelsdb.pixels.common.index.LatestVersionCache.CacheEntry; import java.util.List; @@ -39,8 +38,8 @@ public CachingSinglePointIndex() if (cacheEnabled) { long capacity = Long.parseLong(config.getProperty("index.cache.capacity")); - long expireAfterAccessSeconds = Long.parseLong(config.getProperty("index.cache.expiration.seconds")); - this.cache = new LatestVersionCache(capacity, expireAfterAccessSeconds); + long expirationSeconds = Long.parseLong(config.getProperty("index.cache.expiration.seconds")); + this.cache = new LatestVersionCache(capacity, expirationSeconds); } else { this.cache = null; @@ -52,10 +51,16 @@ public long getUniqueRowId(IndexProto.IndexKey key) throws SinglePointIndexExcep { if (cache != null) { - CacheEntry cacheEntry = cache.get(key); - if (cacheEntry != null && cacheEntry.timestamp <= key.getTimestamp()) + final String cacheKey = LatestVersionCache.buildCacheKey(key); + final String cacheValue = cache.get(cacheKey); + + if (cacheValue != null) { - return cacheEntry.rowId; + long[] entry = LatestVersionCache.parseCacheValue(cacheValue); + if (entry[0] <= key.getTimestamp()) + { + return entry[1]; + } } } @@ -68,7 +73,9 @@ public final boolean putEntry(IndexProto.IndexKey key, long rowId) throws Single boolean success = putEntryInternal(key, rowId); if (isUnique() && cache != null && success) { - cache.put(key, rowId); + final String cacheKey = LatestVersionCache.buildCacheKey(key); + final String cacheValue = LatestVersionCache.buildCacheValue(key.getTimestamp(), rowId); + cache.put(cacheKey, cacheValue); } return success; } @@ -81,7 +88,9 @@ public boolean putPrimaryEntries(List entries) thr { for (IndexProto.PrimaryIndexEntry entry : entries) { - cache.put(entry.getIndexKey(), entry.getRowId()); + final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey()); + final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId()); + cache.put(cacheKey, cacheValue); } } return success; @@ -95,7 +104,9 @@ public boolean putSecondaryEntries(List entries) { for (IndexProto.SecondaryIndexEntry entry : entries) { - cache.put(entry.getIndexKey(), entry.getRowId()); + final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey()); + final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId()); + cache.put(cacheKey, cacheValue); } } return success; @@ -107,7 +118,9 @@ public long updatePrimaryEntry(IndexProto.IndexKey key, long rowId) throws Singl long previousRowId = updatePrimaryEntryInternal(key, rowId); if (cache != null) { - cache.put(key, rowId); + final String cacheKey = LatestVersionCache.buildCacheKey(key); + final String cacheValue = LatestVersionCache.buildCacheValue(key.getTimestamp(), rowId); + cache.put(cacheKey, cacheValue); } return previousRowId; } @@ -118,7 +131,9 @@ public List updateSecondaryEntry(IndexProto.IndexKey key, long rowId) thro List previousRowIds = updateSecondaryEntryInternal(key, rowId); if (isUnique() && cache != null) { - cache.put(key, rowId); + final String cacheKey = LatestVersionCache.buildCacheKey(key); + final String cacheValue = LatestVersionCache.buildCacheValue(key.getTimestamp(), rowId); + cache.put(cacheKey, cacheValue); } return previousRowIds; } @@ -131,7 +146,9 @@ public List updatePrimaryEntries(List entrie { for (IndexProto.PrimaryIndexEntry entry : entries) { - cache.put(entry.getIndexKey(), entry.getRowId()); + final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey()); + final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId()); + cache.put(cacheKey, cacheValue); } } return previousRowIds; @@ -145,7 +162,9 @@ public List updateSecondaryEntries(List en { for (IndexProto.SecondaryIndexEntry entry : entries) { - cache.put(entry.getIndexKey(), entry.getRowId()); + final String cacheKey = LatestVersionCache.buildCacheKey(entry.getIndexKey()); + final String cacheValue = LatestVersionCache.buildCacheValue(entry.getIndexKey().getTimestamp(), entry.getRowId()); + cache.put(cacheKey, cacheValue); } } return previousRowIds; @@ -157,7 +176,8 @@ public long deleteUniqueEntry(IndexProto.IndexKey indexKey) throws SinglePointIn long deleteRowId = deleteUniqueEntryInternal(indexKey); if (cache != null && deleteRowId >= 0) { - cache.invalidate(indexKey); + final String cacheKey = LatestVersionCache.buildCacheKey(indexKey); + cache.invalidate(cacheKey); } return deleteRowId; } @@ -168,7 +188,8 @@ public List deleteEntry(IndexProto.IndexKey key) throws SinglePointIndexEx List deletedRowIds = deleteEntryInternal(key); if (isUnique() && cache != null && !deletedRowIds.isEmpty()) { - cache.invalidate(key); + final String cacheKey = LatestVersionCache.buildCacheKey(key); + cache.invalidate(cacheKey); } return deletedRowIds; } @@ -181,7 +202,8 @@ public List deleteEntries(List keys) throws SinglePoi { for (IndexProto.IndexKey key : keys) { - cache.invalidate(key); + final String cacheKey = LatestVersionCache.buildCacheKey(key); + cache.invalidate(cacheKey); } } return deletedRowIds; @@ -195,7 +217,8 @@ public List purgeEntries(List indexKeys) throws Singl { for (IndexProto.IndexKey key : indexKeys) { - cache.invalidate(key); + final String cacheKey = LatestVersionCache.buildCacheKey(key); + cache.invalidate(cacheKey); } } return purgedRowIds; diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LatestVersionCache.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LatestVersionCache.java index 5a324f48e..7808f8509 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LatestVersionCache.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LatestVersionCache.java @@ -23,95 +23,72 @@ import com.github.benmanes.caffeine.cache.Cache; import io.pixelsdb.pixels.index.IndexProto; -import java.util.Objects; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; public class LatestVersionCache { - private final Cache cache; + private final Cache cache; - /** - * A wrapper for {@link IndexProto.IndexKey} that is used as a key in the cache. - * The {@link #equals(Object)} and {@link #hashCode()} methods are implemented based on - * the table ID, index ID, and key value, ignoring the timestamp. This allows cache - * lookups to succeed for the same logical key regardless of the transaction timestamp. - */ - private static class CacheKey + public LatestVersionCache(long capacity, long expirationSeconds) { - private final IndexProto.IndexKey indexKey; - - public CacheKey(IndexProto.IndexKey indexKey) - { - this.indexKey = indexKey; - } - - public IndexProto.IndexKey getIndexKey() - { - return indexKey; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - CacheKey other = (CacheKey) o; - // Compare based on tableId - return indexKey.getTableId() == other.indexKey.getTableId() && - indexKey.getIndexId() == other.indexKey.getIndexId() && - Objects.equals(indexKey.getKey(), other.indexKey.getKey()); - } - - @Override - public int hashCode() - { - return Objects.hash(indexKey.getTableId(), indexKey.getIndexId(), indexKey.getKey()); - } + this.cache = Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterWrite(expirationSeconds, TimeUnit.SECONDS) + .build(); } - public static class CacheEntry + public String get(String key) { - final long rowId; - final long timestamp; + return cache.getIfPresent(key); + } - CacheEntry (long rowId, long timestamp) - { - this.rowId = rowId; - this.timestamp = timestamp; - } + public void put(String key, String value) + { + cache.put(key, value); } - public LatestVersionCache(long maximumSize, long expireAfterAccessSeconds) + public void invalidate(String key) { - this.cache = Caffeine.newBuilder() - .maximumSize(maximumSize) - .expireAfterAccess(expireAfterAccessSeconds, TimeUnit.SECONDS) - .build(); + cache.invalidate(key); } - public CacheEntry get(IndexProto.IndexKey key) + public static String buildCacheKey(IndexProto.IndexKey key) { - return cache.getIfPresent(new CacheKey(key)); + String indexKey = key.getKey().toString(StandardCharsets.ISO_8859_1); + return new StringBuilder(20 + 20 + indexKey.length()) + .append(key.getTableId()) + .append(key.getIndexId()) + .append(indexKey) + .toString(); } - public void put(IndexProto.IndexKey key, long rowId) + public static String buildCacheValue(long timestamp, long rowId) { - CacheKey cacheKey = new CacheKey(key); - long newTimestamp = key.getTimestamp(); - cache.asMap().compute(cacheKey, (k, existingEntry) -> { - if (existingEntry == null || newTimestamp >= existingEntry.timestamp) - { - return new CacheEntry(rowId, newTimestamp); - } else - { - return existingEntry; - } - }); + char[] chars = new char[16]; + for (int i = 0; i < 8; i++) + { + chars[i] = (char) ((timestamp >> (i * 8)) & 0xFF); + chars[8 + i] = (char) ((rowId >> (i * 8)) & 0xFF); + } + return new String(chars); } - public void invalidate(IndexProto.IndexKey key) + public static long[] parseCacheValue(String value) { - cache.invalidate(new CacheKey(key)); + if (value == null || value.length() != 16) + { + return null; + } + long timestamp = 0; + long rowId = 0; + char[] chars = value.toCharArray(); + + for (int i = 0; i < 8; i++) + { + timestamp |= (long) (chars[i] & 0xFF) << (i * 8); + rowId |= (long) (chars[8 + i] & 0xFF) << (i * 8); + } + return new long[] { timestamp, rowId }; } } diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index a38679fcf..d591c529b 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -295,6 +295,38 @@ index.rockset.read.only=false index.rocksdb.data.path=/tmp/rocksdb # rocksdb write buffer size (default to 64MB) index.rocksdb.write.buffer.size=67108864 +# rocksdb max write buffer number (default to 3) +index.rocksdb.max.write.buffer.number=3 +# rocksdb max background flush threads (default to 2) +index.rocksdb.max.background.flushes=2 +# rocksdb max background compactions (default to 4) +index.rocksdb.max.background.compactions=4 +# rocksdb max open files (default to 4096) +index.rocksdb.max.open.files=4096 +# rocksdb block cache capacity (default to 1GB) +index.rocksdb.block.cache.capacity=1073741824 +# rocksdb block cache shard bits (default to 6, i.e., 64 shards) +index.rocksdb.block.cache.shard.bits=6 +# rocksdb block size (default to 16KB) +index.rocksdb.block.size=16384 +# rocksdb min write buffer number to merge (default to 2) +index.rocksdb.min.write.buffer.number.to.merge=2 +# rocksdb file number compaction trigger (default to 4) +index.rocksdb.level0.file.num.compaction.trigger=4 +# rocksdb max bytes for level base (default to 256MB) +index.rocksdb.max.bytes.for.level.base=268435456 +rocksdb max bytes for level multiplier (default to 10) +index.rocksdb.max.bytes.for.level.multiplier=10 +# rocksdb target file size base (default to 64MB) +index.rocksdb.target.file.size.base=67108864 +# rocksdb file size multiplier (default to 1) +index.rocksdb.target.file.size.multiplier=1 +# rocksdb max subcompactions +index.rocksdb.max.subcompactions=1 +# rocksdb compression type (e.g. NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION) +index.rocksdb.compression.type=LZ4_COMPRESSION +# rocksdb bottommost compression type +index.rocksdb.bottommost.compression.type=ZSTD_COMPRESSION # Whether to enable the latest version cache for SinglePointIndex index.cache.enabled=false # The maximum number of entries in the cache diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java index b1e58979b..2b6a9899d 100644 --- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java +++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java @@ -39,6 +39,11 @@ public class RocksDBFactory private static final String dbPath = ConfigFactory.Instance().getProperty("index.rocksdb.data.path"); private static final boolean multiCF = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rocksdb.multicf")); private static RocksDB instance; + + private static Cache blockCache; + private static final long blockCacheCapacity = Long.parseLong(ConfigFactory.Instance().getProperty("index.rocksdb.block.cache.capacity")); + private static final int blockCacheshardBits = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.block.cache.shard.bits")); + /** * The reference counter. */ @@ -66,6 +71,12 @@ private static RocksDB createRocksDB() throws RocksDBException existingColumnFamilies = new ArrayList<>(existingColumnFamilies); existingColumnFamilies.add(RocksDB.DEFAULT_COLUMN_FAMILY); } + + if (blockCache == null) + { + blockCache = new LRUCache(blockCacheCapacity, blockCacheshardBits); + } + // 3. Prepare column family descriptors List descriptors = existingColumnFamilies.stream() .map(RocksDBFactory::createCFDescriptor) @@ -73,9 +84,17 @@ private static RocksDB createRocksDB() throws RocksDBException // 4. Open DB List handles = new ArrayList<>(); + int maxBackgroundFlushed = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.max.background.flushes")); + int maxBackgroundCompactions = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.max.background.compactions")); + int maxSubcompactions = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.max.subcompactions")); + int maxOpenFiles = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.max.open.files")); DBOptions dbOptions = new DBOptions() .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); + .setCreateMissingColumnFamilies(true) + .setMaxBackgroundFlushes(maxBackgroundFlushed) + .setMaxBackgroundCompactions(maxBackgroundCompactions) + .setMaxSubcompactions(maxSubcompactions) + .setMaxOpenFiles(maxOpenFiles); RocksDB db = RocksDB.open(dbOptions, dbPath, descriptors, handles); @@ -91,14 +110,43 @@ private static RocksDB createRocksDB() throws RocksDBException private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) { ConfigFactory config = ConfigFactory.Instance(); - long writeBufferSize = Long.parseLong(config.getProperty("index.rocksdb.write.buffer.size")); + + long blockSize = Long.parseLong(config.getProperty("index.rocksdb.block.size")); BlockBasedTableConfig tableConfig = new BlockBasedTableConfig() .setFilterPolicy(new BloomFilter(10, false)) - .setWholeKeyFiltering(false); + .setWholeKeyFiltering(false) + .setBlockSize(blockSize) + .setBlockCache(blockCache); + + // ColumnFamily Options + long writeBufferSize = Long.parseLong(config.getProperty("index.rocksdb.write.buffer.size")); + int maxWriteBufferNumber = Integer.parseInt(config.getProperty("index.rocksdb.max.write.buffer.number")); + int minWriteBufferNumberToMerge = Integer.parseInt(config.getProperty("index.rocksdb.min.write.buffer.number.to.merge")); + + // Compaction Options + int level0FileNumCompactionTrigger = Integer.parseInt(config.getProperty("index.rocksdb.level0.file.num.compaction.trigger")); + long maxBytesForLevelBase = Long.parseLong(config.getProperty("index.rocksdb.max.bytes.for.level.base")); + int maxBytesForLevelMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.max.bytes.for.level.multiplier")); + long targetFileSizeBase = Long.parseLong(config.getProperty("index.rocksdb.target.file.size.base")); + int targetFileSizeMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.target.file.size.multiplier")); + + // Compression Options + CompressionType compressionType = CompressionType.valueOf(config.getProperty("index.rocksdb.compression.type")); + CompressionType bottommostCompressionType = CompressionType.valueOf(config.getProperty("index.rocksdb.bottommost.compression.type")); + ColumnFamilyOptions cfOptions = new ColumnFamilyOptions() .setWriteBufferSize(writeBufferSize) + .setMaxWriteBufferNumber(maxWriteBufferNumber) + .setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge) .setMemtablePrefixBloomSizeRatio(0.1) - .setTableFormatConfig(tableConfig); + .setTableFormatConfig(tableConfig) + .setLevel0FileNumCompactionTrigger(level0FileNumCompactionTrigger) + .setMaxBytesForLevelBase(maxBytesForLevelBase) + .setMaxBytesForLevelMultiplier(maxBytesForLevelMultiplier) + .setTargetFileSizeBase(targetFileSizeBase) + .setTargetFileSizeMultiplier(targetFileSizeMultiplier) + .setCompressionType(compressionType) + .setBottommostCompressionType(bottommostCompressionType); return new ColumnFamilyDescriptor(name, cfOptions); }