Skip to content

Commit

Permalink
Adding usage of must_obey_client in wrapper to improve performace
Browse files Browse the repository at this point in the history
Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Feb 17, 2025
1 parent 8077d58 commit ce344e6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
11 changes: 5 additions & 6 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
};
use crate::wrapper::must_obey_client;
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
use valkey_module::NotifyEvent;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

/// Helper function used to add items to a bloom object. It handles both multi item and single item add operations.
/// It is used by any command that allows adding of items: BF.ADD, BF.MADD, and BF.INSERT.
/// Returns the result of the item add operation on success as a ValkeyValue and a ValkeyError on failure.
Expand Down Expand Up @@ -177,7 +176,7 @@ pub fn bloom_filter_add_value(
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand Down Expand Up @@ -404,7 +403,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
false => (Some(configs::FIXED_SEED), false),
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
Expand Down Expand Up @@ -615,7 +614,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let mut add_succeeded = false;
match value {
Some(bloom) => {
Expand Down Expand Up @@ -811,7 +810,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
None => {
// if filter not exists, create it.
let hex = value.to_vec();
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let validate_size_limit = !must_obey_client(ctx);
let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
Ok(v) => v,
Err(err) => {
Expand Down
22 changes: 22 additions & 0 deletions src/wrapper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,24 @@
use valkey_module::{raw, Context, ContextFlags, ValkeyModuleCtx};

pub mod bloom_callback;
pub mod defrag;

/// Wrapper for the ValkeyModule_MustObeyClient function.
/// Takes in an Context and returns true if the if commands are arriving
/// from the primary client or AOF client and should never be rejected.
/// False otherwise.
pub fn must_obey_client(ctx: &Context) -> bool {
let ctx_raw = ctx.get_raw() as *mut ValkeyModuleCtx;
match unsafe { raw::ValkeyModule_MustObeyClient } {
Some(func) => {
let status = unsafe { func(ctx_raw) as isize };
match status {
1 => true,
0 => false,
_ => panic!("We do not expect ValkeyModule_MustObeyClient to return anything other than 1 or 0."),
}
}
// Fallback to checking for replicated flag in the GetContextFlags API as a best effort.
None => ctx.get_flags().contains(ContextFlags::REPLICATED),
}
}
6 changes: 3 additions & 3 deletions tests/test_bloom_correctness.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def test_scaling_filter(self):

def test_max_and_validate_scale_to_correctness(self):
validate_scale_to_commands = [
('BF.INSERT key ERROR 0.00000001 VALIDATESCALETO 13107101', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627601', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ),
('BF.INSERT key EXPANSION 1 VALIDATESCALETO 101601', "provided VALIDATESCALETO causes false positive to degrade to 0" )
]
for cmd in validate_scale_to_commands:
Expand All @@ -144,12 +144,12 @@ def test_max_and_validate_scale_to_correctness(self):
assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error"
except Exception as e:
assert cmd[1] == str(e), f"Unexpected error message: {e}"
self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 VALIDATESCALETO 13107100')
self.client.execute_command('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627600')
self.client.execute_command('BF.INSERT FPKey VALIDATESCALETO 101600 EXPANSION 1')
FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXSCALEDCAPACITY')
MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXSCALEDCAPACITY')
self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item")
self.add_items_till_capacity(self.client, "MemLimitKey", 1627600, 1, "item")
key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")]
for key in key_names:
try:
Expand Down

0 comments on commit ce344e6

Please sign in to comment.