Skip to content

Commit

Permalink
Adding usage of must_obey_client in wrapper to improve performace
Browse files Browse the repository at this point in the history
Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Feb 18, 2025
1 parent 8077d58 commit efc2ac5
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 15 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ jobs:
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
run: |
if [ "${{ matrix.server_version }}" = "8.0.0" ]; then
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0
else
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
fi
- name: Run unit tests
run: cargo test --features enable-system-alloc
- name: Make valkey-server binary
Expand Down Expand Up @@ -77,7 +82,12 @@ jobs:
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
run: |
if [ "${{ matrix.server_version }}" = "8.0.0" ]; then
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0
else
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
fi
- name: Run unit tests
run: cargo test --features enable-system-alloc
- name: Make Valkey-server binary with asan
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
valkey-module = { version = "0.1.3", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]}
valkey-module = { version = "0.1.4", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]}
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "3.0.1", features = ["serde"] }
Expand All @@ -38,3 +38,4 @@ debug-assertions = true
default = ["min-valkey-compatibility-version-8-0"]
enable-system-alloc = ["valkey-module/enable-system-alloc"]
min-valkey-compatibility-version-8-0 = []
valkey_8_0 = [] # Empty feature flag for Valkey 8.0
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Valkey-Bloom (BSD-3-Clause) is a Rust Valkey-Module which brings a native and space efficient probabilistic Module data type to Valkey. With this, users can create filters (space-efficient probabilistic Module data type) to add elements, perform “check” operation to test whether an element exists, auto scale their filters, perform RDB Save and load operations, etc.

Valkey-Bloom is built using bloomfilter::Bloom (https://crates.io/crates/bloomfilter which has a BSD-2-Clause license).
Valkey-Bloom is built using `bloomfilter::Bloom` (https://crates.io/crates/bloomfilter which has a BSD-2-Clause license).

It is compatible with the BloomFilter (BF.*) command APIs in Redis offerings.

Expand Down
10 changes: 8 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ echo "Running cargo and clippy format checks..."
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all

echo "Running cargo build release..."
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release

echo "Running unit tests..."
cargo test --features enable-system-alloc
Expand All @@ -29,6 +27,14 @@ if [ "$SERVER_VERSION" != "unstable" ] && [ "$SERVER_VERSION" != "8.0.0" ] ; the
exit 1
fi

echo "Running cargo build release..."
if [ "$SERVER_VERSION" == "8.0.0" ] ; then
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0
else
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
fi


REPO_URL="https://github.com/valkey-io/valkey.git"
BINARY_PATH="tests/.build/binaries/$SERVER_VERSION/valkey-server"

Expand Down
11 changes: 5 additions & 6 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
};
use crate::wrapper::must_obey_client;
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
use valkey_module::NotifyEvent;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

/// Helper function used to add items to a bloom object. It handles both multi item and single item add operations.
/// It is used by any command that allows adding of items: BF.ADD, BF.MADD, and BF.INSERT.
/// Returns the result of the item add operation on success as a ValkeyValue and a ValkeyError on failure.
Expand Down Expand Up @@ -177,7 +176,7 @@ pub fn bloom_filter_add_value(
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand Down Expand Up @@ -404,7 +403,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
false => (Some(configs::FIXED_SEED), false),
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
Expand Down Expand Up @@ -615,7 +614,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand Down Expand Up @@ -811,7 +810,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
None => {
// if filter not exists, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
Expand Down
33 changes: 33 additions & 0 deletions src/wrapper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,35 @@
use valkey_module::{Context, ContextFlags};

pub mod bloom_callback;
pub mod defrag;

/// Wrapper for the ValkeyModule_MustObeyClient function.
/// Takes in an Context and returns true if the if commands are arriving
/// from the primary client or AOF client and should never be rejected.
/// False otherwise.
pub fn must_obey_client(ctx: &Context) -> bool {
// If we are using valkey 8.0 then we cannot use ValkeyModule_MustObeyClient so must go back to the default
// of checking for the replicated flag
#[cfg(not(feature = "valkey_8_0"))]
{
let ctx_raw = ctx.get_raw() as *mut valkey_module::ValkeyModuleCtx;

match unsafe { valkey_module::raw::ValkeyModule_MustObeyClient } {
Some(func) => {
let status = unsafe { func(ctx_raw) as isize };
match status {
1 => true,
0 => false,
_ => panic!("We do not expect ValkeyModule_MustObeyClient to return anything other than 1 or 0."),
}
}
// Fallback to checking for replicated flag in the GetContextFlags API as a best effort.
None => ctx.get_flags().contains(ContextFlags::REPLICATED),
}
}

#[cfg(feature = "valkey_8_0")]
{
ctx.get_flags().contains(ContextFlags::REPLICATED)
}
}
6 changes: 3 additions & 3 deletions tests/test_bloom_correctness.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def test_scaling_filter(self):

def test_max_and_validate_scale_to_correctness(self):
validate_scale_to_commands = [
('BF.INSERT key ERROR 0.00000001 VALIDATESCALETO 13107101', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627601', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT key EXPANSION 1 VALIDATESCALETO 101601', "provided VALIDATESCALETO causes false positive to degrade to 0" )
]
for cmd in validate_scale_to_commands:
Expand All @@ -144,12 +144,12 @@ def test_max_and_validate_scale_to_correctness(self):
assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error"
except Exception as e:
assert cmd[1] == str(e), f"Unexpected error message: {e}"
self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 VALIDATESCALETO 13107100')
self.client.execute_command('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627600')
self.client.execute_command('BF.INSERT FPKey VALIDATESCALETO 101600 EXPANSION 1')
FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXSCALEDCAPACITY')
MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXSCALEDCAPACITY')
self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 1627600, 1, "item")
key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")]
for key in key_names:
try:
Expand Down

0 comments on commit efc2ac5

Please sign in to comment.