
Commit d17575b

Add composite aggregation
1 parent b2f99c6 commit d17575b

20 files changed (+4663, -15 lines)

benches/agg_bench.rs

Lines changed: 78 additions & 0 deletions
@@ -1,5 +1,6 @@
 use binggan::plugins::PeakMemAllocPlugin;
 use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM};
+use common::DateTime;
 use rand::distributions::WeightedIndex;
 use rand::prelude::SliceRandom;
 use rand::rngs::StdRng;
@@ -70,6 +71,12 @@ fn bench_agg(mut group: InputGroup<Index>) {
 
     register!(group, terms_many_json_mixed_type_with_avg_sub_agg);
 
+    register!(group, composite_term_many_page_1000);
+    register!(group, composite_term_many_page_1000_with_avg_sub_agg);
+    register!(group, composite_term_few);
+    register!(group, composite_histogram);
+    register!(group, composite_histogram_calendar);
+
     register!(group, cardinality_agg);
     register!(group, terms_few_with_cardinality_agg);
 
@@ -310,6 +317,75 @@ fn terms_many_json_mixed_type_with_avg_sub_agg(index: &Index) {
     });
     execute_agg(index, agg_req);
 }
+fn composite_term_few(index: &Index) {
+    let agg_req = json!({
+        "my_ctf": {
+            "composite": {
+                "sources": [
+                    { "text_few_terms": { "terms": { "field": "text_few_terms" } } }
+                ],
+                "size": 1000
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
+fn composite_term_many_page_1000(index: &Index) {
+    let agg_req = json!({
+        "my_ctmp1000": {
+            "composite": {
+                "sources": [
+                    { "text_many_terms": { "terms": { "field": "text_many_terms" } } }
+                ],
+                "size": 1000
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
+fn composite_term_many_page_1000_with_avg_sub_agg(index: &Index) {
+    let agg_req = json!({
+        "my_ctmp1000wasa": {
+            "composite": {
+                "sources": [
+                    { "text_many_terms": { "terms": { "field": "text_many_terms" } } }
+                ],
+                "size": 1000,
+            },
+            "aggs": {
+                "average_f64": { "avg": { "field": "score_f64" } }
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
+fn composite_histogram(index: &Index) {
+    let agg_req = json!({
+        "my_ch": {
+            "composite": {
+                "sources": [
+                    { "f64_histogram": { "histogram": { "field": "score_f64", "interval": 1 } } }
+                ],
+                "size": 1000
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
+fn composite_histogram_calendar(index: &Index) {
+    let agg_req = json!({
+        "my_chc": {
+            "composite": {
+                "sources": [
+                    { "time_histogram": { "date_histogram": { "field": "timestamp", "calendar_interval": "month" } } }
+                ],
+                "size": 1000
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
 
 fn execute_agg(index: &Index, agg_req: serde_json::Value) {
     let agg_req: Aggregations = serde_json::from_value(agg_req).unwrap();
@@ -494,6 +570,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
     let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
     let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
     let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
+    let date_field = schema_builder.add_date_field("timestamp", FAST);
     let index = Index::create_from_tempdir(schema_builder.build())?;
     let few_terms_data = ["INFO", "ERROR", "WARN", "DEBUG"];
     // Approximate production log proportions: INFO dominant, WARN and DEBUG occasional, ERROR rare.
@@ -559,6 +636,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
             score_field => val as u64,
             score_field_f64 => lg_norm.sample(&mut rng),
             score_field_i64 => val as i64,
+            date_field => DateTime::from_timestamp_millis((val * 1_000_000.) as i64),
         ))?;
         if cardinality == Cardinality::OptionalSparse {
             for _ in 0..20 {
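
As a rough usage sketch, the composite sources benchmarked above can in principle be combined in one request. The aggregation name "my_breakdown", the pairing of a date_histogram source with a terms source, and the standalone main are illustrative assumptions rather than something this commit demonstrates; the field names are reused from the bench schema.

    use serde_json::json;
    use tantivy::aggregation::agg_req::Aggregations;

    fn main() {
        // Hypothetical composite request with two sources, mirroring the JSON shape
        // used by the bench functions above.
        let agg_req = json!({
            "my_breakdown": {
                "composite": {
                    "sources": [
                        { "time_histogram": { "date_histogram": { "field": "timestamp", "calendar_interval": "month" } } },
                        { "text_few_terms": { "terms": { "field": "text_few_terms" } } }
                    ],
                    "size": 1000
                }
            }
        });
        // Parse into the aggregation request type, exactly as execute_agg does above.
        let agg: Aggregations = serde_json::from_value(agg_req).unwrap();
        println!("parsed {} aggregation(s)", agg.len());
    }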

columnar/src/column_values/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ pub use u64_based::{
     serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
 };
 pub use u128_based::{
-    CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
+    CompactHit, CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
     serialize_column_values_u128,
 };
 pub use vec_column::VecColumn;

columnar/src/column_values/u128_based/compact_space/mod.rs

Lines changed: 73 additions & 0 deletions
@@ -292,6 +292,19 @@ impl BinarySerializable for IPCodecParams {
     }
 }
 
+/// Represents the result of looking up a u128 value in the compact space.
+///
+/// If a value is outside the compact space, the next compact value is returned.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CompactHit {
+    /// The value exists in the compact space
+    Exact(u32),
+    /// The value does not exist in the compact space, but the next higher value does
+    Next(u32),
+    /// The value is greater than the maximum compact value
+    AfterLast,
+}
+
 /// Exposes the compact space compressed values as u64.
 ///
 /// This allows faster access to the values, as u64 is faster to work with than u128.
@@ -309,6 +322,11 @@ impl CompactSpaceU64Accessor {
     pub fn compact_to_u128(&self, compact: u32) -> u128 {
         self.0.compact_to_u128(compact)
     }
+
+    /// Finds the next compact space value for a given u128 value.
+    pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
+        self.0.u128_to_next_compact(value)
+    }
 }
 
 impl ColumnValues<u64> for CompactSpaceU64Accessor {
@@ -430,6 +448,26 @@ impl CompactSpaceDecompressor {
         Ok(decompressor)
    }
 
+    /// Finds the next compact space value for a given u128 value
+    pub fn u128_to_next_compact(&self, value: u128) -> CompactHit {
+        // Try to convert to compact space
+        match self.u128_to_compact(value) {
+            // Value is in compact space, return its compact representation
+            Ok(compact) => CompactHit::Exact(compact),
+            // Value is not in compact space
+            Err(pos) => {
+                if pos >= self.params.compact_space.ranges_mapping.len() {
+                    // Value is beyond all ranges, no next value exists
+                    CompactHit::AfterLast
+                } else {
+                    // Get the next range and return its start compact value
+                    let next_range = &self.params.compact_space.ranges_mapping[pos];
+                    CompactHit::Next(next_range.compact_start)
+                }
+            }
+        }
+    }
+
     /// Converting to compact space for the decompressor is more complex, since we may get values
     /// which are outside the compact space. e.g. if we map
     /// 1000 => 5
@@ -823,6 +861,41 @@ mod tests {
         let _data = test_aux_vals(vals);
     }
 
+    #[test]
+    fn test_u128_to_next_compact() {
+        let vals = &[100u128, 200u128, 1_000_000_000u128, 1_000_000_100u128];
+        let mut data = test_aux_vals(vals);
+
+        let _header = U128Header::deserialize(&mut data);
+        let decomp = CompactSpaceDecompressor::open(data).unwrap();
+
+        // Test value that's already in a range
+        let compact_100 = decomp.u128_to_compact(100).unwrap();
+        assert_eq!(
+            decomp.u128_to_next_compact(100),
+            CompactHit::Exact(compact_100)
+        );
+
+        // Test value between two ranges
+        let compact_million = decomp.u128_to_compact(1_000_000_000).unwrap();
+        assert_eq!(
+            decomp.u128_to_next_compact(250),
+            CompactHit::Next(compact_million)
+        );
+
+        // Test value before the first range
+        assert_eq!(
+            decomp.u128_to_next_compact(50),
+            CompactHit::Next(compact_100)
+        );
+
+        // Test value after the last range
+        assert_eq!(
+            decomp.u128_to_next_compact(10_000_000_000),
+            CompactHit::AfterLast
+        );
+    }
+
     use proptest::prelude::*;
 
     fn num_strategy() -> impl Strategy<Value = u128> {
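
A typical consumer of CompactHit snaps an arbitrary u128 bound onto the compact space before scanning the column. The helper below is a hypothetical sketch, written as if it lived in this module so that CompactSpaceU64Accessor and CompactHit are in scope; the name compact_lower_bound is not part of this commit.

    /// Hypothetical helper: map an inclusive u128 lower bound onto the compact space,
    /// returning None when every stored value is below the bound.
    fn compact_lower_bound(accessor: &CompactSpaceU64Accessor, bound: u128) -> Option<u32> {
        match accessor.u128_to_next_compact(bound) {
            // The bound itself is representable: start exactly there.
            CompactHit::Exact(compact) => Some(compact),
            // The bound falls into a hole between ranges: the next mapped range starts here.
            CompactHit::Next(compact) => Some(compact),
            // The bound lies beyond the last mapped range: nothing can match.
            CompactHit::AfterLast => None,
        }
    }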

columnar/src/column_values/u128_based/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ mod compact_space;
 
 use common::{BinarySerializable, OwnedBytes, VInt};
 pub use compact_space::{
-    CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
+    CompactHit, CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor,
 };
 
 use crate::column_values::monotonic_map_column;

columnar/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ pub struct RowAddr {
     pub row_id: RowId,
 }
 
-pub use sstable::Dictionary;
+pub use sstable::{Dictionary, TermOrdHit};
 pub type Streamer<'a> = sstable::Streamer<'a, VoidSSTable>;
 
 pub use common::DateTime;

src/aggregation/accessor_helpers.rs

Lines changed: 13 additions & 3 deletions
@@ -95,11 +95,21 @@ pub(crate) fn get_all_ff_reader_or_empty(
     allowed_column_types: Option<&[ColumnType]>,
     fallback_type: ColumnType,
 ) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
-    let ff_fields = reader.fast_fields();
-    let mut ff_field_with_type =
-        ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
+    let mut ff_field_with_type = get_all_ff_readers(reader, field_name, allowed_column_types)?;
     if ff_field_with_type.is_empty() {
         ff_field_with_type.push((Column::build_empty_column(reader.num_docs()), fallback_type));
     }
     Ok(ff_field_with_type)
 }
+
+/// Get all fast field readers for a field.
+pub(crate) fn get_all_ff_readers(
+    reader: &SegmentReader,
+    field_name: &str,
+    allowed_column_types: Option<&[ColumnType]>,
+) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
+    let ff_fields = reader.fast_fields();
+    let ff_field_with_type =
+        ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
+    Ok(ff_field_with_type)
+}
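
Splitting get_all_ff_readers out of get_all_ff_reader_or_empty lets a caller tell "no fast field column exists for this field" apart from the synthetic empty-column fallback. The sketch below is a hypothetical call site, written as if it lived in the same module so SegmentReader, ColumnType, and get_all_ff_readers are in scope; the name readers_or_none is illustrative.

    /// Hypothetical call site: return None when the segment has no fast field
    /// column for `field_name`, instead of falling back to an empty column.
    pub(crate) fn readers_or_none(
        reader: &SegmentReader,
        field_name: &str,
    ) -> crate::Result<Option<Vec<(columnar::Column<u64>, ColumnType)>>> {
        let readers = get_all_ff_readers(reader, field_name, None)?;
        if readers.is_empty() {
            Ok(None)
        } else {
            Ok(Some(readers))
        }
    }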