Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ option(USE_AVX2 "Enable AVX2 (FMA, F16C)" OFF)
option(USE_SVE2 "Enable SVE2 (INT8/16, FP16)" OFF)
option(USE_NEON "Enable NEON (FP16, DotProd)" OFF)
option(NDD_INV_IDX_STORE_FLOATS "Store raw float 32 values in sparse index (no quantization)" OFF)
set(NDD_DB_BACKEND "mdbx" CACHE STRING "Database backend to compile against (mdbx or lmdb)")
set_property(CACHE NDD_DB_BACKEND PROPERTY STRINGS mdbx lmdb)

# Check if any SIMD option is selected
if(NOT USE_AVX512 AND NOT USE_AVX2 AND NOT USE_SVE2 AND NOT USE_NEON)
Expand Down Expand Up @@ -224,9 +226,34 @@ endif()
# =======================
find_package(Threads REQUIRED)

# Find MDBX (replaces LMDB)
find_path(LMDB_INCLUDE_DIR NAMES mdbx.h PATHS ${CMAKE_SOURCE_DIR}/third_party/mdbx NO_DEFAULT_PATH)
file(GLOB LMDB_SOURCES ${CMAKE_SOURCE_DIR}/third_party/mdbx/*.c)
file(STRINGS "${CMAKE_CURRENT_SOURCE_DIR}/src/utils/settings.hpp" NDD_DB_PAGE_BITS_LINE
REGEX "constexpr size_t MDBX_PAGE_SIZE_BITS = [0-9]+")
if(NOT NDD_DB_PAGE_BITS_LINE)
message(FATAL_ERROR "Failed to extract MDBX_PAGE_SIZE_BITS from src/utils/settings.hpp")
endif()
list(GET NDD_DB_PAGE_BITS_LINE 0 NDD_DB_PAGE_BITS_LINE)
string(REGEX MATCH "constexpr size_t MDBX_PAGE_SIZE_BITS = ([0-9]+)" _match "${NDD_DB_PAGE_BITS_LINE}")
set(NDD_DB_PAGE_SIZE_BITS "${CMAKE_MATCH_1}")
if(NOT NDD_DB_PAGE_SIZE_BITS)
message(FATAL_ERROR "Could not parse MDBX_PAGE_SIZE_BITS from: ${NDD_DB_PAGE_BITS_LINE}")
endif()
math(EXPR NDD_DB_PAGE_SIZE_BYTES "1 << ${NDD_DB_PAGE_SIZE_BITS}")

string(TOLOWER "${NDD_DB_BACKEND}" NDD_DB_BACKEND)
if(NDD_DB_BACKEND STREQUAL "mdbx")
set(NDD_DB_BACKEND_DEFINITION NDD_DB_BACKEND_MDBX)
set(DB_BACKEND_INCLUDE_DIR "${CMAKE_SOURCE_DIR}/third_party/mdbx")
file(GLOB DB_BACKEND_SOURCES ${CMAKE_SOURCE_DIR}/third_party/mdbx/*.c)
elseif(NDD_DB_BACKEND STREQUAL "lmdb")
set(NDD_DB_BACKEND_DEFINITION NDD_DB_BACKEND_LMDB)
set(DB_BACKEND_INCLUDE_DIR "${CMAKE_SOURCE_DIR}/third_party/lmdb")
set(DB_BACKEND_SOURCES
${CMAKE_SOURCE_DIR}/third_party/lmdb/mdb.c
${CMAKE_SOURCE_DIR}/third_party/lmdb/midl.c
)
else()
message(FATAL_ERROR "Unsupported NDD_DB_BACKEND='${NDD_DB_BACKEND}'. Use 'mdbx' or 'lmdb'.")
endif()

# -----------------------
# Derive binary name
Expand Down Expand Up @@ -263,14 +290,15 @@ add_library(ndd_core OBJECT ${NDD_CORE_SOURCES})
add_executable(${NDD_BINARY_NAME}
src/main.cpp
$<TARGET_OBJECTS:ndd_core>
${LMDB_SOURCES}
${DB_BACKEND_SOURCES}
third_party/roaring_bitmap/roaring.c
)

# Set MDBX-specific compile flags
set_source_files_properties(${LMDB_SOURCES} PROPERTIES
COMPILE_FLAGS "-DMDBX_BUILD_SHARED_LIBRARY=0 -DMDBX_BUILD_FLAGS=\\\"NDD_EMBEDDED\\\""
)
if(NDD_DB_BACKEND STREQUAL "mdbx")
set_source_files_properties(${DB_BACKEND_SOURCES} PROPERTIES
COMPILE_FLAGS "-DMDBX_BUILD_SHARED_LIBRARY=0 -DMDBX_BUILD_FLAGS=\\\"NDD_EMBEDDED\\\""
)
endif()

# Include directories
target_include_directories(ndd_core PRIVATE
Expand All @@ -283,7 +311,7 @@ target_include_directories(ndd_core PRIVATE
${CROW_INCLUDE_DIR}
${MSGPACK_INCLUDE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src/roaring
${LMDB_INCLUDE_DIR}
${DB_BACKEND_INCLUDE_DIR}
${ASIO_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR}
${CURL_INCLUDE_DIRS}
Expand All @@ -298,7 +326,7 @@ target_include_directories(${NDD_BINARY_NAME} PRIVATE
${CROW_INCLUDE_DIR}
${MSGPACK_INCLUDE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src/roaring
${LMDB_INCLUDE_DIR}
${DB_BACKEND_INCLUDE_DIR}
${ASIO_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR}
${CURL_INCLUDE_DIRS}
Expand Down Expand Up @@ -347,6 +375,13 @@ if(NDD_INV_IDX_STORE_FLOATS)
target_compile_definitions(${NDD_BINARY_NAME} PRIVATE NDD_INV_IDX_STORE_FLOATS)
endif()

target_compile_definitions(ndd_core PRIVATE ${NDD_DB_BACKEND_DEFINITION})
target_compile_definitions(${NDD_BINARY_NAME} PRIVATE ${NDD_DB_BACKEND_DEFINITION})
if(NDD_DB_BACKEND STREQUAL "lmdb")
target_compile_definitions(ndd_core PRIVATE MDB_FIXED_PAGESIZE=${NDD_DB_PAGE_SIZE_BYTES})
target_compile_definitions(${NDD_BINARY_NAME} PRIVATE MDB_FIXED_PAGESIZE=${NDD_DB_PAGE_SIZE_BYTES})
endif()

# Add ASIO definitions
target_compile_definitions(ndd_core PRIVATE
ASIO_STANDALONE
Expand Down Expand Up @@ -384,6 +419,8 @@ if(ENABLE_TESTING)
endif()

message(STATUS "Processor: ${CMAKE_SYSTEM_PROCESSOR}")
message(STATUS "DB backend: ${NDD_DB_BACKEND}")
message(STATUS "DB page size: ${NDD_DB_PAGE_SIZE_BYTES}")
if(USE_AVX512)
message(STATUS "SIMD Mode: AVX512")
elseif(USE_AVX2)
Expand All @@ -394,7 +431,7 @@ elseif(USE_NEON)
message(STATUS "SIMD Mode: NEON")
endif()
message(STATUS "ASIO include dir: ${ASIO_INCLUDE_DIR}")
message(STATUS "LMDB include dir: ${LMDB_INCLUDE_DIR}")
message(STATUS "DB backend include dir: ${DB_BACKEND_INCLUDE_DIR}")
message(STATUS "OpenSSL include dir: ${OPENSSL_INCLUDE_DIR}")

# Create a symbolic link named 'ndd' pointing to the architecture-specific binary
Expand Down
63 changes: 44 additions & 19 deletions src/core/ndd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class IndexManager {
std::deque<std::string> indices_list_;
std::unordered_map<std::string, CacheEntry> indices_;
std::shared_mutex indices_mutex_;
size_t max_active_indices_;
std::string data_dir_;
// This is for locking the LRU
std::shared_mutex active_indices_mutex_;
Expand Down Expand Up @@ -385,36 +386,55 @@ class IndexManager {

public:
// Evict the last index if the total size exceeds the limit
void evictIfNeeded() {
void evictIfNeeded(size_t reserve_slots = 0) {
// Go through indices and get the total size. If it exceeds the limit, evict the last one
size_t total_size = 0;
for(auto& [index_id, entry] : indices_) {
if(entry.alg) {
total_size += entry.alg->getApproxSizeGB();
}
}
if(total_size > settings::MAX_MEMORY_GB) {
// Make sure that there is at least one index in memory and we use only 80% of the total
// size
while((total_size > 0.80 * settings::MAX_MEMORY_GB) && (indices_list_.size() > 1)) {
// Pop from the back of the active indices list

const size_t min_indices_to_keep = reserve_slots > 0 ? 0 : 1;
auto exceeds_limits = [&]() {
return total_size > 0.80 * settings::MAX_MEMORY_GB
|| indices_.size() + reserve_slots > max_active_indices_;
};

while(exceeds_limits() && indices_.size() > min_indices_to_keep) {
bool evicted = false;
size_t candidates_remaining = indices_list_.size();

while(candidates_remaining-- > 0 && indices_.size() > min_indices_to_keep) {
std::string to_evict = indices_list_.back();
indices_list_.pop_back();

auto it = indices_.find(to_evict);
if(it != indices_.end()) {
total_size -= it->second.alg->getApproxSizeGB();

// Only evict if the index is not dirty (hasn't been updated)
if(it->second.updated) {
LOG_WARN(2015, to_evict, "Cannot evict dirty index; it must be saved first");
// Put it back at the front to try other indices
indices_list_.push_front(to_evict);
continue;
}
if(it == indices_.end()) {
continue;
}

LOG_INFO(2016, to_evict, "Evicting clean index from cache");
indices_.erase(it);
// Only evict if the index is not dirty (hasn't been updated)
if(it->second.updated) {
LOG_WARN(2015, to_evict, "Cannot evict dirty index; it must be saved first");
// Put it back at the front to try other indices
indices_list_.push_front(to_evict);
continue;
}

total_size -= it->second.alg ? it->second.alg->getApproxSizeGB() : 0;
// std::cerr << "\033[1m[EVICT] Evicting inactive index from cache: " << to_evict
// << "\033[0m" << std::endl;
LOG_INFO(2016, to_evict, "Evicting clean index from cache");
indices_.erase(it);
evicted = true;
break;
}

if(!evicted) {
LOG_WARN(2047,
"Unable to evict enough indexes to satisfy active-index or memory limits");
break;
}
}
}
Expand Down Expand Up @@ -471,6 +491,7 @@ class IndexManager {
IndexManager(size_t max_indices,
const std::string& data_dir,
const PersistenceConfig& persistence_config = PersistenceConfig{}) :
max_active_indices_(std::max<size_t>(1, max_indices)),
data_dir_(data_dir),
persistence_config_(persistence_config),
backup_store_(data_dir) {
Expand Down Expand Up @@ -599,7 +620,7 @@ class IndexManager {
// Evict if needed (clean indices only)
{
std::unique_lock<std::shared_mutex> temp_lock(indices_mutex_);
evictIfNeeded();
evictIfNeeded(1);
}

hnswlib::SpaceType space_type = hnswlib::getSpaceType(config.space_type_str);
Expand Down Expand Up @@ -687,6 +708,10 @@ class IndexManager {
LOG_INFO(2022, index_id, "Saving newly created index");
// Index is marked as updated so it needs to be saved immediately for crash recovery
saveIndex(index_id);
{
std::unique_lock<std::shared_mutex> lock(indices_mutex_);
evictIfNeeded();
}
return true;
}

Expand Down
56 changes: 17 additions & 39 deletions src/filter/category_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <vector>
#include <stdexcept>
#include <iostream>
#include "mdbx/mdbx.h"
#include "db_backend.hpp"
#include "../utils/log.hpp"
#include "../core/types.hpp"

Expand All @@ -15,6 +15,8 @@ namespace ndd {
private:
MDBX_env* env_;
MDBX_dbi dbi_;
ndd::db::EnvResizeConfig* map_config_;
std::string context_;

static std::string format_filter_key(const std::string& field,
const std::string& value) {
Expand Down Expand Up @@ -75,48 +77,24 @@ namespace ndd {

std::vector<char> buffer(required_size);
bitmap.write(buffer.data(), true);
ndd::db::with_write_txn_retry(env_, *map_config_, context_, [&](MDBX_txn* txn) {
MDBX_val key{const_cast<char*>(filter_key.c_str()), filter_key.size()};
MDBX_val data{const_cast<char*>(buffer.data()), buffer.size()};

MDBX_val key{const_cast<char*>(filter_key.c_str()), filter_key.size()};
MDBX_val data{const_cast<char*>(buffer.data()), buffer.size()};

MDBX_txn* txn;
int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn);
if(rc != MDBX_SUCCESS) {
throw std::runtime_error("Failed to begin write transaction: "
+ std::string(mdbx_strerror(rc)));
}

rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT);
if(rc != MDBX_SUCCESS) {
mdbx_txn_abort(txn);
throw std::runtime_error("Failed to store bitmap: "
+ std::string(mdbx_strerror(rc)));
}

rc = mdbx_txn_commit(txn);
if(rc != MDBX_SUCCESS) {
throw std::runtime_error("Failed to commit transaction: "
+ std::string(mdbx_strerror(rc)));
}
const int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT);
ndd::db::throw_if_error(rc, "Failed to store category bitmap");
});
}

public:
CategoryIndex(MDBX_env* env) :
env_(env) {
MDBX_txn* txn;
int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn);
if(rc != MDBX_SUCCESS) {
throw std::runtime_error("Failed to begin txn for CategoryIndex init");
}

// Open named DB for category/boolean
rc = mdbx_dbi_open(txn, "category_idx", MDBX_CREATE, &dbi_);
if(rc != MDBX_SUCCESS) {
mdbx_txn_abort(txn);
throw std::runtime_error("Failed to open category_idx dbi");
}

mdbx_txn_commit(txn);
CategoryIndex(MDBX_env* env, ndd::db::EnvResizeConfig* map_config, const std::string& context) :
env_(env),
map_config_(map_config),
context_(context) {
ndd::db::with_write_txn_retry(env_, *map_config_, context_, [&](MDBX_txn* txn) {
const int rc = mdbx_dbi_open(txn, "category_idx", MDBX_CREATE, &dbi_);
ndd::db::throw_if_error(rc, "Failed to open category_idx dbi");
});
}

// Faceting: List all unique values for a field
Expand Down
Loading