Skip to content

Commit

Permalink
Always reprocess from last action time in schedule workflow (#6028)
Browse files Browse the repository at this point in the history
## What changed?
In the schedule workflow, we should always reprocess from the last
action in order to handle getting woken up by a signal in between the
nominal time and actual time. This is basically what #5381 should have
been in the first place, and applies that logic to all iterations.

Note that this adds the logic but doesn't activate it yet.

## Why?
Fixes bug: signals (including refresh) in between nominal and actual
time could lead to dropped actions. This only happens if the cache runs
out or we do a CaN at just the right time, so it's not that easy to
reproduce.

This has also blocked activating #5799 since that makes this bug more
likely.

## How did you test it?
Added new unit test and replay test.
Reproduced locally by disabling cache and sending frequent signals to
try to disrupt a schedule. Verified that new version did not drop
actions.

## Potential risks
This changes workflow logic, but is pretty easy to see that the old
control flow is unaffected.

There's one more potential situation which isn't always handled
correctly: unpause in between nominal and actual times will usually run
the jittered action (this is probably the less surprising behavior), but
rarely, if a CaN happens at the same time, it can get skipped because
the cache will be regenerated.

---------

Co-authored-by: Rodrigo Zhou <[email protected]>
  • Loading branch information
2 people authored and pdoerner committed May 31, 2024
1 parent a55ba77 commit be4326e
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 21 deletions.
14 changes: 8 additions & 6 deletions service/worker/scheduler/testdata/generate_history.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Consider running this script to generate a new history for TestReplays
# whenever there's some change to the scheduler workflow.
# To use it, run a local server (any backend) and run this script.
#
# Note: this requires temporal cli >= 0.12

set -x

Expand All @@ -12,19 +14,19 @@ id=sched1
trap "temporal schedule delete -s '$id'" EXIT

temporal schedule create -s "$id" \
--overlap-policy bufferall \
--overlap-policy BufferAll \
--interval 10s \
--jitter 8s \
-w mywf \
-t mytq \
--workflow-type mywf \
--execution-timeout 5
--type mywf \
--execution-timeout 5s

sleep 50 # ~5 normal actions, some may be buffered

# backfill 3 actions
temporal schedule backfill -s "$id" \
--overlap-policy allowall \
--overlap-policy AllowAll \
--start-time 2022-05-09T11:22:22Z \
--end-time 2022-05-09T11:22:55Z

Expand All @@ -46,8 +48,8 @@ temporal schedule update -s "$id" \
--remaining-actions 1 \
-w mywf \
-t mytq \
--workflow-type mywf \
--execution-timeout 3
--type mywf \
--execution-timeout 3s

sleep 12
# should have used one action and be idle now
Expand Down
Binary file not shown.
35 changes: 23 additions & 12 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
UpdateFromPrevious = 6
// do continue-as-new after pending signals
CANAfterSignals = 7
// set LastProcessedTime to last action instead of now
UseLastAction = 8
)

const (
Expand Down Expand Up @@ -295,19 +297,23 @@ func (s *scheduler) run() error {
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
t2 = t1
}
nextWakeup := s.processTimeRange(
nextWakeup, lastAction := s.processTimeRange(
t1, t2,
// resolve this to the schedule's policy as late as possible
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
false,
nil,
)
s.State.LastProcessedTime = timestamppb.New(t2)
if s.hasMinVersion(UseLastAction) {
s.State.LastProcessedTime = timestamppb.New(lastAction)
} else {
s.State.LastProcessedTime = timestamppb.New(t2)
}
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again. note that processSignals may move LastProcessedTime backwards.
nextWakeup = s.processTimeRange(
// need to calculate sleep again
nextWakeup, _ = s.processTimeRange(
s.State.LastProcessedTime.AsTime(),
t2,
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
Expand All @@ -330,8 +336,8 @@ func (s *scheduler) run() error {
if suggestContinueAsNew && s.pendingUpdate == nil && s.pendingPatch == nil && s.hasMinVersion(CANAfterSignals) {
// If suggestContinueAsNew was true but we had a pending update or patch, we would
// not break above, but process the update/patch. Now that we're done, we should
// break here to do CAN. (pendingUpdate and pendingPatch should always nil here,
// the check check above is just being defensive.)
// break here to do CAN. (pendingUpdate and pendingPatch should always be nil here,
// the check above is just being defensive.)
break
}

Expand Down Expand Up @@ -617,11 +623,11 @@ func (s *scheduler) processTimeRange(
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
limit *int,
) time.Time {
) (time.Time, time.Time) {
s.logger.Debug("processTimeRange", "start", start, "end", end, "overlap-policy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return time.Time{}
return time.Time{}, end
}

catchupWindow := s.getCatchupWindow()
Expand All @@ -634,10 +640,12 @@ func (s *scheduler) processTimeRange(
// take an action now. (Don't count as missed catchup window either.)
// Skip over entire time range if paused or no actions can be taken
if !s.canTakeScheduledAction(manual, false) {
return s.getNextTime(end).Next
// use end as last action time so that we don't reprocess time spent paused
return s.getNextTime(end).Next, end
}
}

lastAction := start
var next getNextTimeResult
for next = s.getNextTime(start); !(next.Next.IsZero() || next.Next.After(end)); next = s.getNextTime(next.Next) {
if !s.hasMinVersion(BatchAndCacheTimeQueries) && !s.canTakeScheduledAction(manual, false) {
Expand All @@ -656,14 +664,15 @@ func (s *scheduler) processTimeRange(
continue
}
s.addStart(next.Nominal, next.Next, overlapPolicy, manual)
lastAction = next.Next

if limit != nil {
if (*limit)--; *limit <= 0 {
break
}
}
}
return next.Next
return next.Next, lastAction
}

func (s *scheduler) canTakeScheduledAction(manual, decrement bool) bool {
Expand Down Expand Up @@ -769,7 +778,7 @@ func (s *scheduler) processBackfills() {
bfr := s.State.OngoingBackfills[0]
startTime := timestamp.TimeValue(bfr.GetStartTime())
endTime := timestamp.TimeValue(bfr.GetEndTime())
next := s.processTimeRange(
next, _ := s.processTimeRange(
startTime,
endTime,
bfr.GetOverlapPolicy(),
Expand Down Expand Up @@ -869,11 +878,13 @@ func (s *scheduler) processUpdate(req *schedspb.FullUpdateRequest) {

s.updateCustomSearchAttributes(req.SearchAttributes)

if s.hasMinVersion(UpdateFromPrevious) {
if s.hasMinVersion(UpdateFromPrevious) && !s.hasMinVersion(UseLastAction) {
// We need to start re-processing from the last event, so that we catch actions whose
// nominal time is before now but actual time (with jitter) is after now. Logic in
// processTimeRange will discard actions before the UpdateTime.
// Note: get last event time before updating s.Info.UpdateTime, otherwise it'll always be now.
// After version UseLastAction, this is effectively done in the main loop for all
// wakeups, not just updates, so we don't need to do it here.
s.State.LastProcessedTime = timestamppb.New(s.getLastEvent())
}

Expand Down
124 changes: 121 additions & 3 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,9 +1335,10 @@ func (s *workflowSuite) TestHugeBackfillAllowAll() {
t := base.Add(time.Duration(i) * time.Hour)
actual := baseStartTime.Add(time.Minute).Add(time.Duration(i/rateLimit) * time.Second)
runs[i] = workflowRun{
id: "myid-" + t.Format(time.RFC3339),
start: actual,
startTolerance: 5 * time.Second, // the "rate limit" isn't exact
id: "myid-" + t.Format(time.RFC3339),
start: actual,
// increase this number if this test fails for no reason:
startTolerance: 8 * time.Second, // the "rate limit" isn't exact
end: actual.Add(time.Minute),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
}
Expand Down Expand Up @@ -1726,6 +1727,123 @@ func (s *workflowSuite) TestUpdateBetweenNominalAndJitter() {
)
}

// Tests that a signal between a nominal time and jittered time for a start won't interrupt the start.
func (s *workflowSuite) TestSignalBetweenNominalAndJittered() {
// TODO: remove once default version is UseLastAction
prevTweakables := currentTweakablePolicies
currentTweakablePolicies.Version = UseLastAction
defer func() { currentTweakablePolicies = prevTweakables }()

s.runAcrossContinue(
[]workflowRun{
{
id: "myid-2022-06-01T01:00:00Z",
start: time.Date(2022, 6, 1, 1, 49, 22, 594000000, time.UTC),
end: time.Date(2022, 6, 1, 1, 53, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T02:00:00Z",
start: time.Date(2022, 6, 1, 2, 2, 39, 204000000, time.UTC),
end: time.Date(2022, 6, 1, 2, 11, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T03:00:00Z",
start: time.Date(2022, 6, 1, 3, 37, 29, 538000000, time.UTC),
end: time.Date(2022, 6, 1, 3, 41, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T04:00:00Z",
start: time.Date(2022, 6, 1, 4, 23, 34, 755000000, time.UTC),
end: time.Date(2022, 6, 1, 4, 27, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
},
[]delayedCallback{
{
// signal between nominal and jittered time
at: time.Date(2022, 6, 1, 3, 22, 10, 0, time.UTC),
f: func() {
s.env.SignalWorkflow(SignalNameRefresh, nil)
},
},
{
at: time.Date(2022, 6, 1, 5, 0, 0, 0, time.UTC),
finishTest: true,
},
},
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: durationpb.New(1 * time.Hour),
}},
Jitter: durationpb.New(1 * time.Hour),
},
},
)
}

func (s *workflowSuite) TestPauseUnpauseBetweenNominalAndJittered() {
s.T().Skip("this illustrates an existing bug")

s.runAcrossContinue(
[]workflowRun{
{
id: "myid-2022-06-01T01:00:00Z",
start: time.Date(2022, 6, 1, 1, 49, 22, 594000000, time.UTC),
end: time.Date(2022, 6, 1, 1, 53, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T02:00:00Z",
start: time.Date(2022, 6, 1, 2, 2, 39, 204000000, time.UTC),
end: time.Date(2022, 6, 1, 2, 11, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T03:00:00Z",
start: time.Date(2022, 6, 1, 3, 37, 29, 538000000, time.UTC),
end: time.Date(2022, 6, 1, 3, 41, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
{
id: "myid-2022-06-01T04:00:00Z",
start: time.Date(2022, 6, 1, 4, 23, 34, 755000000, time.UTC),
end: time.Date(2022, 6, 1, 4, 27, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
},
[]delayedCallback{
{
at: time.Date(2022, 6, 1, 3, 20, 0, 0, time.UTC),
f: func() {
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{Pause: "paused"})
},
},
{
at: time.Date(2022, 6, 1, 3, 30, 0, 0, time.UTC),
f: func() {
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{Unpause: "go ahead"})
},
},
{
at: time.Date(2022, 6, 1, 5, 0, 0, 0, time.UTC),
finishTest: true,
},
},
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: durationpb.New(1 * time.Hour),
}},
Jitter: durationpb.New(1 * time.Hour),
},
},
)
}

func (s *workflowSuite) TestLimitedActions() {
// written using low-level mocks so we can sleep forever

Expand Down

0 comments on commit be4326e

Please sign in to comment.