Skip to content

Commit b3104a0

Browse files
authored
feat(query): BackpressureSpiller has been changed to a fully synchronous call (#18899)
1 parent 23a1ff9 commit b3104a0

File tree

30 files changed

+888
-688
lines changed

30 files changed

+888
-688
lines changed

src/common/base/src/base/dma.rs

Lines changed: 169 additions & 147 deletions
Large diffs are not rendered by default.

src/query/expression/src/converts/arrow/from.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,10 +243,7 @@ impl TryFrom<&Schema> for TableSchema {
243243
}
244244

245245
impl DataBlock {
246-
pub fn from_record_batch(
247-
schema: &DataSchema,
248-
batch: &RecordBatch,
249-
) -> Result<(Self, DataSchema)> {
246+
pub fn from_record_batch(schema: &DataSchema, batch: &RecordBatch) -> Result<Self> {
250247
assert_eq!(
251248
schema.num_fields(),
252249
batch.num_columns(),
@@ -264,15 +261,15 @@ impl DataBlock {
264261
}
265262

266263
if batch.num_columns() == 0 {
267-
return Ok((DataBlock::new(vec![], batch.num_rows()), schema.clone()));
264+
return Ok(DataBlock::new(vec![], batch.num_rows()));
268265
}
269266

270267
let mut columns = Vec::with_capacity(batch.columns().len());
271268
for (array, field) in batch.columns().iter().zip(schema.fields()) {
272269
columns.push(Column::from_arrow_rs(array.clone(), field.data_type())?)
273270
}
274271

275-
Ok((DataBlock::new_from_columns(columns), schema.clone()))
272+
Ok(DataBlock::new_from_columns(columns))
276273
}
277274
}
278275

src/query/expression/src/utils/udf_client.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -344,14 +344,13 @@ impl UDFFlightClient {
344344

345345
let result_batch = result_batch?;
346346
let schema = DataSchema::try_from(&(*result_batch.schema()))?;
347-
let (result_block, result_schema) = DataBlock::from_record_batch(&schema, &result_batch)
348-
.map_err(|err| {
349-
ErrorCode::UDFDataError(format!(
350-
"Cannot convert arrow record batch to data block: {err}"
351-
))
352-
})?;
347+
let result_block = DataBlock::from_record_batch(&schema, &result_batch).map_err(|err| {
348+
ErrorCode::UDFDataError(format!(
349+
"Cannot convert arrow record batch to data block: {err}"
350+
))
351+
})?;
353352

354-
let result_fields = result_schema.fields();
353+
let result_fields = schema.fields();
355354
if result_fields.is_empty() || result_block.is_empty() {
356355
return Err(ErrorCode::EmptyDataFromServer(
357356
"Get empty data from UDF Server",

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
202202
partitions.push(GraceJoinPartition::create(&location_prefix)?);
203203
}
204204

205-
let ctx: Arc<dyn TableContext> = ctx.clone();
206205
Ok(GraceHashJoin {
207206
desc,
208207
state,
@@ -213,7 +212,7 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
213212
function_context: function_ctx,
214213
stage: RestoreStage::FlushMemory,
215214
partitions,
216-
read_settings: ReadSettings::from_ctx(&ctx)?,
215+
read_settings: ReadSettings::from_settings(&ctx.get_settings())?,
217216
build_partition_stream: BlockPartitionStream::create(rows, bytes, 16),
218217
probe_partition_stream: BlockPartitionStream::create(rows, bytes, 16),
219218
})

src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -559,13 +559,12 @@ impl TransformUdfScript {
559559
))
560560
})?;
561561

562-
let (result_block, _) =
563-
DataBlock::from_record_batch(&schema, &result_batch).map_err(|err| {
564-
ErrorCode::UDFDataError(format!(
565-
"Failed to create data block from record batch for function '{}': {}",
566-
func.name, err
567-
))
568-
})?;
562+
let result_block = DataBlock::from_record_batch(&schema, &result_batch).map_err(|err| {
563+
ErrorCode::UDFDataError(format!(
564+
"Failed to create data block from record batch for function '{}': {}",
565+
func.name, err
566+
))
567+
})?;
569568

570569
let entry = if contains_variant(&func.data_type) {
571570
let value = transform_variant(&result_block.get_by_offset(0).value(), false).map_err(

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::any::Any;
2020
use std::collections::VecDeque;
2121
use std::sync::Arc;
2222

23-
use databend_common_base::runtime::GlobalIORuntime;
2423
use databend_common_exception::Result;
2524
use databend_common_expression::BlockMetaInfoDowncast;
2625
use databend_common_expression::DataBlock;
@@ -100,38 +99,49 @@ impl WindowBuffer {
10099
}
101100
}
102101

103-
async fn spill(&mut self) -> Result<()> {
102+
fn spill(&mut self) -> Result<()> {
103+
match self {
104+
WindowBuffer::V1(_) => unreachable!(),
105+
WindowBuffer::V2(inner) => inner.spill(),
106+
}
107+
}
108+
109+
async fn async_spill(&mut self) -> Result<()> {
104110
match self {
105111
WindowBuffer::V1(inner) => inner.spill().await,
106-
WindowBuffer::V2(inner) => inner.spill().await,
112+
WindowBuffer::V2(_) => unreachable!(),
107113
}
108114
}
109115

110-
async fn restore(&mut self) -> Result<Vec<DataBlock>> {
116+
fn restore(&mut self) -> Result<Vec<DataBlock>> {
117+
match self {
118+
WindowBuffer::V1(_) => unreachable!(),
119+
WindowBuffer::V2(inner) => inner.restore(),
120+
}
121+
}
122+
123+
async fn async_restore(&mut self) -> Result<Vec<DataBlock>> {
111124
match self {
112125
WindowBuffer::V1(inner) => inner.restore().await,
113-
WindowBuffer::V2(inner) => inner.restore().await,
126+
WindowBuffer::V2(_) => unreachable!(),
114127
}
115128
}
116-
}
117129

118-
#[derive(Debug, Clone, Copy)]
119-
enum Step {
120-
Sync(SyncStep),
121-
Async(AsyncStep),
122-
Finish,
130+
fn is_async(&self) -> bool {
131+
match self {
132+
WindowBuffer::V1(_) => true,
133+
WindowBuffer::V2(_) => false,
134+
}
135+
}
123136
}
124137

125138
#[derive(Debug, Clone, Copy)]
126-
enum SyncStep {
139+
enum Step {
127140
Collect,
128141
Process,
129-
}
130-
131-
#[derive(Debug, Clone, Copy)]
132-
enum AsyncStep {
133142
Spill,
134143
Restore,
144+
Finish,
135145
}
136146

137147
pub struct TransformWindowPartitionCollect<S: DataProcessorStrategy> {
@@ -190,8 +200,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
190200
let spiller = if !settings.get_enable_backpressure_spiller()? {
191201
Either::Left(Spiller::create(ctx, operator, spill_config)?)
192202
} else {
193-
let runtime = GlobalIORuntime::instance();
194-
let buffer_pool = SpillsBufferPool::create(runtime, 128 * 1024 * 1024, 3);
203+
let buffer_pool = SpillsBufferPool::instance();
195204
Either::Right(BackpressureSpiller::create(
196205
ctx,
197206
operator,
@@ -215,19 +224,19 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
215224
is_collect_finished: false,
216225
output_data_blocks: VecDeque::new(),
217226
restored_data_blocks: Vec::new(),
218-
step: Step::Sync(SyncStep::Collect),
227+
step: Step::Collect,
219228
})
220229
}
221230

222231
fn next_step(&mut self, step: Step) -> Result<Event> {
223232
let event = match step {
224-
Step::Sync(_) => Event::Sync,
225-
Step::Async(_) => Event::Async,
226233
Step::Finish => {
227234
self.input.finish();
228235
self.output.finish();
229236
Event::Finished
230237
}
238+
Step::Spill | Step::Restore if self.buffer.is_async() => Event::Async,
239+
_ => Event::Sync,
231240
};
232241
self.step = step;
233242
Ok(event)
@@ -241,7 +250,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
241250

242251
// First check: flush in-memory data to external storage if needed.
243252
if self.need_spill() {
244-
return self.next_step(Step::Async(AsyncStep::Spill));
253+
return self.next_step(Step::Spill);
245254
}
246255

247256
if self.input.has_data() {
@@ -254,12 +263,12 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
254263

255264
// Check again: flush in-memory data to external storage if needed.
256265
if self.need_spill() {
257-
return self.next_step(Step::Async(AsyncStep::Spill));
266+
return self.next_step(Step::Spill);
258267
}
259268

260269
if self.input.is_finished() {
261270
self.is_collect_finished = true;
262-
return self.next_step(Step::Async(AsyncStep::Restore));
271+
return self.next_step(Step::Restore);
263272
}
264273

265274
self.input.set_need_data();
@@ -276,7 +285,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
276285
}
277286

278287
if self.need_spill() {
279-
return self.next_step(Step::Async(AsyncStep::Spill));
288+
return self.next_step(Step::Spill);
280289
}
281290

282291
if let Some(data_block) = self.output_data_blocks.pop_front() {
@@ -286,7 +295,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
286295

287296
match self.buffer.is_empty() {
288297
true => self.next_step(Step::Finish),
289-
false => self.next_step(Step::Async(AsyncStep::Restore)),
298+
false => self.next_step(Step::Restore),
290299
}
291300
}
292301
}
@@ -304,23 +313,23 @@ impl<S: DataProcessorStrategy> Processor for TransformWindowPartitionCollect<S>
304313
fn event(&mut self) -> Result<Event> {
305314
// (collect <--> spill) -> (process <--> restore) -> finish
306315
match self.step {
307-
Step::Sync(SyncStep::Collect) => self.collect(),
308-
Step::Async(AsyncStep::Spill) => {
316+
Step::Collect => self.collect(),
317+
Step::Spill => {
309318
if self.is_collect_finished {
310-
self.step = Step::Sync(SyncStep::Process);
319+
self.step = Step::Process;
311320
self.output()
312321
} else {
313322
// collect data again.
314-
self.step = Step::Sync(SyncStep::Collect);
323+
self.step = Step::Collect;
315324
self.collect()
316325
}
317326
}
318-
Step::Sync(SyncStep::Process) => self.output(),
319-
Step::Async(AsyncStep::Restore) => {
327+
Step::Process => self.output(),
328+
Step::Restore => {
320329
if self.restored_data_blocks.is_empty() {
321330
self.next_step(Step::Finish)
322331
} else {
323-
self.next_step(Step::Sync(SyncStep::Process))
332+
self.next_step(Step::Process)
324333
}
325334
}
326335
Step::Finish => Ok(Event::Finished),
@@ -329,22 +338,27 @@ impl<S: DataProcessorStrategy> Processor for TransformWindowPartitionCollect<S>
329338

330339
fn process(&mut self) -> Result<()> {
331340
match self.step {
332-
Step::Sync(SyncStep::Process) => {
341+
Step::Process => {
333342
let restored_data_blocks = std::mem::take(&mut self.restored_data_blocks);
334343
let processed_blocks = self.strategy.process_data_blocks(restored_data_blocks)?;
335344
self.output_data_blocks.extend(processed_blocks);
345+
Ok(())
346+
}
347+
Step::Spill => self.buffer.spill(),
348+
Step::Restore => {
349+
self.restored_data_blocks = self.buffer.restore()?;
350+
Ok(())
336351
}
337352
_ => unreachable!(),
338353
}
339-
Ok(())
340354
}
341355

342356
#[async_backtrace::framed]
343357
async fn async_process(&mut self) -> Result<()> {
344358
match &self.step {
345-
Step::Async(AsyncStep::Spill) => self.buffer.spill().await?,
346-
Step::Async(AsyncStep::Restore) => {
347-
self.restored_data_blocks = self.buffer.restore().await?;
359+
Step::Spill => self.buffer.async_spill().await?,
360+
Step::Restore => {
361+
self.restored_data_blocks = self.buffer.async_restore().await?;
348362
}
349363
_ => unreachable!(),
350364
}

0 commit comments

Comments
 (0)