-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[Scheduled Actions] CHASM scheduled actions visibility support #8292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
chasm/lib/scheduler/scheduler.go
Outdated
| customSearchAttributes *commonpb.SearchAttributes, | ||
| ) error { | ||
| needsTask := false // Set to true if we need to write anything to Visibility. | ||
| visibility, err := s.Visibility.Get(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this PR but are we ever expecting an error getting a component to be an error that the application needs to handle? What reasons would be for this failure? Can we just make the method panic instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we support partial read in the future, there could be some transient error returned here. For now, it will only be serialization errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's a transient, unexpected error, it should be okay to panic IMHO. That would be better than complicating the user interface. There's never really a way for a developer to react to this situation.
chasm/lib/scheduler/scheduler.go
Outdated
| if customSearchAttributes != nil && | ||
| len(customSearchAttributes.GetIndexedFields()) > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This is enough because proto getters work with nils.
| if customSearchAttributes != nil && | |
| len(customSearchAttributes.GetIndexedFields()) > 0 { | |
| if len(customSearchAttributes.GetIndexedFields()) > 0 { |
chasm/lib/scheduler/scheduler.go
Outdated
| } | ||
|
|
||
| // Update Paused status. | ||
| var currentPaused bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is exactly why I didn't want components to use imperative logic to update their search attributes. This logic should be part of the framework.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can afford it timeline-wise I'd much prefer if we coded up a better abstraction in the framework.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I agree. Also acknowledging that imperative logic means component author needs to remember update those attributes.
We discussed about the field tag approach which addresses this concern and also the SA registration problem but decided to do that as a follow up to unblock the Scheduler migration and deliver value sooner. It's more of a ROI & priority discussion for the entire OSS team.
chasm/lib/scheduler/scheduler.go
Outdated
|
|
||
| // Update visibility if the memo is out-of-date or absent. | ||
| if currentInfoPayload == nil || | ||
| !bytes.Equal(currentInfoPayload.Data, newInfoPayload) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proto marshal is not deterministic by default, this comparison will give you false negatives. You will want to unmarshal and use proto.Equal instead.
Another reason why component authors shouldn't implement all of this logic.
chasm/lib/scheduler/scheduler.go
Outdated
| // attributes will be left as-is with it unset. | ||
| // | ||
| // See mergeCustomSearchAttributes for how custom search attributes are merged. | ||
| func (s *Scheduler) UpdateVisibility( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this approach, it assumes that the component author will remember to update visibility anytime that an attribute changes. This should be the framework's responsibility and should be done automatically when a transaction is closed.
chasm/lib/scheduler/util.go
Outdated
| for key, newPayload := range customAttrs { | ||
| oldPayload, alreadySet := currentAttrs[key] | ||
|
|
||
| if !alreadySet || !bytes.Equal(oldPayload.Data, newPayload.Data) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned before, you cannot rely on byte equality for this comparison.
chasm/lib/scheduler/scheduler.go
Outdated
| if needsTask { | ||
| visibility.GenerateTask(ctx) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unnecessary, Visibility component automatically generates a task when it's updated.
chasm/lib/scheduler/scheduler.go
Outdated
| customSearchAttributes *commonpb.SearchAttributes, | ||
| ) error { | ||
| needsTask := false // Set to true if we need to write anything to Visibility. | ||
| visibility, err := s.Visibility.Get(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we support partial read in the future, there could be some transient error returned here. For now, it will only be serialization errors.
chasm/lib/scheduler/scheduler.go
Outdated
| if !ok || currentPaused != s.Schedule.State.Paused { | ||
| upsertAttrs[searchattribute.TemporalSchedulePaused] = s.Schedule.State.Paused | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like a "refresh visibility" approach. Do we have a like PauseSchedule method where we can update the field?
chasm/lib/scheduler/scheduler.go
Outdated
| currentMemo, err := visibility.GetMemo(ctx) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| currentInfoPayload := currentMemo[visibilityMemoFieldInfo] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can use the generic chasm.GetMemo() function which returns a strongly typed value instead of Payload
chasm/lib/scheduler/scheduler.go
Outdated
| newMemo := map[string]any{visibilityMemoFieldInfo: newInfoPayload} | ||
| err = visibility.UpsertMemo(ctx, newMemo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UpsertMemo will perform the encoding to Payload type, should just use newInfo here.
chasm/lib/scheduler/scheduler.go
Outdated
| currentPausedPayload, ok := currentAttrs[searchattribute.TemporalSchedulePaused] | ||
| if ok { | ||
| err = payload.Decode(currentPausedPayload, ¤tPaused) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can use the generic chasm.GetSearchAttribute() method here as well and you don't need to do the decoding.
chasm/lib/scheduler/util.go
Outdated
| // Key isn't in the new map, delete it. | ||
| upsertAttrs[key] = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this work? At least when I was coding the visibility.UpsertSearchAttributes() method I wasn't thinking of using that as a way for deletion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how it worked in the SDKs before we had typed search attributes.
chasm/lib/scheduler/scheduler.go
Outdated
| } | ||
|
|
||
| // Update Paused status. | ||
| var currentPaused bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I agree. Also acknowledging that imperative logic means component author needs to remember update those attributes.
We discussed about the field tag approach which addresses this concern and also the SA registration problem but decided to do that as a follow up to unblock the Scheduler migration and deliver value sooner. It's more of a ROI & priority discussion for the entire OSS team.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on @yycptt's comments the abstraction isn't too bad today.
Seems like you don't need to manually generate a task, serialize and deserialize payloads, etc...
If we could also eliminate the need to compare search attributes and do incremental updates only to fields that changed, that would be good enough for me for merging this PR and deferring the declarative APIs for later.
Ideally we wouldn't even need to do that, if you could just recompute the desired visibility state and have the visibility component understand what needs to be updated (if anything at all) that would be much much better than what we have today.
Is that doable?
chasm/lib/scheduler/scheduler.go
Outdated
| customSearchAttributes *commonpb.SearchAttributes, | ||
| ) error { | ||
| needsTask := false // Set to true if we need to write anything to Visibility. | ||
| visibility, err := s.Visibility.Get(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's a transient, unexpected error, it should be okay to panic IMHO. That would be better than complicating the user interface. There's never really a way for a developer to react to this situation.
chasm/lib/scheduler/util.go
Outdated
| // Key isn't in the new map, delete it. | ||
| upsertAttrs[key] = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how it worked in the SDKs before we had typed search attributes.
…move some unused metric definitions (#8293) ## What changed? - Did a pass on metrics for the CHASM scheduler and added the missing delay metric. - A few of the old schedule metrics aren't used anywhere at all, so I've removed them. ## Why? _Tell your future self why have you made these changes._ ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks _Any change is risky. Identify all risks you are aware of. If none, remove this section._
| ScheduleActionAttempt = NewCounterDef( | ||
| "schedule_action_attempt", | ||
| WithDescription("The number of schedule actions attempts"), | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure these were from another PR that might still be pending.
chasm/lib/scheduler/invoker_tasks.go
Outdated
| // realTime may be slightly past the time of the action's first scheduled WFT. | ||
| realTime := time.Now() | ||
| desiredTime := start.ActualTime | ||
| e.MetricsHandler.Timer(metrics.ScheduleActionDelay.Name()).Record(realTime.Sub(desiredTime.AsTime())) | ||
|
|
||
| return &schedulepb.ScheduleActionResult{ | ||
| ScheduleTime: start.ActualTime, | ||
| ActualTime: timestamppb.New(time.Now()), | ||
| ScheduleTime: desiredTime, | ||
| ActualTime: timestamppb.New(realTime), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also from another PR (nexus-watcher). Ignore, will fix before merge.
| // "Pause" is the only Temporal-managed search attribute for schedules, so it is | ||
| // updated here ad-hoc, instead of via the SearchAttributesProvider interface. | ||
| pauseAttr := make(map[string]*commonpb.Payload) | ||
| pauseAttr[searchattribute.TemporalSchedulePaused] = p | ||
| return vis.SetSearchAttributes(ctx, pauseAttr) // merges with custom search attributes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it this way because it seemed that to do it in the SA provider, I'd have to manually first Get the existing SAs and merge the paused status. Plus, schedule search attributes rarely change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should put this in the SA provider. The visibility component is for custom search attributes. The way that you've implemented it, you'll need to update the this attribute when a user modifies the schedule, and you're deviating from the design and how we want to structure the other CHASM components.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way that you've implemented it, you'll need to update the this attribute when a user modifies the schedule,
Why would I have to do that? SetSearchAttributes does a merge, the SA provider does not, from my read of it (it looks to be a full replace). Because these change infrequently, and independently, the ad-hoc call seemed more appropriate than writing an SA provider that has to run every transaction and have its own manual merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chatted about this offline - the idea is for SearchAttributes provider to provide the Temporal-provided attributes, and SetSearchAttributes to be for custom search attributes. Will update
| msg := "failed to update future action times" | ||
| logger.Error(msg, tag.Error(err)) | ||
| return fmt.Errorf("%w: %w", | ||
| serviceerror.NewInternal(msg), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a non retryable error to ensure this task doesn't stay in the queue?
| google.protobuf.Timestamp last_processed_time = 3; | ||
|
|
||
| // A list of upcoming times an action will be triggered. | ||
| repeated google.protobuf.Timestamp future_action_times = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be persisted? Can't it be calculated on the fly when calculating the memo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it can't be, because the Memo is computed in the context of a component (not a task executor), and therefore it doesn't have the wired-in dependencies that computing this relies upon (SpecProcessor).
| // "Pause" is the only Temporal-managed search attribute for schedules, so it is | ||
| // updated here ad-hoc, instead of via the SearchAttributesProvider interface. | ||
| pauseAttr := make(map[string]*commonpb.Payload) | ||
| pauseAttr[searchattribute.TemporalSchedulePaused] = p | ||
| return vis.SetSearchAttributes(ctx, pauseAttr) // merges with custom search attributes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should put this in the SA provider. The visibility component is for custom search attributes. The way that you've implemented it, you'll need to update the this attribute when a user modifies the schedule, and you're deviating from the design and how we want to structure the other CHASM components.
| "google.golang.org/protobuf/types/known/timestamppb" | ||
| ) | ||
|
|
||
| type schedulerTestSuite struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, we asked not to add more suites into the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we're just going to start redoing scheduler's tests, for a package where it's a well-established pattern, before it's even landed to match that..? Also, that isn't the agreement we came to as a team, as far as I can remember. I'd asked specifically during that meeting if there was an issue with test suites themselves, and you'd said that it was about the way that an improper testing.T gets propagated to subtests improperly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We said we wouldn't use suites for new tests. It's just impossible to not user the suite's T() and embedded assertions. But I know you're in a time crunch so we can leave this as tech debt.
What changed?
How did you test it?