From 88f4a5e53f0e7d55076198679b1cc737b32a0926 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sat, 20 Dec 2025 01:30:18 +0000 Subject: [PATCH] rowexec: add cancel checking to inverted joiner We just saw a test failure where the stmt didn't respect the statement timeout. The goroutine dump showed that we were evaluating a geo inverted expression in the inverted joiner "read" phase, so this commit adds the cancel checking on each 16th input row (note that this is higher frequency than elsewhere in the row-by-row engine where we check every 128 rows since geo evaluations could be more time-consuming). Note that we could've gone deeper and added the cancellation checks into the geo evaluation functions themselves, but it's not clear whether that's needed. Release note: None --- pkg/sql/rowexec/inverted_joiner.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index c915c566fe91..adcbb527186e 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -67,6 +68,8 @@ type invertedJoiner struct { unlimitedMemMonitor *mon.BytesMonitor diskMonitor *mon.BytesMonitor + cancelChecker cancelchecker.CancelChecker + // prefixEqualityCols are the ordinals of the columns from the join input // that represent join values for the non-inverted prefix columns of // multi-column inverted indexes. The length is equal to the number of @@ -414,6 +417,11 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.ProducerMetadata) { // Read the next batch of input rows. for len(ij.inputRows) < ij.batchSize { + if err := ij.cancelChecker.Check(); err != nil { + ij.MoveToDraining(err) + return ijStateUnknown, nil + } + row, meta := ij.input.Next() if meta != nil { if meta.Err != nil { @@ -742,6 +750,7 @@ func (ij *invertedJoiner) Start(ctx context.Context) { &ij.scanStatsListener, &ij.tenantConsumptionListener, ) ij.input.Start(ctx) + ij.cancelChecker.Reset(ctx, 16 /* checkInterval */) ij.runningState = ijReadingInput }