Skip to content

Commit e2bddfb

Browse files
committed
update
1 parent dadc8ce commit e2bddfb

File tree

1 file changed

+10
-5
lines changed
  • src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter

1 file changed

+10
-5
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use databend_common_expression::RawExpr;
3333
use databend_common_expression::Scalar;
3434
use databend_common_functions::BUILTIN_FUNCTIONS;
3535
use fastbloom::BloomFilter;
36+
use fastbloom::DefaultHasher;
3637

3738
use super::builder::should_enable_runtime_filter;
3839
use super::packet::JoinRuntimeFilterPacket;
@@ -252,9 +253,12 @@ async fn build_bloom_filter(
252253
let probe_key = probe_key.as_column_ref().unwrap();
253254
let column_name = probe_key.id.to_string();
254255
let total_items = bloom.len();
256+
let hasher = DefaultHasher::default();
255257

256258
if total_items < 50000 {
257-
let filter = BloomFilter::with_false_pos(0.01).items(bloom);
259+
let filter = BloomFilter::with_false_pos(0.01)
260+
.hasher(hasher)
261+
.items(bloom);
258262
return Ok(RuntimeFilterBloom {
259263
column_name,
260264
filter,
@@ -271,11 +275,12 @@ async fn build_bloom_filter(
271275
let tasks: Vec<_> = chunks
272276
.into_iter()
273277
.map(|chunk| {
278+
let hasher = hasher.clone();
274279
databend_common_base::runtime::spawn(async move {
275-
let mut filter = BloomFilter::with_false_pos(0.01).expected_items(total_items);
276-
for hash in chunk {
277-
filter.insert_hash(hash);
278-
}
280+
let mut filter = BloomFilter::with_false_pos(0.01)
281+
.hasher(hasher)
282+
.expected_items(total_items);
283+
filter.extend(chunk);
279284
Ok::<BloomFilter, ErrorCode>(filter)
280285
})
281286
})

0 commit comments

Comments
 (0)