diff --git a/chasm/lib/scheduler/backfiller_tasks.go b/chasm/lib/scheduler/backfiller_tasks.go index 9102f7d7bc7..6d0d483a51f 100644 --- a/chasm/lib/scheduler/backfiller_tasks.go +++ b/chasm/lib/scheduler/backfiller_tasks.go @@ -1,10 +1,8 @@ package scheduler import ( - "fmt" "time" - "go.temporal.io/api/serviceerror" schedulespb "go.temporal.io/server/api/schedule/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" @@ -61,17 +59,15 @@ func (b *BackfillerTaskExecutor) Execute( scheduler, err := backfiller.Scheduler.Get(ctx) if err != nil { - return fmt.Errorf("%w: %w", - serviceerror.NewInternal("scheduler tree missing node"), - err) + b.baseLogger.Error("scheduler tree missing node", tag.Error(err)) + return ErrUnprocessable } logger := newTaggedLogger(b.baseLogger, scheduler) invoker, err := scheduler.Invoker.Get(ctx) if err != nil { - return fmt.Errorf("%w: %w", - serviceerror.NewInternal("scheduler tree missing node"), - err) + logger.Error("scheduler tree missing node", tag.Error(err)) + return ErrUnprocessable } // If the buffer is already full, don't move the watermark at all, just back off @@ -95,7 +91,8 @@ func (b *BackfillerTaskExecutor) Execute( case RequestTypeTrigger: result, err = b.processTrigger(ctx, scheduler, backfiller) default: - return serviceerror.NewInternalf("unknown backfill type: %v", backfiller.RequestType()) + logger.Error("unknown backfill type", tag.NewAnyTag("type", backfiller.RequestType())) + return ErrUnprocessable } if err != nil { logger.Error("failed to process backfill", tag.Error(err)) diff --git a/chasm/lib/scheduler/gen/schedulerpb/v1/message.pb.go b/chasm/lib/scheduler/gen/schedulerpb/v1/message.pb.go index 71dd558da40..56dd95bf169 100644 --- a/chasm/lib/scheduler/gen/schedulerpb/v1/message.pb.go +++ b/chasm/lib/scheduler/gen/schedulerpb/v1/message.pb.go @@ -131,6 +131,8 @@ type GeneratorState struct { state protoimpl.MessageState `protogen:"open.v1"` // High water mark. LastProcessedTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_processed_time,json=lastProcessedTime,proto3" json:"last_processed_time,omitempty"` + // A list of upcoming times an action will be triggered. + FutureActionTimes []*timestamppb.Timestamp `protobuf:"bytes,4,rep,name=future_action_times,json=futureActionTimes,proto3" json:"future_action_times,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -172,6 +174,13 @@ func (x *GeneratorState) GetLastProcessedTime() *timestamppb.Timestamp { return nil } +func (x *GeneratorState) GetFutureActionTimes() []*timestamppb.Timestamp { + if x != nil { + return x.FutureActionTimes + } + return nil +} + // CHASM scheduler's Invoker internal state. type InvokerState struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -472,9 +481,10 @@ const file_temporal_server_chasm_lib_scheduler_proto_v1_message_proto_rawDesc = "\vschedule_id\x18\a \x01(\tR\n" + "scheduleId\x12%\n" + "\x0econflict_token\x18\b \x01(\x03R\rconflictToken\x12\x16\n" + - "\x06closed\x18\t \x01(\bR\x06closed\"\\\n" + + "\x06closed\x18\t \x01(\bR\x06closed\"\xa8\x01\n" + "\x0eGeneratorState\x12J\n" + - "\x13last_processed_time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x11lastProcessedTime\"\xc1\x04\n" + + "\x13last_processed_time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x11lastProcessedTime\x12J\n" + + "\x13future_action_times\x18\x04 \x03(\v2\x1a.google.protobuf.TimestampR\x11futureActionTimes\"\xc1\x04\n" + "\fInvokerState\x12W\n" + "\x0fbuffered_starts\x18\x02 \x03(\v2..temporal.server.api.schedule.v1.BufferedStartR\x0ebufferedStarts\x12T\n" + "\x10cancel_workflows\x18\x03 \x03(\v2).temporal.api.common.v1.WorkflowExecutionR\x0fcancelWorkflows\x12Z\n" + @@ -531,21 +541,22 @@ var file_temporal_server_chasm_lib_scheduler_proto_v1_message_proto_depIdxs = [] 6, // 0: temporal.server.chasm.lib.scheduler.proto.v1.SchedulerState.schedule:type_name -> temporal.api.schedule.v1.Schedule 7, // 1: temporal.server.chasm.lib.scheduler.proto.v1.SchedulerState.info:type_name -> temporal.api.schedule.v1.ScheduleInfo 8, // 2: temporal.server.chasm.lib.scheduler.proto.v1.GeneratorState.last_processed_time:type_name -> google.protobuf.Timestamp - 9, // 3: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.buffered_starts:type_name -> temporal.server.api.schedule.v1.BufferedStart - 10, // 4: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.cancel_workflows:type_name -> temporal.api.common.v1.WorkflowExecution - 10, // 5: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.terminate_workflows:type_name -> temporal.api.common.v1.WorkflowExecution - 8, // 6: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.last_processed_time:type_name -> google.protobuf.Timestamp - 5, // 7: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.request_id_to_workflow_id:type_name -> temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.RequestIdToWorkflowIdEntry - 11, // 8: temporal.server.chasm.lib.scheduler.proto.v1.BackfillerState.backfill_request:type_name -> temporal.api.schedule.v1.BackfillRequest - 12, // 9: temporal.server.chasm.lib.scheduler.proto.v1.BackfillerState.trigger_request:type_name -> temporal.api.schedule.v1.TriggerImmediatelyRequest - 8, // 10: temporal.server.chasm.lib.scheduler.proto.v1.BackfillerState.last_processed_time:type_name -> google.protobuf.Timestamp - 13, // 11: temporal.server.chasm.lib.scheduler.proto.v1.LastCompletionResult.success:type_name -> temporal.api.common.v1.Payload - 14, // 12: temporal.server.chasm.lib.scheduler.proto.v1.LastCompletionResult.failure:type_name -> temporal.api.failure.v1.Failure - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 8, // 3: temporal.server.chasm.lib.scheduler.proto.v1.GeneratorState.future_action_times:type_name -> google.protobuf.Timestamp + 9, // 4: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.buffered_starts:type_name -> temporal.server.api.schedule.v1.BufferedStart + 10, // 5: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.cancel_workflows:type_name -> temporal.api.common.v1.WorkflowExecution + 10, // 6: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.terminate_workflows:type_name -> temporal.api.common.v1.WorkflowExecution + 8, // 7: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.last_processed_time:type_name -> google.protobuf.Timestamp + 5, // 8: temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.request_id_to_workflow_id:type_name -> temporal.server.chasm.lib.scheduler.proto.v1.InvokerState.RequestIdToWorkflowIdEntry + 11, // 9: temporal.server.chasm.lib.scheduler.proto.v1.BackfillerState.backfill_request:type_name -> temporal.api.schedule.v1.BackfillRequest + 12, // 10: temporal.server.chasm.lib.scheduler.proto.v1.BackfillerState.trigger_request:type_name -> temporal.api.schedule.v1.TriggerImmediatelyRequest + 8, // 11: temporal.server.chasm.lib.scheduler.proto.v1.BackfillerState.last_processed_time:type_name -> google.protobuf.Timestamp + 13, // 12: temporal.server.chasm.lib.scheduler.proto.v1.LastCompletionResult.success:type_name -> temporal.api.common.v1.Payload + 14, // 13: temporal.server.chasm.lib.scheduler.proto.v1.LastCompletionResult.failure:type_name -> temporal.api.failure.v1.Failure + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_temporal_server_chasm_lib_scheduler_proto_v1_message_proto_init() } diff --git a/chasm/lib/scheduler/generator_tasks.go b/chasm/lib/scheduler/generator_tasks.go index 4fbda5fbd10..dc9d2a38958 100644 --- a/chasm/lib/scheduler/generator_tasks.go +++ b/chasm/lib/scheduler/generator_tasks.go @@ -1,14 +1,14 @@ package scheduler import ( - "fmt" + "time" - "go.temporal.io/api/serviceerror" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/primitives/timestamp" "go.uber.org/fx" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -49,17 +49,15 @@ func (g *GeneratorTaskExecutor) Execute( ) error { scheduler, err := generator.Scheduler.Get(ctx) if err != nil { - return fmt.Errorf("%w: %w", - serviceerror.NewInternal("scheduler tree missing node"), - err) + g.baseLogger.Error("scheduler tree missing node", tag.Error(err)) + return ErrUnprocessable } logger := newTaggedLogger(g.baseLogger, scheduler) invoker, err := scheduler.Invoker.Get(ctx) if err != nil { - return fmt.Errorf("%w: %w", - serviceerror.NewInternal("scheduler tree missing node"), - err) + logger.Error("scheduler tree missing node", tag.Error(err)) + return ErrUnprocessable } // If we have no last processed time, this is a new schedule. @@ -92,10 +90,8 @@ func (g *GeneratorTaskExecutor) Execute( ) if err != nil { // An error here should be impossible, send to the DLQ. - logger.Error("error processing time range", tag.Error(err)) - return fmt.Errorf("%w: %w", - serviceerror.NewInternalf("failed to process a time range"), - err) + logger.Error("failed to process a time range", tag.Error(err)) + return ErrUnprocessable } // Enqueue newly-generated buffered starts. @@ -103,8 +99,12 @@ func (g *GeneratorTaskExecutor) Execute( invoker.EnqueueBufferedStarts(ctx, result.BufferedStarts) } - // Write the new high water mark. + // Write the new high water mark and future action times. generator.LastProcessedTime = timestamppb.New(result.LastActionTime) + if err := g.updateFutureActionTimes(ctx, generator, scheduler); err != nil { + logger.Error("failed to update future action times", tag.Error(err)) + return err + } // Check if the schedule has gone idle. idleTimeTotal := g.config.Tweakables(scheduler.Namespace).IdleTime @@ -142,6 +142,47 @@ func (g *GeneratorTaskExecutor) logSchedule(logger log.Logger, msg string, sched tag.NewStringerTag("policies", jsonStringer{scheduler.Schedule.Policies})) } +func (g *GeneratorTaskExecutor) updateFutureActionTimes( + ctx chasm.MutableContext, + generator *Generator, + scheduler *Scheduler, +) error { + nextTime := func(t time.Time) (time.Time, error) { + res, err := g.SpecProcessor.GetNextTime(scheduler, t) + return res.Next, err + } + + // Make sure we don't emit more future times than are remaining. + count := recentActionCount + if scheduler.Schedule.State.LimitedActions { + count = min(int(scheduler.Schedule.State.RemainingActions), recentActionCount) + } + + futureTimes := make([]*timestamppb.Timestamp, 0, count) + t := timestamp.TimeValue(generator.LastProcessedTime) + var err error + for len(futureTimes) < count { + t, err = nextTime(t) + if err != nil { + return err + } + if t.IsZero() { + break + } + + if scheduler.Info.UpdateTime.AsTime().After(t) { + // Skip action times that occur before the schedule's update time. + continue + } + + futureTimes = append(futureTimes, timestamppb.New(t)) + } + + generator.FutureActionTimes = futureTimes + + return nil +} + func (g *GeneratorTaskExecutor) Validate( ctx chasm.Context, generator *Generator, diff --git a/chasm/lib/scheduler/generator_tasks_test.go b/chasm/lib/scheduler/generator_tasks_test.go index a03ba6dc5d0..f1e58e34182 100644 --- a/chasm/lib/scheduler/generator_tasks_test.go +++ b/chasm/lib/scheduler/generator_tasks_test.go @@ -8,7 +8,6 @@ import ( "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/scheduler" "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" - "go.temporal.io/server/common" "go.temporal.io/server/common/metrics" "go.temporal.io/server/service/history/tasks" "go.uber.org/mock/gomock" @@ -47,7 +46,7 @@ func (s *generatorTasksSuite) TestExecute_ProcessTimeRangeFails() { generator, err := sched.Generator.Get(ctx) s.NoError(err) err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{}) - s.True(common.IsInternalError(err)) + s.ErrorIs(err, scheduler.ErrUnprocessable) } func (s *generatorTasksSuite) TestExecuteBufferTask_Basic() { @@ -89,3 +88,56 @@ func (s *generatorTasksSuite) TestExecuteBufferTask_Basic() { s.NoError(err) s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate)) } + +func (s *generatorTasksSuite) TestUpdateFutureActionTimes_UnlimitedActions() { + ctx := s.newMutableContext() + sched := s.scheduler + generator, err := sched.Generator.Get(ctx) + s.NoError(err) + + s.executor.SpecProcessor = newTestSpecProcessor(s.controller) + + err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{}) + s.NoError(err) + + s.NotEmpty(generator.FutureActionTimes) + s.Require().Len(generator.FutureActionTimes, 10) +} + +func (s *generatorTasksSuite) TestUpdateFutureActionTimes_LimitedActions() { + ctx := s.newMutableContext() + sched := s.scheduler + generator, err := sched.Generator.Get(ctx) + s.NoError(err) + + sched.Schedule.State.LimitedActions = true + sched.Schedule.State.RemainingActions = 2 + s.executor.SpecProcessor = newTestSpecProcessor(s.controller) + + err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{}) + s.NoError(err) + + s.Len(generator.FutureActionTimes, 2) +} + +func (s *generatorTasksSuite) TestUpdateFutureActionTimes_SkipsBeforeUpdateTime() { + ctx := s.newMutableContext() + sched := s.scheduler + generator, err := sched.Generator.Get(ctx) + s.NoError(err) + + s.executor.SpecProcessor = newTestSpecProcessor(s.controller) + + // UpdateTime acts as a floor - action times at or before it are skipped. + baseTime := ctx.Now(generator).UTC() + updateTime := baseTime.Add(defaultInterval / 2) + sched.Info.UpdateTime = timestamppb.New(updateTime) + + err = s.executor.Execute(ctx, generator, chasm.TaskAttributes{}, &schedulerpb.GeneratorTask{}) + s.NoError(err) + + s.Require().NotEmpty(generator.FutureActionTimes) + for _, futureTime := range generator.FutureActionTimes { + s.True(futureTime.AsTime().After(updateTime)) + } +} diff --git a/chasm/lib/scheduler/helper_test.go b/chasm/lib/scheduler/helper_test.go index 04a5dec0e00..ece2fc3c036 100644 --- a/chasm/lib/scheduler/helper_test.go +++ b/chasm/lib/scheduler/helper_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + commonpb "go.temporal.io/api/common/v1" schedulepb "go.temporal.io/api/schedule/v1" workflowpb "go.temporal.io/api/workflow/v1" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -41,7 +42,8 @@ func defaultSchedule() *schedulepb.Schedule { Action: &schedulepb.ScheduleAction{ Action: &schedulepb.ScheduleAction_StartWorkflow{ StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ - WorkflowId: "scheduled-wf", + WorkflowId: "scheduled-wf", + WorkflowType: &commonpb.WorkflowType{Name: "scheduled-wf-type"}, }, }, }, @@ -76,7 +78,11 @@ func setupSchedulerForTest(t *testing.T) (*scheduler.Scheduler, chasm.MutableCon nodePathEncoder := chasm.DefaultPathEncoder registry := chasm.NewRegistry(logger) - err := registry.Register(&scheduler.Library{}) + err := registry.Register(&chasm.CoreLibrary{}) + if err != nil { + t.Fatalf("failed to register core library: %v", err) + } + err = registry.Register(&scheduler.Library{}) if err != nil { t.Fatalf("failed to register scheduler library: %v", err) } diff --git a/chasm/lib/scheduler/invoker_execute_task_test.go b/chasm/lib/scheduler/invoker_execute_task_test.go index 34013cde1f9..ac19f12f35f 100644 --- a/chasm/lib/scheduler/invoker_execute_task_test.go +++ b/chasm/lib/scheduler/invoker_execute_task_test.go @@ -42,6 +42,7 @@ func (s *invokerExecuteTaskSuite) SetupTest() { Config: defaultConfig(), MetricsHandler: metrics.NoopMetricsHandler, BaseLogger: s.logger, + SpecProcessor: s.specProcessor, HistoryClient: s.mockHistoryClient, FrontendClient: s.mockFrontendClient, }) diff --git a/chasm/lib/scheduler/invoker_process_buffer_task_test.go b/chasm/lib/scheduler/invoker_process_buffer_task_test.go index 7c715f5262d..6e550029e49 100644 --- a/chasm/lib/scheduler/invoker_process_buffer_task_test.go +++ b/chasm/lib/scheduler/invoker_process_buffer_task_test.go @@ -31,6 +31,7 @@ func (s *invokerProcessBufferTaskSuite) SetupTest() { Config: defaultConfig(), MetricsHandler: metrics.NoopMetricsHandler, BaseLogger: s.logger, + SpecProcessor: s.specProcessor, }) } diff --git a/chasm/lib/scheduler/invoker_tasks.go b/chasm/lib/scheduler/invoker_tasks.go index 1861737eb64..288db683c4c 100644 --- a/chasm/lib/scheduler/invoker_tasks.go +++ b/chasm/lib/scheduler/invoker_tasks.go @@ -20,7 +20,9 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/payload" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/util" legacyscheduler "go.temporal.io/server/service/worker/scheduler" "go.uber.org/fx" @@ -34,6 +36,7 @@ type ( Config *Config MetricsHandler metrics.Handler BaseLogger log.Logger + SpecProcessor SpecProcessor HistoryClient resource.HistoryClient @@ -524,6 +527,26 @@ func (e *InvokerProcessBufferTaskExecutor) startWorkflowDeadline( return start.ActualTime.AsTime().Add(timeout) } +// startWorkflowSearchAttributes returns the search attributes to be applied to +// workflows kicked off. Includes custom search attributes and Temporal-managed. +func startWorkflowSearchAttributes( + scheduler *Scheduler, + nominal time.Time, +) *commonpb.SearchAttributes { + attributes := scheduler.Schedule.GetAction().GetStartWorkflow().GetSearchAttributes() + + fields := util.CloneMapNonNil(attributes.GetIndexedFields()) + if p, err := payload.Encode(nominal); err == nil { + fields[searchattribute.TemporalScheduledStartTime] = p + } + if p, err := payload.Encode(scheduler.ScheduleId); err == nil { + fields[searchattribute.TemporalScheduledById] = p + } + return &commonpb.SearchAttributes{ + IndexedFields: fields, + } +} + func (e *InvokerExecuteTaskExecutor) startWorkflow( ctx context.Context, scheduler *Scheduler, @@ -553,7 +576,6 @@ func (e *InvokerExecuteTaskExecutor) startWorkflow( reusePolicy = enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE } - // TODO - set search attributes request := &workflowservice.StartWorkflowExecutionRequest{ CompletionCallbacks: []*commonpb.Callback{callback}, Header: requestSpec.Header, @@ -563,7 +585,7 @@ func (e *InvokerExecuteTaskExecutor) startWorkflow( Namespace: scheduler.Namespace, RequestId: start.RequestId, RetryPolicy: requestSpec.RetryPolicy, - SearchAttributes: nil, + SearchAttributes: startWorkflowSearchAttributes(scheduler, start.NominalTime.AsTime()), TaskQueue: requestSpec.TaskQueue, UserMetadata: requestSpec.UserMetadata, WorkflowExecutionTimeout: requestSpec.WorkflowExecutionTimeout, diff --git a/chasm/lib/scheduler/proto/v1/message.proto b/chasm/lib/scheduler/proto/v1/message.proto index 23a794d9a7a..ecc7195b58a 100644 --- a/chasm/lib/scheduler/proto/v1/message.proto +++ b/chasm/lib/scheduler/proto/v1/message.proto @@ -35,6 +35,9 @@ message SchedulerState { message GeneratorState { // High water mark. google.protobuf.Timestamp last_processed_time = 3; + + // A list of upcoming times an action will be triggered. + repeated google.protobuf.Timestamp future_action_times = 4; } // CHASM scheduler's Invoker internal state. diff --git a/chasm/lib/scheduler/scheduler.go b/chasm/lib/scheduler/scheduler.go index d5034fc4321..4fe0720012b 100644 --- a/chasm/lib/scheduler/scheduler.go +++ b/chasm/lib/scheduler/scheduler.go @@ -3,6 +3,7 @@ package scheduler import ( "bytes" "encoding/binary" + "errors" "fmt" "slices" "strings" @@ -12,7 +13,6 @@ import ( enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" schedulepb "go.temporal.io/api/schedule/v1" - "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" @@ -43,6 +43,8 @@ type Scheduler struct { Invoker chasm.Field[*Invoker] Backfillers chasm.Map[string, *Backfiller] // Backfill ID => *Backfiller + Visibility chasm.Field[*chasm.Visibility] + // Locally-cached state, invalidated whenever cacheConflictToken != ConflictToken. cacheConflictToken int64 compiledSpec *scheduler.CompiledSpec // compiledSpec is only ever replaced whole, not mutated. @@ -51,9 +53,18 @@ type Scheduler struct { const ( // How many recent actions to keep on the Info.RecentActions list. recentActionCount = 10 + + // Item limit per spec field on the ScheduleInfo memo. + listInfoSpecFieldLimit = 10 + + // Field in which the schedule's memo is stored. + visibilityMemoFieldInfo = "ScheduleInfo" ) -var ErrConflictTokenMismatch = serviceerror.NewFailedPrecondition("mismatched conflict token") +var ( + ErrConflictTokenMismatch = errors.New("mismatched conflict token") + ErrUnprocessable = errors.New("unprocessable schedule") +) // NewScheduler returns an initialized CHASM scheduler root component. func NewScheduler( @@ -83,12 +94,15 @@ func NewScheduler( sched.Schedule.State = &schedulepb.ScheduleState{} invoker := NewInvoker(ctx, sched) - generator := NewGenerator(ctx, sched, invoker) sched.Invoker = chasm.NewComponentField(ctx, invoker) + + generator := NewGenerator(ctx, sched, invoker) sched.Generator = chasm.NewComponentField(ctx, generator) // Create backfillers to fulfill initialPatch. sched.handlePatch(ctx, patch) + visibility := chasm.NewVisibility(ctx) + sched.Visibility = chasm.NewComponentField(ctx, visibility) return sched } @@ -111,7 +125,6 @@ func CreateScheduler( ctx chasm.MutableContext, req *schedulerpb.CreateScheduleRequest, ) (*Scheduler, *schedulerpb.CreateScheduleResponse, error) { - // TODO: namespace name should be resolved from namespace_id via namespace registry sched := NewScheduler( ctx, req.FrontendRequest.Namespace, @@ -121,7 +134,19 @@ func CreateScheduler( req.FrontendRequest.InitialPatch, ) - // TODO - use visibility component to update SAs + // Update visibility with custom attributes. + visibility, err := sched.Visibility.Get(ctx) + if err != nil { + return nil, nil, ErrUnprocessable + } + err = visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields()) + if err != nil { + return nil, nil, ErrUnprocessable + } + err = visibility.SetMemo(ctx, req.FrontendRequest.GetMemo().GetFields()) + if err != nil { + return nil, nil, ErrUnprocessable + } return sched, &schedulerpb.CreateScheduleResponse{ FrontendResponse: &workflowservice.CreateScheduleResponse{ @@ -553,9 +578,20 @@ func (s *Scheduler) Update( return nil, ErrConflictTokenMismatch } - s.Schedule = common.CloneProto(req.FrontendRequest.Schedule) - // TODO - use visibility component to update SAs + // Update custom search attributes. + // + // TODO - we could also easily support allowing the customer to update their + // memo here. + visibility, err := s.Visibility.Get(ctx) + if err != nil { + return nil, ErrUnprocessable + } + err = visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields()) + if err != nil { + return nil, ErrUnprocessable + } + s.Schedule = common.CloneProto(req.FrontendRequest.Schedule) s.Info.UpdateTime = timestamppb.New(ctx.Now(s)) s.updateConflictToken() @@ -599,3 +635,64 @@ func (s *Scheduler) validateConflictToken(token []byte) bool { current := s.generateConflictToken() return bytes.Equal(current, token) } + +// SearchAttributes returns the Temporal-managed key values for visibility. +func (s *Scheduler) SearchAttributes(chasm.Context) []chasm.SearchAttributeKeyValue { + return []chasm.SearchAttributeKeyValue{ + chasm.SearchAttributeTemporalSchedulePaused.Value(s.Schedule.State.Paused), + chasm.SearchAttributeTemporalNamespaceDivision.Value(scheduler.NamespaceDivision), + } +} + +// Memo returns the scheduler's info block for visibility. +func (s *Scheduler) Memo( + ctx chasm.Context, +) map[string]chasm.VisibilityValue { + newInfo, err := s.GetListInfo(ctx) + if err != nil { + // Unable to retrieve list info. Return nil to skip memo update. + // Error will be logged once loggers are available in CHASM. + return nil + } + + newInfoPayload, err := newInfo.Marshal() + if err != nil { + // Unable to marshal list info. Return nil to skip memo update. + // Error will be logged once loggers are available in CHASM. + return nil + } + + return map[string]chasm.VisibilityValue{ + visibilityMemoFieldInfo: chasm.VisibilityValueByteSlice(newInfoPayload), + } +} + +// GetListInfo returns the ScheduleListInfo, used as the visibility memo, and to +// answer List queries. +func (s *Scheduler) GetListInfo( + ctx chasm.Context, +) (*schedulepb.ScheduleListInfo, error) { + spec := common.CloneProto(s.Schedule.Spec) + + // Clear fields that are too large/not useful for the list view. + spec.TimezoneData = nil + + // Limit the number of specs and exclusions stored on the memo. + spec.ExcludeStructuredCalendar = util.SliceHead(spec.ExcludeStructuredCalendar, listInfoSpecFieldLimit) + spec.Interval = util.SliceHead(spec.Interval, listInfoSpecFieldLimit) + spec.StructuredCalendar = util.SliceHead(spec.StructuredCalendar, listInfoSpecFieldLimit) + + generator, err := s.Generator.Get(ctx) + if err != nil { + return nil, ErrUnprocessable + } + + return &schedulepb.ScheduleListInfo{ + Spec: spec, + WorkflowType: s.Schedule.Action.GetStartWorkflow().GetWorkflowType(), + Notes: s.Schedule.State.Notes, + Paused: s.Schedule.State.Paused, + RecentActions: util.SliceTail(s.Info.RecentActions, recentActionCount), + FutureActionTimes: generator.FutureActionTimes, + }, nil +} diff --git a/chasm/lib/scheduler/scheduler_suite_test.go b/chasm/lib/scheduler/scheduler_suite_test.go index 10e4d773819..81c9eb57270 100644 --- a/chasm/lib/scheduler/scheduler_suite_test.go +++ b/chasm/lib/scheduler/scheduler_suite_test.go @@ -15,7 +15,9 @@ import ( "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/common/testing/testlogger" "go.temporal.io/server/common/testing/testvars" + legacyscheduler "go.temporal.io/server/service/worker/scheduler" "go.uber.org/mock/gomock" + "google.golang.org/protobuf/types/known/timestamppb" ) // schedulerSuite sets up a suite that has a basic CHASM tree ready @@ -54,9 +56,14 @@ func (s *schedulerSuite) SetupTest() { err := s.registry.Register(&scheduler.Library{}) s.NoError(err) + // Register the Core library as well, which we use for Visibility. + err = s.registry.Register(&chasm.CoreLibrary{}) + s.NoError(err) + // Advance here, because otherwise ctx.Now().IsZero() will be true. s.timeSource = clock.NewEventTimeSource() - s.timeSource.Update(time.Now()) + now := time.Now() + s.timeSource.Update(now) // Stub NodeBackend for NewEmptytree tv := testvars.New(s.T()) @@ -78,7 +85,19 @@ func (s *schedulerSuite) SetupTest() { s.scheduler = scheduler.NewScheduler(ctx, namespace, namespaceID, scheduleID, defaultSchedule(), nil) s.node.SetRootComponent(s.scheduler) _, err = s.node.CloseTransaction() + + // Advance Generator's high water mark to 'now'. + generator, err := s.scheduler.Generator.Get(ctx) s.NoError(err) + generator.LastProcessedTime = timestamppb.New(now) + + // Set up future action times. + futureTime := now.Add(time.Hour) + s.specProcessor.EXPECT().GetNextTime(s.scheduler, gomock.Any()).Return(legacyscheduler.GetNextTimeResult{ + Next: futureTime, + Nominal: futureTime, + }, nil).MaxTimes(1) + s.specProcessor.EXPECT().GetNextTime(s.scheduler, gomock.Any()).Return(legacyscheduler.GetNextTimeResult{}, nil).AnyTimes() } // hasTask returns true if the given task type was added at the end of the diff --git a/chasm/lib/scheduler/scheduler_test.go b/chasm/lib/scheduler/scheduler_test.go new file mode 100644 index 00000000000..98c84c7e5e3 --- /dev/null +++ b/chasm/lib/scheduler/scheduler_test.go @@ -0,0 +1,31 @@ +package scheduler_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.temporal.io/server/common/testing/protorequire" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestGetListInfo(t *testing.T) { + scheduler, ctx, _ := setupSchedulerForTest(t) + + // Generator maintains the FutureActionTimes list, set that up first. + generator, err := scheduler.Generator.Get(ctx) + require.NoError(t, err) + expectedFutureTimes := []*timestamppb.Timestamp{timestamppb.Now(), timestamppb.Now()} + generator.FutureActionTimes = expectedFutureTimes + + listInfo, err := scheduler.GetListInfo(ctx) + require.NoError(t, err) + + // Should return a populated info block. + require.NotNil(t, listInfo) + require.NotNil(t, listInfo.Spec) + require.NotEmpty(t, listInfo.Spec.Interval) + protorequire.ProtoEqual(t, listInfo.Spec.Interval[0], scheduler.Schedule.Spec.Interval[0]) + require.NotNil(t, listInfo.WorkflowType) + require.NotEmpty(t, listInfo.FutureActionTimes) + require.Equal(t, expectedFutureTimes, listInfo.FutureActionTimes) +} diff --git a/chasm/lib/scheduler/spec_processor.go b/chasm/lib/scheduler/spec_processor.go index 1daaed86e1c..dfa1cf31646 100644 --- a/chasm/lib/scheduler/spec_processor.go +++ b/chasm/lib/scheduler/spec_processor.go @@ -36,6 +36,9 @@ type ( manual bool, limit *int, ) (*ProcessedTimeRange, error) + + // GetNextTime provides a peek at the next time in the spec following 'after'. + GetNextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error) } SpecProcessorImpl struct { @@ -91,7 +94,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange( // Manual (backfill/patch) runs are always buffered here. if !scheduler.useScheduledAction(false) && !manual { // Use end as last action time so that we don't reprocess time spent paused. - next, err := s.getNextTime(scheduler, end) + next, err := s.GetNextTime(scheduler, end) if err != nil { return nil, err } @@ -108,7 +111,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange( var next legacyscheduler.GetNextTimeResult var err error var bufferedStarts []*schedulespb.BufferedStart - for next, err = s.getNextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.getNextTime(scheduler, next.Next) { + for next, err = s.GetNextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.GetNextTime(scheduler, next.Next) { lastAction = next.Next if scheduler.Info.UpdateTime.AsTime().After(next.Next) { @@ -160,8 +163,8 @@ func catchupWindow(s *Scheduler, tweakables Tweakables) time.Duration { return max(cw.AsDuration(), tweakables.MinCatchupWindow) } -// getNextTime returns the next time result, or an error if the schedule cannot be compiled. -func (s *SpecProcessorImpl) getNextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error) { +// GetNextTime returns the next time result, or an error if the schedule cannot be compiled. +func (s *SpecProcessorImpl) GetNextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error) { spec, err := scheduler.getCompiledSpec(s.specBuilder) if err != nil { s.logger.Error("Invalid schedule", tag.Error(err)) diff --git a/chasm/lib/scheduler/spec_processor_mock.go b/chasm/lib/scheduler/spec_processor_mock.go index c96be7fa87c..3a78eef6128 100644 --- a/chasm/lib/scheduler/spec_processor_mock.go +++ b/chasm/lib/scheduler/spec_processor_mock.go @@ -14,6 +14,7 @@ import ( time "time" enums "go.temporal.io/api/enums/v1" + scheduler "go.temporal.io/server/service/worker/scheduler" gomock "go.uber.org/mock/gomock" ) @@ -41,17 +42,32 @@ func (m *MockSpecProcessor) EXPECT() *MockSpecProcessorMockRecorder { return m.recorder } +// GetNextTime mocks base method. +func (m *MockSpecProcessor) GetNextTime(arg0 *Scheduler, after time.Time) (scheduler.GetNextTimeResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNextTime", arg0, after) + ret0, _ := ret[0].(scheduler.GetNextTimeResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNextTime indicates an expected call of GetNextTime. +func (mr *MockSpecProcessorMockRecorder) GetNextTime(arg0, after any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNextTime", reflect.TypeOf((*MockSpecProcessor)(nil).GetNextTime), arg0, after) +} + // ProcessTimeRange mocks base method. -func (m *MockSpecProcessor) ProcessTimeRange(scheduler *Scheduler, start, end time.Time, overlapPolicy enums.ScheduleOverlapPolicy, workflowID, backfillID string, manual bool, limit *int) (*ProcessedTimeRange, error) { +func (m *MockSpecProcessor) ProcessTimeRange(arg0 *Scheduler, start, end time.Time, overlapPolicy enums.ScheduleOverlapPolicy, workflowID, backfillID string, manual bool, limit *int) (*ProcessedTimeRange, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessTimeRange", scheduler, start, end, overlapPolicy, workflowID, backfillID, manual, limit) + ret := m.ctrl.Call(m, "ProcessTimeRange", arg0, start, end, overlapPolicy, workflowID, backfillID, manual, limit) ret0, _ := ret[0].(*ProcessedTimeRange) ret1, _ := ret[1].(error) return ret0, ret1 } // ProcessTimeRange indicates an expected call of ProcessTimeRange. -func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(scheduler, start, end, overlapPolicy, workflowID, backfillID, manual, limit any) *gomock.Call { +func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(arg0, start, end, overlapPolicy, workflowID, backfillID, manual, limit any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessTimeRange", reflect.TypeOf((*MockSpecProcessor)(nil).ProcessTimeRange), scheduler, start, end, overlapPolicy, workflowID, backfillID, manual, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessTimeRange", reflect.TypeOf((*MockSpecProcessor)(nil).ProcessTimeRange), arg0, start, end, overlapPolicy, workflowID, backfillID, manual, limit) } diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 832d8159c6c..382b490111c 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1251,10 +1251,6 @@ var ( "schedule_action_success", WithDescription("The number of schedule actions that were successfully taken by a schedule"), ) - ScheduleActionAttempt = NewCounterDef( - "schedule_action_attempt", - WithDescription("The number of schedule actions attempts"), - ) ScheduleActionErrors = NewCounterDef( "schedule_action_errors", WithDescription("The number of failed attempts from starting schedule actions"), @@ -1271,10 +1267,6 @@ var ( "schedule_action_delay", WithDescription("Delay between when scheduled actions should/actually happen"), ) - ScheduleActionDropped = NewCounterDef( - "schedule_action_dropped", - WithDescription("The number of schedule actions that failed to start"), - ) SchedulePayloadSize = NewCounterDef( "schedule_payload_size", WithDescription("The size in bytes of a customer payload (including action results and update signals)"),