Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding usage of must_obey_client in wrapper to improve performace #45

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ jobs:
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
run: |
if [ "${{ matrix.server_version }}" = "8.0.0" ]; then
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0
else
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
fi
- name: Run unit tests
run: cargo test --features enable-system-alloc
- name: Make valkey-server binary
Expand Down Expand Up @@ -77,7 +82,12 @@ jobs:
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all
- name: Release Build
run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
run: |
if [ "${{ matrix.server_version }}" = "8.0.0" ]; then
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0
else
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
fi
- name: Run unit tests
run: cargo test --features enable-system-alloc
- name: Make Valkey-server binary with asan
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
valkey-module = { version = "0.1.3", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]}
valkey-module = { version = "0.1.4", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]}
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "3.0.1", features = ["serde"] }
Expand All @@ -38,3 +38,4 @@ debug-assertions = true
default = ["min-valkey-compatibility-version-8-0"]
enable-system-alloc = ["valkey-module/enable-system-alloc"]
min-valkey-compatibility-version-8-0 = []
valkey_8_0 = [] # Empty feature flag for Valkey 8.0
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Valkey-Bloom (BSD-3-Clause) is a Rust Valkey-Module which brings a native and space efficient probabilistic Module data type to Valkey. With this, users can create filters (space-efficient probabilistic Module data type) to add elements, perform “check” operation to test whether an element exists, auto scale their filters, perform RDB Save and load operations, etc.

Valkey-Bloom is built using bloomfilter::Bloom (https://crates.io/crates/bloomfilter which has a BSD-2-Clause license).
Valkey-Bloom is built using `bloomfilter::Bloom` (https://crates.io/crates/bloomfilter which has a BSD-2-Clause license).

It is compatible with the BloomFilter (BF.*) command APIs in Redis offerings.

Expand Down
10 changes: 8 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ echo "Running cargo and clippy format checks..."
cargo fmt --check
cargo clippy --profile release --all-targets -- -D clippy::all

echo "Running cargo build release..."
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release

echo "Running unit tests..."
cargo test --features enable-system-alloc
Expand All @@ -29,6 +27,14 @@ if [ "$SERVER_VERSION" != "unstable" ] && [ "$SERVER_VERSION" != "8.0.0" ] ; the
exit 1
fi

echo "Running cargo build release..."
if [ "$SERVER_VERSION" == "8.0.0" ] ; then
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0
else
RUSTFLAGS="-D warnings" cargo build --all --all-targets --release
fi


REPO_URL="https://github.com/valkey-io/valkey.git"
BINARY_PATH="tests/.build/binaries/$SERVER_VERSION/valkey-server"

Expand Down
11 changes: 5 additions & 6 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
};
use crate::wrapper::must_obey_client;
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
use valkey_module::NotifyEvent;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

/// Helper function used to add items to a bloom object. It handles both multi item and single item add operations.
/// It is used by any command that allows adding of items: BF.ADD, BF.MADD, and BF.INSERT.
/// Returns the result of the item add operation on success as a ValkeyValue and a ValkeyError on failure.
Expand Down Expand Up @@ -177,7 +176,7 @@ pub fn bloom_filter_add_value(
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand Down Expand Up @@ -404,7 +403,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
false => (Some(configs::FIXED_SEED), false),
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
Expand Down Expand Up @@ -615,7 +614,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand Down Expand Up @@ -811,7 +810,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
None => {
// if filter not exists, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
Expand Down
33 changes: 33 additions & 0 deletions src/wrapper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,35 @@
use valkey_module::{Context, ContextFlags};

pub mod bloom_callback;
pub mod defrag;

/// Wrapper for the ValkeyModule_MustObeyClient function.
/// Takes in an Context and returns true if the if commands are arriving
/// from the primary client or AOF client and should never be rejected.
/// False otherwise.
pub fn must_obey_client(ctx: &Context) -> bool {
// If we are using valkey 8.0 then we cannot use ValkeyModule_MustObeyClient so must go back to the default
// of checking for the replicated flag
#[cfg(not(feature = "valkey_8_0"))]
{
let ctx_raw = ctx.get_raw() as *mut valkey_module::ValkeyModuleCtx;

match unsafe { valkey_module::raw::ValkeyModule_MustObeyClient } {
Some(func) => {
let status = unsafe { func(ctx_raw) as isize };
match status {
1 => true,
0 => false,
_ => panic!("We do not expect ValkeyModule_MustObeyClient to return anything other than 1 or 0."),
}
}
// Fallback to checking for replicated flag in the GetContextFlags API as a best effort.
None => ctx.get_flags().contains(ContextFlags::REPLICATED),
}
}

#[cfg(feature = "valkey_8_0")]
{
ctx.get_flags().contains(ContextFlags::REPLICATED)
}
}
6 changes: 3 additions & 3 deletions tests/test_bloom_correctness.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def test_scaling_filter(self):

def test_max_and_validate_scale_to_correctness(self):
validate_scale_to_commands = [
('BF.INSERT key ERROR 0.00000001 VALIDATESCALETO 13107101', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627601', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT key EXPANSION 1 VALIDATESCALETO 101601', "provided VALIDATESCALETO causes false positive to degrade to 0" )
]
for cmd in validate_scale_to_commands:
Expand All @@ -144,12 +144,12 @@ def test_max_and_validate_scale_to_correctness(self):
assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error"
except Exception as e:
assert cmd[1] == str(e), f"Unexpected error message: {e}"
self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 VALIDATESCALETO 13107100')
self.client.execute_command('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627600')
self.client.execute_command('BF.INSERT FPKey VALIDATESCALETO 101600 EXPANSION 1')
FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXSCALEDCAPACITY')
MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXSCALEDCAPACITY')
self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 1627600, 1, "item")
key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")]
for key in key_names:
try:
Expand Down
Loading