Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions chasm/lib/scheduler/gen/schedulerpb/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 54 additions & 3 deletions chasm/lib/scheduler/generator_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package scheduler

import (
"fmt"
"time"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.uber.org/fx"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -77,9 +79,10 @@ func (g *GeneratorTaskExecutor) Execute(
result, err := g.SpecProcessor.ProcessTimeRange(scheduler, t1, t2, scheduler.overlapPolicy(), "", false, nil)
if err != nil {
// An error here should be impossible, send to the DLQ.
logger.Error("error processing time range", tag.Error(err))
msg := "failed to process a time range"
logger.Error(msg, tag.Error(err))
return fmt.Errorf("%w: %w",
serviceerror.NewInternalf("failed to process a time range"),
serviceerror.NewInternal(msg),
err)
}

Expand All @@ -88,8 +91,15 @@ func (g *GeneratorTaskExecutor) Execute(
invoker.EnqueueBufferedStarts(ctx, result.BufferedStarts)
}

// Write the new high water mark.
// Write the new high water mark and future action times.
generator.LastProcessedTime = timestamppb.New(result.LastActionTime)
if err := g.updateFutureActionTimes(ctx, generator, scheduler); err != nil {
msg := "failed to update future action times"
logger.Error(msg, tag.Error(err))
return fmt.Errorf("%w: %w",
serviceerror.NewInternal(msg),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a non retryable error to ensure this task doesn't stay in the queue?

Copy link
Contributor Author

@lina-temporal lina-temporal Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it should - I updated it here, I also did a pass of the rest of the scheduler package for similar changes (will open a separate PR with those momentarily actually it's small enough I'm going to include it here).

err)
}

// Check if the schedule has gone idle.
idleTimeTotal := g.Config.Tweakables(scheduler.Namespace).IdleTime
Expand Down Expand Up @@ -125,6 +135,47 @@ func (g *GeneratorTaskExecutor) logSchedule(logger log.Logger, msg string, sched
tag.NewStringerTag("policies", jsonStringer{scheduler.Schedule.Policies}))
}

func (g *GeneratorTaskExecutor) updateFutureActionTimes(
ctx chasm.MutableContext,
generator *Generator,
scheduler *Scheduler,
) error {
nextTime := func(t time.Time) (time.Time, error) {
res, err := g.SpecProcessor.GetNextTime(scheduler, t)
return res.Next, err
}

// Make sure we don't emit more future times than are remaining.
count := recentActionCount
if scheduler.Schedule.State.LimitedActions {
count = min(int(scheduler.Schedule.State.RemainingActions), recentActionCount)
}

futureTimes := make([]*timestamppb.Timestamp, 0, count)
t := timestamp.TimeValue(generator.LastProcessedTime)
var err error
for len(futureTimes) < count {
t, err = nextTime(t)
if err != nil {
return err
}
if t.IsZero() {
break
}

if scheduler.Info.UpdateTime.AsTime().After(t) {
// Skip action times that occur before the schedule's update time.
continue
}

futureTimes = append(futureTimes, timestamppb.New(t))
}

generator.FutureActionTimes = futureTimes

return nil
}

func (g *GeneratorTaskExecutor) Validate(
ctx chasm.Context,
generator *Generator,
Expand Down
53 changes: 53 additions & 0 deletions chasm/lib/scheduler/generator_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,56 @@ func (s *generatorTasksSuite) TestExecuteBufferTask_Basic() {
s.NoError(err)
s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate))
}

func (s *generatorTasksSuite) TestUpdateFutureActionTimes_UnlimitedActions() {
ctx := s.newMutableContext()
sched := s.scheduler
generator, err := sched.Generator.Get(ctx)
s.NoError(err)

s.executor.SpecProcessor = newTestSpecProcessor(s.controller)

err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{})
s.NoError(err)

s.NotEmpty(generator.FutureActionTimes)
s.Require().Len(generator.FutureActionTimes, 10)
}

func (s *generatorTasksSuite) TestUpdateFutureActionTimes_LimitedActions() {
ctx := s.newMutableContext()
sched := s.scheduler
generator, err := sched.Generator.Get(ctx)
s.NoError(err)

sched.Schedule.State.LimitedActions = true
sched.Schedule.State.RemainingActions = 2
s.executor.SpecProcessor = newTestSpecProcessor(s.controller)

err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{})
s.NoError(err)

s.Equal(2, len(generator.FutureActionTimes))
}

func (s *generatorTasksSuite) TestUpdateFutureActionTimes_SkipsBeforeUpdateTime() {
ctx := s.newMutableContext()
sched := s.scheduler
generator, err := sched.Generator.Get(ctx)
s.NoError(err)

s.executor.SpecProcessor = newTestSpecProcessor(s.controller)

// UpdateTime acts as a floor - action times at or before it are skipped.
baseTime := ctx.Now(generator).UTC()
updateTime := baseTime.Add(defaultInterval / 2)
sched.Info.UpdateTime = timestamppb.New(updateTime)

err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{})
s.NoError(err)

s.Require().NotEmpty(generator.FutureActionTimes)
for _, futureTime := range generator.FutureActionTimes {
s.True(futureTime.AsTime().After(updateTime))
}
}
4 changes: 3 additions & 1 deletion chasm/lib/scheduler/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler_test
import (
"time"

commonpb "go.temporal.io/api/common/v1"
schedulepb "go.temporal.io/api/schedule/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/server/chasm/lib/scheduler"
Expand Down Expand Up @@ -34,7 +35,8 @@ func defaultSchedule() *schedulepb.Schedule {
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: "scheduled-wf",
WorkflowId: "scheduled-wf",
WorkflowType: &commonpb.WorkflowType{Name: "scheduled-wf-type"},
},
},
},
Expand Down
1 change: 1 addition & 0 deletions chasm/lib/scheduler/invoker_execute_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *invokerExecuteTaskSuite) SetupTest() {
Config: defaultConfig(),
MetricsHandler: metrics.NoopMetricsHandler,
BaseLogger: s.logger,
SpecProcessor: s.specProcessor,
HistoryClient: s.mockHistoryClient,
FrontendClient: s.mockFrontendClient,
})
Expand Down
1 change: 1 addition & 0 deletions chasm/lib/scheduler/invoker_process_buffer_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (s *invokerProcessBufferTaskSuite) SetupTest() {
Config: defaultConfig(),
MetricsHandler: metrics.NoopMetricsHandler,
BaseLogger: s.logger,
SpecProcessor: s.specProcessor,
})
}

Expand Down
10 changes: 8 additions & 2 deletions chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
Config *Config
MetricsHandler metrics.Handler
BaseLogger log.Logger
SpecProcessor SpecProcessor

HistoryClient resource.HistoryClient

Expand Down Expand Up @@ -540,9 +541,14 @@ func (e *InvokerExecuteTaskExecutor) startWorkflow(
return nil, err
}

// realTime may be slightly past the time of the action's first scheduled WFT.
realTime := time.Now()
desiredTime := start.ActualTime
e.MetricsHandler.Timer(metrics.ScheduleActionDelay.Name()).Record(realTime.Sub(desiredTime.AsTime()))

return &schedulepb.ScheduleActionResult{
ScheduleTime: start.ActualTime,
ActualTime: timestamppb.New(time.Now()),
ScheduleTime: desiredTime,
ActualTime: timestamppb.New(realTime),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also from another PR (nexus-watcher). Ignore, will fix before merge.

StartWorkflowResult: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: result.RunId,
Expand Down
3 changes: 3 additions & 0 deletions chasm/lib/scheduler/proto/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ message SchedulerState {
message GeneratorState {
// High water mark.
google.protobuf.Timestamp last_processed_time = 3;

// A list of upcoming times an action will be triggered.
repeated google.protobuf.Timestamp future_action_times = 4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be persisted? Can't it be calculated on the fly when calculating the memo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can't be, because the Memo is computed in the context of a component (not a task executor), and therefore it doesn't have the wired-in dependencies that computing this relies upon (SpecProcessor).

}

// CHASM scheduler's Invoker internal state.
Expand Down
Loading
Loading