[DO_NOT_MERGE][FLINK-38544] Validation: Force disable checkpointing during recovery in tests#28559
Draft
rkhachatryan wants to merge 23 commits into
Draft
[DO_NOT_MERGE][FLINK-38544] Validation: Force disable checkpointing during recovery in tests#28559rkhachatryan wants to merge 23 commits into
rkhachatryan wants to merge 23 commits into
Conversation
…m toBeConsumedBuffers
Split the single toBeConsumedBuffers queue into two queues with disjoint
responsibilities:
- recoveredBuffers (new): holds buffers migrated from RecoveredInputChannel
during construction; consumed by getNextRecoveredBuffer() which retains
the priority-event interleaving and last-buffer dynamic next-data-type
detection introduced by FLINK-39018.
- toBeConsumedBuffers (existing): reverted to its pre-FLINK-39018 role of
holding FullyFilledBuffer partial-buffer splits only. The recovery-aware
early branch in getNextBuffer() and the checkpointStarted inflight scan
no longer touch this queue.
Restores the checkState(toBeConsumedBuffers.isEmpty()) guard in
requestSubpartitions() (removed by cebc174). hasPendingPriorityEvent,
notifyPriorityEvent, and the constructor signature are unchanged.
Pure refactor: no public API change, no new tests; verified by the 9 existing
LocalInputChannelTest regression cases.
(cherry picked from commit 292cc4b)
(cherry picked from commit 7fbfc78)
…lling v2 - Adds BufferRequester, RecoverableInputChannel, RecoveryCheckpointTrigger interfaces with their final signatures (including getChannelInfo on RecoverableInputChannel and NO_OP singleton on RecoveryCheckpointTrigger). - Adds RecoveryCheckpointBarrier sentinel + DiskSnapshot data class with final 3-arg constructor signature and Chunk / StartPos / empty() helpers. - ChannelStateWriter gains addInputDataFromSpill and peekWriteResult default methods so all callers can compile against the interface without the dispatcher implementation landing in this phase. - RecoveredInputChannel#releaseAllResources visibility: package-private -> public References to SpillFile in DiskSnapshot's constructor are forward references; SpillFile itself lands in Phase 3. Each phase commit only needs to compile as a whole tree at the final commit, not in isolation. Design: requirements/38544/phase1_interfaces/design.md (cherry picked from commit 98c7b42)
- Local/Remote InputChannel implement RecoverableInputChannel from Phase 1 - recoveredBuffers reshaped to Deque<Buffer>; allRecoveredBuffersDelivered flag - getNextBuffer() unified under a single inRecovery predicate - checkpointStarted split into mutually-exclusive in-recovery / not-in-recovery - stateConsumedFuture triggered by (allRecoveredBuffersDelivered && queue empty) - RecoveredInputChannel.toInputChannel migrates via the new push interface; the initialRecoveredBuffers constructor parameter is gone. - LocalInputChannel.getNextRecoveredBuffer helper deleted Design: requirements/38544/phase2_input_channel/design.md (cherry picked from commit 8290409)
- New SpillFile: append-only segmented disk store with 64 MiB segments, reference counter + cleanedUp guard, and Snapshot view over segments and entries. All public signatures (append, snapshot, readBytesAt, acquire, release, isClosed) land in this commit; later phases only fill in bodies. - New FilteredBufferWriter: prefilter + postfilter buffer accumulator, flushing the post-filter buffer to disk on rotation. - New SpillFileWriter: thin facade exposing SpillFile lifecycle to filter callers. - RecoveredChannelStateHandler.recover filter branch routes output to a SpillFile instead of channel.onRecoveredStateBuffer; the accumulator's prefilter and postfilter buffers are sourced from the source channel's exclusive pool (no heap fallback). - InputChannelRecoveredStateHandler exposes getProducedSpillFile so Phase 4 drain wiring can pick up the frozen file after filter completes; spill-tmp-directories argument is required (no backward-compat shim). Design: requirements/38544/phase3_spill_writer/design.md (cherry picked from commit 2cbbbd6)
… removal - New SpillFileReader implements RecoveryCheckpointTrigger + Closeable. drain(): buffer alloc + disk read outside lock; deliver + offset advance inside lock. snapshotAndInsertBarriers(cpId): atomic startPos snapshot + per-channel barrier insert. Constructor derives the InputChannelInfo map internally; bodies pair acquire/release against SpillFile's ref counter. - New RecoveredChannelBufferRequester delegates to RecoveredInputChannel pool. - RecoveredInputChannel.requestBufferBlocking heap fallback removed (no more MemorySegmentFactory.allocateUnpooledSegment; OOM path eliminated). - channelIOExecutor wired: filter-on submits drain after conversion completes; exceptions bubble via StreamTask.asyncExceptionHandler. Design: requirements/38544/phase4_spill_reader/design.md (cherry picked from commit 1315d38)
- ChannelState dispatcher onCheckpointStartedForAllInputs implements Step 1 (snapshotAndInsertBarriers) -> Step 2 (per-input checkpointStarted) -> Step 3 (addInputDataFromSpill) -> cpId-completion release callback. - Hook AlternatingWaitingForFirstBarrierUnaligned.barrierReceived and AlternatingCollectingBarriers.alignedCheckpointTimeout into the dispatcher. - ChannelStateWriterImpl.addInputDataFromSpill: async demux by Chunk.channelInfo, empty snapshot inline early return, failures propagate via ChannelStateWriteResult. - Stream task pipelines (One/Two/Multiple) wire ChannelState through the InputProcessorUtil + SingleCheckpointBarrierHandler so the dispatcher hook reaches the right barrier-handler instance. - ITCases (relocated under flink-runtime to share the package with SpillFile): rescale + filter + large record OOM regression, UC during recovery. FLINK-38544 spilling v2 feature complete. Design: requirements/38544/phase5_coordination/design.md (cherry picked from commit 7badbd2)
Delete the verbose explanatory comments describing the old channel-state recovery wiring (the recoverySetupCompleteFuture javadoc, the allOf-vs-thenRun race essay, the setCheckpointingDuringRecoveryEnabled note). These annotate code that the subsequent async-recovery rewrite replaces; removing them up front keeps the rewrite diff free of comment-deletion churn.
Rewrite StreamTask channel-state recovery to run asynchronously on the channelIOExecutor: split restoreStateAndGates into recoverChannelsWithCheckpointing / recoverChannelsWithoutCheckpointing, threading the recovery checkpoint trigger and the fetched-state drainer through the new SequentialChannelStateReader / FetchedChannelState(Drainer) / RecoveryCheckpointTrigger interfaces. Also release channel state before returning in SequentialChannelStateReaderImpl.
…annel-state read The async-recovery rewrite put the channel-state read future (runAsync(readInputChannelState)) into the completeAll(...) set that gates the recovery-completion suspend() poison mail. That future is never already-complete, so suspend() was deferred past the start of the restore mailbox loop. The loop then ran the default action (record processing) before recovery finished: - records were processed before gate conversion/requestPartitions, losing them (MultipleInputStreamTaskTest: 10 -> 7); - processInput hitting END_OF_INPUT during restore called suspend() itself, exiting the loop before the recovery future was done, so restoreInternal's checkState(allGatesRecoveredFuture.isDone()) threw "Mailbox loop interrupted before recovery was finished" (StreamTaskTest.testProcessWith*); and - on downscale, recovery could stall entirely (UnalignedCheckpointRescaleITCase hang). Run readInputChannelState as a fire-and-forget feeder instead, and gate suspend()/recoveryCompletionFuture on the gates' stateConsumedFutures only. A stateConsumedFuture completes only once the consumer drains the end-of-state sentinel that readInputChannelState pushes, so gating on it already implies the read finished, and requestPartitions still runs after the read. For a task with no recovered state the futures are already complete, so suspend() is enqueued before the loop runs the default action -- restoring the pre-rewrite ordering. Unlike suspending the default action, this does not block the consumer, so recovery that must drain real channel state still completes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… record-filter context When checkpointing-during-recovery is disabled, the RecordFilterContext never spills, so it needs no spill directories. createRecordFilterContext nevertheless dereferenced getEnvironment().getIOManager().getSpillingDirectoriesPaths(), which NPEs in minimal environments that return a null IOManager (e.g. DummyEnvironment-based StreamTaskTest cases). On the channelIOExecutor during recovery that NPE was routed to handleAsyncException -> failExternally, which DummyEnvironment rejects with UnsupportedOperationException, escalating to the fatal error handler and crashing the surefire fork. Pass an empty spill-directory array for the disabled context instead of touching the IOManager. The async-recovery rewrite introduced this IOManager access during recovery; these StreamTaskTest cases passed before it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the needsRecovery and checkpointingDuringRecoveryEnabled fields and their accessors from the input-gate API. needsRecovery is now passed as a parameter (requestPartitions(boolean) / convertRecoveredInputChannels(boolean)) and checkpointing-during-recovery is read from the job config at the call site instead of being pushed onto each gate. Updates the gate mocks and test builders accordingly.
…ier missing When collectPreRecoveryBarrier finds no during-recovery sentinel for a checkpoint in a still-recovering channel, decline with CHECKPOINT_DECLINED_TASK_NOT_READY (not an IOException). That reason is not counted against the tolerable-failure threshold, so the checkpoint is deferred and retried; the recovered buffers stay queued and are captured by a later checkpoint, so no in-flight data is lost.
A disabled RecordFilterContext never spills, so it does not need spill directories. Only require non-empty tmpDirectories when checkpointing-during-recovery is enabled; otherwise tolerate a null/empty value (e.g. an environment without IOManager spilling directories).
Inline // review / // review nit / // review todo annotations left for the reviewer; drop this commit before merge.
…state tests CI (build 76428) failed the spotless-check on flink-runtime for four recovery test files. Applied spotless:apply (google-java-format); formatting-only changes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…overy recoverChannelsWithoutCheckpointing deferred requestPartitions/conversion until completeAll(all gates' stateConsumedFutures), so no gate converted until every gate's recovered state had been drained. A selective-reading multi-input operator only drains the *selected* input's end-of-state sentinel, so an unselected gate never drained (it is read only after conversion) while conversion waited for it to drain first -- a circular wait that deadlocked the restore mailbox loop (parked in processMailsWhenDefaultActionUnavailable -> mailbox.take()). This hung StreamTaskSelectiveReadingITCase and other multi-input recovery in CI (build 76435: tests/table/python groups, watchdog-killed). Trigger each gate's requestPartitions(false) off its own getStateConsumedFuture() (restoring the pre-rewrite per-gate behavior from d9fc48), so a drained gate converts immediately and the reader can progress; suspend() remains gated on completeAll(futures). The flag-ON path (recoverChannelsWithCheckpointing) is unaffected -- it is drainer-driven, not gated on the consumer draining. Validated: StreamTaskSelectiveReadingITCase clean over 75 runs under JDK-17 load (was hanging on iteration 1); TaskCheckpointingBehaviourTest, recovered-channel tests and MultipleInputStreamTaskTest green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Collaborator
…ore-loop race
With checkpointing-during-recovery enabled, recoverChannelsWithCheckpointing
completes the recovery future asynchronously (mailbox mails + channelIOExecutor
stages) and, unlike the checkpointing-disabled path, requests partitions
mid-chain -- enabling real input before the recovery-completion suspend(). The
restore mailbox loop can then be suspended before recovery finishes, tripping
checkState(allGatesRecoveredFuture.isDone()) in restoreInternal with "Mailbox
loop interrupted before recovery was finished". Two cases were observed:
- Source tasks: the restore loop's default action starts the legacy source,
which finishes and calls mailboxProcessor.suspend() from its completion
callback (SourceStreamTask).
- Non-source operators whose input is already fully available (e.g. a
bounded/batch operator behind a blocking exchange, as in TPC-DS): after
partitions are requested mid-recovery, the default action drains the input
straight to END_OF_INPUT and suspends.
The checkpointing-disabled path never hit either: it completes synchronously
(no input gates) or requests partitions only as the recovery-completion
suspend() -- an urgent poison mail -- is enqueued, so the restore loop exits
before any input is processed.
Mirror that ordering for the checkpointing-enabled path:
- No input gates (source): complete recovery synchronously, straight to NO_OP,
so suspend() is enqueued before the restore loop runs the default action.
- Input gates but no recovered channel state (initial deploy, savepoint or
aligned restore): defer requestPartitions until after recovery completes;
the urgent recovery suspend() ends the restore loop first and partitions are
requested in the main mailbox loop.
- Recovered channel state present (unaligned restore): unchanged -- request
partitions and drain through the checkpoint trigger during recovery.
Validated: deterministic repro (bounded BATCH job, recovery window widened)
failed before and passes after; heavy-deployment e2e 8/8; operator-restore
(ChainOrderTest, KeyedComplexChainTest) and UnalignedCheckpoint{,Rescale}ITCase
(the with-state path) all green.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The preceding commit deferred requestPartitions to after recovery when there was no recovered channel state, relying on the recovery-completion suspend() (an urgent poison mail) being processed before the requestPartitions mail in the restore loop. That ordering does not reliably hold: requestPartitions was observed running during the restore loop, so input could still be processed and reach END_OF_INPUT before recovery completed. It also withheld input (and thus checkpoint barriers) during recovery, which is contrary to the goal of checkpointing during recovery. Revert to requesting partitions during recovery for tasks with input channels (restoring the intended behaviour). The source short-circuit for tasks with no input gates is kept; the restore-loop race for operators that finish their input during recovery is addressed structurally in the following commit. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A task that reaches END_OF_INPUT during recovery -- e.g. a bounded/batch
operator whose input is already fully available behind a blocking exchange, as
seen on TPC-H/TPC-DS HashAggregate -- previously suspended the restore mailbox
loop immediately. With checkpointing during recovery the recovery future
completes asynchronously, so this could end the restore loop before recovery
finished, tripping checkState(allGatesRecoveredFuture.isDone()) in
restoreInternal with "Mailbox loop interrupted before recovery was finished".
Input is intentionally processed during recovery (so checkpoint barriers can
arrive from inputs); the bug is only the premature lifecycle finish. When
END_OF_INPUT is reached while recovery is still in progress, suspend only the
default action (to avoid busy-spinning) and resume it once recovery completes,
via a new recoveryCompletionFuture field set in restoreStateAndGates.
processInput then re-fires END_OF_INPUT from the main mailbox loop and finishes
normally. Sources remain covered by the no-input-gates short-circuit.
Validated with a deterministic repro (bounded BATCH job, recovery window
widened): the race reproduces with this deferral disabled and is gone with it
enabled. Full regression green: heavy-deployment e2e 6/6; ChainOrderTest,
KeyedComplexChainTest, KeyedPartitionWindowedStreamITCase; UnalignedCheckpoint
{,Rescale}ITCase (the with-recovered-state path) 11/11 and 50/50.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…all inputs finished
StreamMultipleInputProcessor.processInput returned NOTHING_AVAILABLE when no input
was selectable, even once all inputs were finished. Because getAvailableFuture()
returns AVAILABLE as soon as all inputs are finished, the mailbox loop then never
blocks and spins on processInput at 100% CPU.
This is normally not reached (a task suspends on the first END_OF_INPUT and
processInput is not called again), but the preceding "defer task finish until
recovery completes" change resumes the default action after recovery and calls
processInput again on an operator that finished during recovery. Single-input
processors keep returning END_OF_INPUT once finished, so they finish cleanly;
multiple-input operators (e.g. NestedLoopJoin in TPC-DS) instead busy-spun until
the job was killed.
Report END_OF_INPUT when there is nothing to read and all inputs are finished,
consistent with single-input processors and with
MultipleInputSelectionHandler#calculateOverallStatus.
Reproduced deterministically (bounded BATCH non-equi join -> NestedLoopJoin under
CDR, recovery window widened): busy-spins without this change, finishes with it.
Full regression green incl. MultipleInputITCase, heavy-deployment e2e 6/6,
UnalignedCheckpoint{,Rescale}ITCase.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…veBatchScheduler)
createRecordFilterContext built per-physical-gate filter configs from
StreamConfig#getInPhysicalEdges and asserted it matched the number of runtime
input gates. For a dynamically connected input -- e.g. reading a cached
intermediate result under the AdaptiveBatchScheduler -- the task has an input
gate but the config carries no physical edges (the connection, and thus the
partitioner, is decided by the scheduler at runtime). With checkpointing during
recovery enabled this tripped "Number of input gates (1) does not match number
of physical edges (0)" -> AsynchronousException "Unable to set up recovered
channel state", failing the job (seen in CacheITCase).
Such jobs are batch and have no unaligned-checkpoint channel state to filter, so
when there are input gates but no physical edges, return a disabled filter
context instead of failing.
Validated: CacheITCase 6/6 (was 5 errors); record-filter/recovered-channel-state
unit tests 32/32; UnalignedCheckpoint{,Rescale}ITCase (the with-channel-state
filtering path) 11/11 and 50/50 unregressed.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Testing #28554