Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions chasm/lib/scheduler/backfiller_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,9 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
// Either type of request will spawn a Backfiller and schedule an immediate pure task.
_, err = s.node.CloseTransaction()
s.NoError(err)
s.GreaterOrEqual(1, len(s.addedTasks))
task, ok := s.addedTasks[0].(*tasks.ChasmTaskPure)
s.True(ok)
s.Equal(chasm.TaskScheduledTimeImmediate, task.GetVisibilityTime())
s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate))

// Run a backfill task.
s.addedTasks = make([]tasks.Task, 0) // Clear old tasks.
err = s.executor.Execute(ctx, backfiller, chasm.TaskAttributes{}, &schedulerpb.BackfillerTask{})
s.NoError(err)
_, err = s.node.CloseTransaction() // TODO - remove this when CHASM has unit testing hooks for task generation
Expand All @@ -260,7 +256,11 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
s.NoError(err)
s.Nil(res)
} else {
s.GreaterOrEqual(1, len(s.addedTasks))
// TODO - check that a pure task to continue driving backfill exists here. Because
// a pure task in the tree already has the physically-created status, closing the
// transaction won't call our backend mock for AddTasks twice. Fix this when CHASM
// offers unit testing hooks for task generation.

s.Equal(int64(c.ExpectedAttempt), backfiller.GetAttempt())
s.Equal(c.ExpectedLastProcessedTime.UTC(), backfiller.GetLastProcessedTime().AsTime())
}
Expand Down
15 changes: 13 additions & 2 deletions chasm/lib/scheduler/generator_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ func (g *GeneratorTaskExecutor) Execute(
result, err := g.SpecProcessor.ProcessTimeRange(scheduler, t1, t2, scheduler.overlapPolicy(), "", false, nil)
if err != nil {
// An error here should be impossible, send to the DLQ.
logger.Error("error processing time range", tag.Error(err))
msg := "failed to process a time range"
logger.Error(msg, tag.Error(err))
return fmt.Errorf("%w: %w",
serviceerror.NewInternalf("failed to process a time range"),
serviceerror.NewInternal(msg),
err)
}

Expand All @@ -91,6 +92,16 @@ func (g *GeneratorTaskExecutor) Execute(
// Write the new high water mark.
generator.LastProcessedTime = timestamppb.New(result.LastActionTime)

// Update visibility, since future action times may have changed.
err = scheduler.UpdateVisibility(ctx, g.SpecProcessor, nil)
if err != nil {
msg := "failed to update visibility"
logger.Error(msg, tag.Error(err))
return fmt.Errorf("%w: %w",
serviceerror.NewInternal(msg),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a non retryable error to ensure this task doesn't stay in the queue?

Copy link
Contributor Author

@lina-temporal lina-temporal Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it should - I updated it here. I also did a pass of the rest of the scheduler package for similar changes (I was going to open a separate PR with those, but it's small enough that I'm going to include it here).

err)
}

// Check if the schedule has gone idle.
idleTimeTotal := g.Config.Tweakables(scheduler.Namespace).IdleTime
idleExpiration, isIdle := scheduler.getIdleExpiration(ctx, idleTimeTotal, result.NextWakeupTime)
Expand Down
5 changes: 1 addition & 4 deletions chasm/lib/scheduler/generator_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,5 @@ func (s *generatorTasksSuite) TestExecuteBufferTask_Basic() {
// Ensure we scheduled an immediate physical pure task on the tree.
_, err = s.node.CloseTransaction()
s.NoError(err)
s.Equal(1, len(s.addedTasks))
task, ok := s.addedTasks[0].(*tasks.ChasmTaskPure)
s.True(ok)
s.Equal(chasm.TaskScheduledTimeImmediate, task.GetVisibilityTime())
s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate))
}
4 changes: 3 additions & 1 deletion chasm/lib/scheduler/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler_test
import (
"time"

commonpb "go.temporal.io/api/common/v1"
schedulepb "go.temporal.io/api/schedule/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/server/chasm/lib/scheduler"
Expand Down Expand Up @@ -34,7 +35,8 @@ func defaultSchedule() *schedulepb.Schedule {
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: "scheduled-wf",
WorkflowId: "scheduled-wf",
WorkflowType: &commonpb.WorkflowType{Name: "scheduled-wf-type"},
},
},
},
Expand Down
5 changes: 1 addition & 4 deletions chasm/lib/scheduler/invoker_execute_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/testing/mockapi/workflowservicemock/v1"
"go.temporal.io/server/service/history/tasks"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand All @@ -43,6 +42,7 @@ func (s *invokerExecuteTaskSuite) SetupTest() {
Config: defaultConfig(),
MetricsHandler: metrics.NoopMetricsHandler,
BaseLogger: s.logger,
SpecProcessor: s.specProcessor,
HistoryClient: s.mockHistoryClient,
FrontendClient: s.mockFrontendClient,
})
Expand Down Expand Up @@ -340,9 +340,6 @@ func (s *invokerExecuteTaskSuite) runExecuteTestCase(c *executeTestCase) {
s.ExpectReadComponent(invoker)
s.ExpectUpdateComponent(invoker)

// Clear old tasks and run the execute task.
s.addedTasks = make([]tasks.Task, 0)

// Create engine context for side effect task execution
engineCtx := s.newEngineContext()
err = s.executor.Execute(engineCtx, chasm.ComponentRef{}, chasm.TaskAttributes{}, &schedulerpb.InvokerExecuteTask{})
Expand Down
5 changes: 1 addition & 4 deletions chasm/lib/scheduler/invoker_process_buffer_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/tasks"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand All @@ -33,6 +32,7 @@ func (s *invokerProcessBufferTaskSuite) SetupTest() {
Config: defaultConfig(),
MetricsHandler: metrics.NoopMetricsHandler,
BaseLogger: s.logger,
SpecProcessor: s.specProcessor,
})
}

Expand Down Expand Up @@ -313,9 +313,6 @@ func (s *invokerProcessBufferTaskSuite) runProcessBufferTestCase(c *processBuffe
// Set LastProcessedTime to current time to ensure time checks pass
invoker.LastProcessedTime = timestamppb.New(s.timeSource.Now())

// Clear old tasks and run the process buffer task
s.addedTasks = make([]tasks.Task, 0)

err = s.executor.Execute(ctx, invoker, chasm.TaskAttributes{}, &schedulerpb.InvokerProcessBufferTask{})
s.NoError(err)
_, err = s.node.CloseTransaction()
Expand Down
7 changes: 7 additions & 0 deletions chasm/lib/scheduler/invoker_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
Config *Config
MetricsHandler metrics.Handler
BaseLogger log.Logger
SpecProcessor SpecProcessor

HistoryClient resource.HistoryClient

Expand Down Expand Up @@ -176,6 +177,12 @@ func (e *InvokerExecuteTaskExecutor) Execute(
i.recordExecuteResult(ctx, &result)
s.recordActionResult(&schedulerActionResult{starts: startResults})

// Update visibility, since RecentActions may have been updated.
err = s.UpdateVisibility(ctx, e.SpecProcessor, nil)
if err != nil {
return struct{}{}, err
}

return struct{}{}, nil
},
nil,
Expand Down
182 changes: 181 additions & 1 deletion chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package scheduler

import (
"bytes"
"fmt"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
schedulepb "go.temporal.io/api/schedule/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/worker/scheduler"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -29,6 +35,8 @@ type Scheduler struct {
Invoker chasm.Field[*Invoker]
Backfillers chasm.Map[string, *Backfiller] // Backfill ID => *Backfiller

Visibility chasm.Field[*chasm.Visibility]

// Locally-cached state, invalidated whenever cacheConflictToken != ConflictToken.
cacheConflictToken int64
compiledSpec *scheduler.CompiledSpec // compiledSpec is only ever replaced whole, not mutated.
Expand All @@ -37,6 +45,12 @@ type Scheduler struct {
const (
	// How many recent actions to keep on the Info.RecentActions list. Also serves
	// as the upper bound on the number of future action times computed for the
	// schedule's list info.
	recentActionCount = 10

	// Item limit per spec field on the ScheduleInfo memo. Applied to the spec's
	// StructuredCalendar, Interval, and ExcludeStructuredCalendar lists.
	listInfoSpecFieldLimit = 10

	// Key in the visibility memo under which the schedule's list info is stored.
	visibilityMemoFieldInfo = "ScheduleInfo"
)

// NewScheduler returns an initialized CHASM scheduler root component.
Expand Down Expand Up @@ -66,10 +80,14 @@ func NewScheduler(
}

invoker := NewInvoker(ctx, sched)
generator := NewGenerator(ctx, sched, invoker)
sched.Invoker = chasm.NewComponentField(ctx, invoker)

generator := NewGenerator(ctx, sched, invoker)
sched.Generator = chasm.NewComponentField(ctx, generator)

visibility := chasm.NewVisibility(ctx)
sched.Visibility = chasm.NewComponentField(ctx, visibility)

return sched
}

Expand Down Expand Up @@ -290,3 +308,165 @@ func (s *Scheduler) recordActionResult(result *schedulerActionResult) {
}
}
}

// UpdateVisibility updates the schedule's visibility record, including memo
// and search attributes. customSearchAttributes is optional, and custom search
// attributes will be left as-is with it unset.
//
// See mergeCustomSearchAttributes for how custom search attributes are merged.
func (s *Scheduler) UpdateVisibility(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this approach, it assumes that the component author will remember to update visibility anytime that an attribute changes. This should be the framework's responsibility and should be done automatically when a transaction is closed.

ctx chasm.MutableContext,
specProcessor SpecProcessor,
customSearchAttributes *commonpb.SearchAttributes,
) error {
needsTask := false // Set to true if we need to write anything to Visibility.
visibility, err := s.Visibility.Get(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but are we ever expecting an error from getting a component to be one that the application needs to handle? What would the reasons for this failure be? Can we just make the method panic instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we support partial read in the future, there could be some transient error returned here. For now, it will only be serialization errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's a transient, unexpected error, it should be okay to panic IMHO. That would be better than complicating the user interface. There's never really a way for a developer to react to this situation.

if err != nil {
return err
}

// Update the schedule's search attributes. This includes both Temporal-managed
// fields (paused state), as well as upserts for customer-specified fields.
upsertAttrs := make(map[string]any)
currentAttrs, err := visibility.GetSearchAttributes(ctx)
if err != nil {
return err
}

// Only attempt to merge additional search attributes if any are given, otherwise
// we'll unset existing attributes.
if customSearchAttributes != nil &&
len(customSearchAttributes.GetIndexedFields()) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is enough because proto getters work with nils.

Suggested change
if customSearchAttributes != nil &&
len(customSearchAttributes.GetIndexedFields()) > 0 {
if len(customSearchAttributes.GetIndexedFields()) > 0 {

mergeCustomSearchAttributes(
currentAttrs,
customSearchAttributes.GetIndexedFields(),
upsertAttrs,
)
}

// Update Paused status.
var currentPaused bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly why I didn't want components to use imperative logic to update their search attributes. This logic should be part of the framework.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can afford it timeline-wise I'd much prefer if we coded up a better abstraction in the framework.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I agree. I also acknowledge that imperative logic means the component author needs to remember to update those attributes.

We discussed the field tag approach, which addresses this concern and also the SA registration problem, but decided to do that as a follow-up to unblock the Scheduler migration and deliver value sooner. It's more of an ROI & priority discussion for the entire OSS team.

currentPausedPayload, ok := currentAttrs[searchattribute.TemporalSchedulePaused]
if ok {
err = payload.Decode(currentPausedPayload, &currentPaused)
if err != nil {
return err
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can use the generic chasm.GetSearchAttribute() method here as well and you don't need to do the decoding.

if !ok || currentPaused != s.Schedule.State.Paused {
upsertAttrs[searchattribute.TemporalSchedulePaused] = s.Schedule.State.Paused
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like a "refresh visibility" approach. Do we have something like a PauseSchedule method where we can update the field?


if len(upsertAttrs) > 0 {
err = visibility.UpsertSearchAttributes(ctx, upsertAttrs)
if err != nil {
return err
}
needsTask = true
}

newInfo, err := s.GetListInfo(ctx, specProcessor)
if err != nil {
return err
}
newInfoPayload, err := newInfo.Marshal()
if err != nil {
return err
}
currentMemo, err := visibility.GetMemo(ctx)
if err != nil {
return err
}
currentInfoPayload := currentMemo[visibilityMemoFieldInfo]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can use the generic chasm.GetMemo() function which returns a strongly typed value instead of Payload


// Update visibility if the memo is out-of-date or absent.
if currentInfoPayload == nil ||
!bytes.Equal(currentInfoPayload.Data, newInfoPayload) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proto marshal is not deterministic by default, this comparison will give you false negatives. You will want to unmarshal and use proto.Equal instead.

Another reason why component authors shouldn't implement all of this logic.

newMemo := map[string]any{visibilityMemoFieldInfo: newInfoPayload}
err = visibility.UpsertMemo(ctx, newMemo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpsertMemo will perform the encoding to Payload type, should just use newInfo here.

if err != nil {
return err
}
needsTask = true
}

if needsTask {
visibility.GenerateTask(ctx)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary, Visibility component automatically generates a task when it's updated.


return nil
}

// GetListInfo assembles the ScheduleListInfo for this schedule. The result is
// used as the visibility memo, and to answer List queries.
//
// The returned Spec is a trimmed clone of the schedule's spec: TimezoneData is
// cleared, and the StructuredCalendar, Interval, and ExcludeStructuredCalendar
// lists are each cut down to at most listInfoSpecFieldLimit entries. An error
// is returned only when the future action times cannot be computed.
func (s *Scheduler) GetListInfo(
	ctx chasm.Context,
	specProcessor SpecProcessor,
) (*schedulepb.ScheduleListInfo, error) {
	// Trim a clone rather than the schedule's own spec.
	trimmed := common.CloneProto(s.Schedule.Spec)

	// Timezone data is too large/not useful for the list view.
	trimmed.TimezoneData = nil

	// Cap each of the repeated spec fields stored on the memo.
	trimmed.StructuredCalendar = util.SliceHead(trimmed.StructuredCalendar, listInfoSpecFieldLimit)
	trimmed.Interval = util.SliceHead(trimmed.Interval, listInfoSpecFieldLimit)
	trimmed.ExcludeStructuredCalendar = util.SliceHead(trimmed.ExcludeStructuredCalendar, listInfoSpecFieldLimit)

	upcoming, err := s.getFutureActionTimes(ctx, specProcessor)
	if err != nil {
		return nil, err
	}

	state := s.Schedule.State
	info := &schedulepb.ScheduleListInfo{
		Spec:              trimmed,
		WorkflowType:      s.Schedule.Action.GetStartWorkflow().GetWorkflowType(),
		Notes:             state.Notes,
		Paused:            state.Paused,
		RecentActions:     util.SliceTail(s.Info.RecentActions, recentActionCount),
		FutureActionTimes: upcoming,
	}
	return info, nil
}

// getFutureActionTimes computes the schedule's upcoming action times, starting
// the search from the Generator's LastProcessedTime.
//
// At most recentActionCount times are returned. When the schedule has
// LimitedActions set, that bound is lowered to RemainingActions if it is the
// smaller of the two. Candidate times that fall before the schedule's
// UpdateTime are skipped and do not count towards the bound.
//
// An error is returned if the Generator component cannot be loaded, or if the
// spec processor fails to compute a next time.
func (s *Scheduler) getFutureActionTimes(
	ctx chasm.Context,
	specProcessor SpecProcessor,
) ([]*timestamppb.Timestamp, error) {
	generator, err := s.Generator.Get(ctx)
	if err != nil {
		return nil, err
	}

	// advance asks the spec processor for the next action time after `from`.
	advance := func(from time.Time) (time.Time, error) {
		result, nextErr := specProcessor.GetNextTime(s, from)
		return result.Next, nextErr
	}

	limit := recentActionCount
	if state := s.Schedule.State; state.LimitedActions {
		limit = min(int(state.RemainingActions), recentActionCount)
	}

	times := make([]*timestamppb.Timestamp, 0, limit)
	cursor := timestamp.TimeValue(generator.LastProcessedTime)
	for len(times) < limit {
		next, err := advance(cursor)
		if err != nil {
			return nil, err
		}
		if next.IsZero() {
			// A zero time ends the scan (presumably the spec has no further
			// action times - confirm against SpecProcessor.GetNextTime).
			break
		}
		cursor = next

		// Only keep action times whose nominal times are not prior to the
		// schedule's update time.
		if !s.Info.UpdateTime.AsTime().After(next) {
			times = append(times, timestamppb.New(next))
		}
	}

	return times, nil
}
Loading
Loading