diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index c915c566fe91..adcbb527186e 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -67,6 +68,8 @@ type invertedJoiner struct { unlimitedMemMonitor *mon.BytesMonitor diskMonitor *mon.BytesMonitor + cancelChecker cancelchecker.CancelChecker + // prefixEqualityCols are the ordinals of the columns from the join input // that represent join values for the non-inverted prefix columns of // multi-column inverted indexes. The length is equal to the number of @@ -414,6 +417,11 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.ProducerMetadata) { // Read the next batch of input rows. for len(ij.inputRows) < ij.batchSize { + if err := ij.cancelChecker.Check(); err != nil { + ij.MoveToDraining(err) + return ijStateUnknown, nil + } + row, meta := ij.input.Next() if meta != nil { if meta.Err != nil { @@ -742,6 +750,7 @@ func (ij *invertedJoiner) Start(ctx context.Context) { &ij.scanStatsListener, &ij.tenantConsumptionListener, ) ij.input.Start(ctx) + ij.cancelChecker.Reset(ctx, 16 /* checkInterval */) ij.runningState = ijReadingInput }