From 2bf8bd87edf8ba566b9eeead70fa87a6a7aaf962 Mon Sep 17 00:00:00 2001 From: K8sCat Date: Sun, 11 May 2025 05:56:11 +0800 Subject: [PATCH 1/3] Use safeAsTime and safeAsDuration --- internal/activity.go | 10 +++--- internal/internal_deployment_client.go | 8 ++--- internal/internal_event_handlers.go | 2 +- internal/internal_nexus_task_poller.go | 4 +-- internal/internal_schedule_client.go | 30 ++++++++-------- internal/internal_task_handlers.go | 10 +++--- internal/internal_task_pollers.go | 6 ++-- internal/internal_versioning_client.go | 6 ++-- internal/internal_versioning_client_test.go | 2 +- internal/internal_worker_deployment_client.go | 10 ++++++ internal/internal_workflow_client.go | 10 +++--- internal/internal_workflow_testsuite.go | 34 +++++++++---------- internal/worker_versioning_rules.go | 4 +-- internal/worker_versioning_rules_test.go | 2 +- internal/workflow.go | 4 +-- 15 files changed, 76 insertions(+), 66 deletions(-) diff --git a/internal/activity.go b/internal/activity.go index fada8ad7d..49ae6546f 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -286,11 +286,11 @@ func WithActivityTask( interceptors []WorkerInterceptor, client *WorkflowClient, ) (context.Context, error) { - scheduled := task.GetScheduledTime().AsTime() - started := task.GetStartedTime().AsTime() - scheduleToCloseTimeout := task.GetScheduleToCloseTimeout().AsDuration() - startToCloseTimeout := task.GetStartToCloseTimeout().AsDuration() - heartbeatTimeout := task.GetHeartbeatTimeout().AsDuration() + scheduled := safeAsTime(task.GetScheduledTime()) + started := safeAsTime(task.GetStartedTime()) + scheduleToCloseTimeout := safeAsDuration(task.GetScheduleToCloseTimeout()) + startToCloseTimeout := safeAsDuration(task.GetStartToCloseTimeout()) + heartbeatTimeout := safeAsDuration(task.GetHeartbeatTimeout()) deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout) logger = log.With(logger, diff --git a/internal/internal_deployment_client.go b/internal/internal_deployment_client.go index c3ca8f156..0d9a4a552 100644 --- a/internal/internal_deployment_client.go +++ b/internal/internal_deployment_client.go @@ -73,7 +73,7 @@ func deploymentToProto(deploymentID Deployment) *deployment.Deployment { func deploymentListEntryFromProto(deployment *deployment.DeploymentListInfo) *DeploymentListEntry { return &DeploymentListEntry{ Deployment: deploymentFromProto(deployment.GetDeployment()), - CreateTime: deployment.GetCreateTime().AsTime(), + CreateTime: safeAsTime(deployment.GetCreateTime()), IsCurrent: deployment.GetIsCurrent(), } } @@ -84,7 +84,7 @@ func deploymentTaskQueuesInfoFromProto(tqsInfo []*deployment.DeploymentInfo_Task result = append(result, DeploymentTaskQueueInfo{ Name: info.GetName(), Type: TaskQueueType(info.GetType()), - FirstPollerTime: info.GetFirstPollerTime().AsTime(), + FirstPollerTime: safeAsTime(info.GetFirstPollerTime()), }) } return result @@ -93,7 +93,7 @@ func deploymentTaskQueuesInfoFromProto(tqsInfo []*deployment.DeploymentInfo_Task func deploymentInfoFromProto(deploymentInfo *deployment.DeploymentInfo) DeploymentInfo { return DeploymentInfo{ Deployment: deploymentFromProto(deploymentInfo.GetDeployment()), - CreateTime: deploymentInfo.GetCreateTime().AsTime(), + CreateTime: safeAsTime(deploymentInfo.GetCreateTime()), IsCurrent: deploymentInfo.GetIsCurrent(), TaskQueuesInfos: deploymentTaskQueuesInfoFromProto(deploymentInfo.GetTaskQueueInfos()), Metadata: deploymentInfo.GetMetadata(), @@ -110,7 +110,7 @@ func deploymentReachabilityInfoFromProto(response *workflowservice.GetDeployment return DeploymentReachabilityInfo{ DeploymentInfo: deploymentInfoFromProto(response.GetDeploymentInfo()), Reachability: DeploymentReachability(response.GetReachability()), - LastUpdateTime: response.GetLastUpdateTime().AsTime(), + LastUpdateTime: safeAsTime(response.GetLastUpdateTime()), } } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 1c4574d97..92bd90ded 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1194,7 +1194,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( // No Operation case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED: // Set replay clock. - weh.SetCurrentReplayTime(event.GetEventTime().AsTime()) + weh.SetCurrentReplayTime(safeAsTime(event.GetEventTime())) // Update workflow info fields weh.workflowInfo.currentHistoryLength = int(event.EventId) weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew() diff --git a/internal/internal_nexus_task_poller.go b/internal/internal_nexus_task_poller.go index 30a2973e0..14d973816 100644 --- a/internal/internal_nexus_task_poller.go +++ b/internal/internal_nexus_task_poller.go @@ -123,7 +123,7 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error { // Schedule-to-start (from the time the request hit the frontend). // Note that this metric does not include the service and operation name as they are not relevant when polling from // the Nexus task queue. - scheduleToStartLatency := executionStartTime.Sub(response.GetRequest().GetScheduledTime().AsTime()) + scheduleToStartLatency := executionStartTime.Sub(safeAsTime(response.GetRequest().GetScheduledTime())) ntp.metricsHandler.WithTags(metrics.TaskQueueTags(ntp.taskQueueName)).Timer(metrics.NexusTaskScheduleToStartLatency).Record(scheduleToStartLatency) nctx, handlerErr := ntp.taskHandler.newNexusOperationContext(response) @@ -184,7 +184,7 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error { // E2E latency, from frontend until we finished reporting completion. nctx.metricsHandler. Timer(metrics.NexusTaskEndToEndLatency). - Record(time.Since(response.GetRequest().GetScheduledTime().AsTime())) + Record(time.Since(safeAsTime(response.GetRequest().GetScheduledTime()))) return nil } diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index eb08ef907..253471dbf 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -419,8 +419,8 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS intervals := make([]ScheduleIntervalSpec, len(scheduleSpec.GetInterval())) for i, s := range scheduleSpec.GetInterval() { intervals[i] = ScheduleIntervalSpec{ - Every: s.Interval.AsDuration(), - Offset: s.Phase.AsDuration(), + Every: safeAsDuration(s.Interval), + Offset: safeAsDuration(s.Phase), } } @@ -428,12 +428,12 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS startAt := time.Time{} if scheduleSpec.GetStartTime() != nil { - startAt = scheduleSpec.GetStartTime().AsTime() + startAt = safeAsTime(scheduleSpec.GetStartTime()) } endAt := time.Time{} if scheduleSpec.GetEndTime() != nil { - endAt = scheduleSpec.GetEndTime().AsTime() + endAt = safeAsTime(scheduleSpec.GetEndTime()) } return &ScheduleSpec{ @@ -442,7 +442,7 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS Skip: skip, StartAt: startAt, EndAt: endAt, - Jitter: scheduleSpec.GetJitter().AsDuration(), + Jitter: safeAsDuration(scheduleSpec.GetJitter()), TimeZoneName: scheduleSpec.GetTimezoneName(), } } @@ -468,7 +468,7 @@ func scheduleDescriptionFromPB( nextActionTimes := make([]time.Time, len(describeResponse.Info.GetFutureActionTimes())) for i, t := range describeResponse.Info.GetFutureActionTimes() { - nextActionTimes[i] = t.AsTime() + nextActionTimes[i] = safeAsTime(t) } actionDescription, err := convertFromPBScheduleAction(logger, dc, describeResponse.Schedule.Action) @@ -488,7 +488,7 @@ func scheduleDescriptionFromPB( Spec: convertFromPBScheduleSpec(describeResponse.Schedule.Spec), Policy: &SchedulePolicies{ Overlap: describeResponse.Schedule.Policies.GetOverlapPolicy(), - CatchupWindow: describeResponse.Schedule.Policies.GetCatchupWindow().AsDuration(), + CatchupWindow: safeAsDuration(describeResponse.Schedule.Policies.GetCatchupWindow()), PauseOnFailure: describeResponse.Schedule.Policies.GetPauseOnFailure(), }, State: &ScheduleState{ @@ -505,8 +505,8 @@ func scheduleDescriptionFromPB( RunningWorkflows: runningWorkflows, RecentActions: recentActions, NextActionTimes: nextActionTimes, - CreatedAt: describeResponse.Info.GetCreateTime().AsTime(), - LastUpdateAt: describeResponse.Info.GetUpdateTime().AsTime(), + CreatedAt: safeAsTime(describeResponse.Info.GetCreateTime()), + LastUpdateAt: safeAsTime(describeResponse.Info.GetUpdateTime()), }, Memo: describeResponse.Memo, SearchAttributes: searchAttributes, @@ -553,7 +553,7 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch nextActionTimes := make([]time.Time, len(schedule.Info.GetFutureActionTimes())) for i, t := range schedule.Info.GetFutureActionTimes() { - nextActionTimes[i] = t.AsTime() + nextActionTimes[i] = safeAsTime(t) } return &ScheduleListEntry{ @@ -712,9 +712,9 @@ func convertFromPBScheduleAction( Workflow: workflow.WorkflowType.GetName(), Args: args, TaskQueue: workflow.TaskQueue.GetName(), - WorkflowExecutionTimeout: workflow.GetWorkflowExecutionTimeout().AsDuration(), - WorkflowRunTimeout: workflow.GetWorkflowRunTimeout().AsDuration(), - WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(), + WorkflowExecutionTimeout: safeAsDuration(workflow.GetWorkflowExecutionTimeout()), + WorkflowRunTimeout: safeAsDuration(workflow.GetWorkflowRunTimeout()), + WorkflowTaskTimeout: safeAsDuration(workflow.GetWorkflowTaskTimeout()), RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy), Memo: memos, TypedSearchAttributes: searchAttrs, @@ -842,8 +842,8 @@ func convertFromPBScheduleActionResultList(aa []*schedulepb.ScheduleActionResult } } recentActions[i] = ScheduleActionResult{ - ScheduleTime: a.GetScheduleTime().AsTime(), - ActualTime: a.GetActualTime().AsTime(), + ScheduleTime: safeAsTime(a.GetScheduleTime()), + ActualTime: safeAsTime(a.GetActualTime()), StartWorkflowResult: workflowExecution, } } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 665bf8bef..e4b2c7793 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -727,12 +727,12 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. FirstRunID: attributes.FirstExecutionRunId, WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()}, TaskQueueName: taskQueue.GetName(), - WorkflowExecutionTimeout: attributes.GetWorkflowExecutionTimeout().AsDuration(), - WorkflowRunTimeout: attributes.GetWorkflowRunTimeout().AsDuration(), - WorkflowTaskTimeout: attributes.GetWorkflowTaskTimeout().AsDuration(), + WorkflowExecutionTimeout: safeAsDuration(attributes.GetWorkflowExecutionTimeout()), + WorkflowRunTimeout: safeAsDuration(attributes.GetWorkflowRunTimeout()), + WorkflowTaskTimeout: safeAsDuration(attributes.GetWorkflowTaskTimeout()), Namespace: wth.namespace, Attempt: attributes.GetAttempt(), - WorkflowStartTime: startedEvent.GetEventTime().AsTime(), + WorkflowStartTime: safeAsTime(startedEvent.GetEventTime()), lastCompletionResult: attributes.LastCompletionResult, lastFailure: attributes.ContinuedFailure, CronSchedule: attributes.CronSchedule, @@ -2211,7 +2211,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice canCtx, cancel := context.WithCancelCause(rootCtx) defer cancel(nil) - heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(t.GetHeartbeatTimeout().AsDuration()) + heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(safeAsDuration(t.GetHeartbeatTimeout())) invoker := newServiceInvoker( t.TaskToken, ath.identity, ath.client.workflowService, ath.metricsHandler, cancel, heartbeatThrottleInterval, ath.workerStopCh, ath.namespace) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index fd66de217..fba4e1891 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -864,7 +864,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (taskForWorker, error) metricsHandler := wtp.metricsHandler.WithTags(metrics.WorkflowTags(response.WorkflowType.GetName())) metricsHandler.Counter(metrics.WorkflowTaskQueuePollSucceedCounter).Inc(1) - scheduleToStartLatency := response.GetStartedTime().AsTime().Sub(response.GetScheduledTime().AsTime()) + scheduleToStartLatency := safeAsTime(response.GetStartedTime()).Sub(safeAsTime(response.GetScheduledTime())) metricsHandler.Timer(metrics.WorkflowTaskScheduleToStartLatency).Record(scheduleToStartLatency) return task, nil } @@ -1032,7 +1032,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (taskForWorker, error) activityType := response.ActivityType.GetName() metricsHandler := atp.metricsHandler.WithTags(metrics.ActivityTags(workflowType, activityType, atp.taskQueueName)) - scheduleToStartLatency := response.GetStartedTime().AsTime().Sub(response.GetCurrentAttemptScheduledTime().AsTime()) + scheduleToStartLatency := safeAsTime(response.GetStartedTime()).Sub(safeAsTime(response.GetCurrentAttemptScheduledTime())) metricsHandler.Timer(metrics.ActivityScheduleToStartLatency).Record(scheduleToStartLatency) return &activityTask{task: response}, nil @@ -1103,7 +1103,7 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { if _, ok := request.(*workflowservice.RespondActivityTaskCompletedRequest); ok { activityMetricsHandler. Timer(metrics.ActivitySucceedEndToEndLatency). - Record(time.Since(activityTask.task.GetScheduledTime().AsTime())) + Record(time.Since(safeAsTime(activityTask.task.GetScheduledTime()))) } return nil } diff --git a/internal/internal_versioning_client.go b/internal/internal_versioning_client.go index f5ab9c591..ad17da50b 100644 --- a/internal/internal_versioning_client.go +++ b/internal/internal_versioning_client.go @@ -352,7 +352,7 @@ func pollerInfoFromResponse(response *taskqueuepb.PollerInfo) TaskQueuePollerInf lastAccessTime := time.Time{} if response.GetLastAccessTime() != nil { - lastAccessTime = response.GetLastAccessTime().AsTime() + lastAccessTime = safeAsTime(response.GetLastAccessTime()) } return TaskQueuePollerInfo{ @@ -388,7 +388,7 @@ func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats { return &TaskQueueStats{ ApproximateBacklogCount: stats.GetApproximateBacklogCount(), - ApproximateBacklogAge: stats.GetApproximateBacklogAge().AsDuration(), + ApproximateBacklogAge: safeAsDuration(stats.GetApproximateBacklogAge()), TasksAddRate: stats.TasksAddRate, TasksDispatchRate: stats.TasksDispatchRate, BacklogIncreaseRate: stats.TasksAddRate - stats.TasksDispatchRate, @@ -429,7 +429,7 @@ func taskQueueVersioningInfoFromResponse(info *taskqueuepb.TaskQueueVersioningIn CurrentVersion: info.CurrentVersion, RampingVersion: info.RampingVersion, RampingVersionPercentage: info.RampingVersionPercentage, - UpdateTime: info.UpdateTime.AsTime(), + UpdateTime: safeAsTime(info.UpdateTime), } } diff --git a/internal/internal_versioning_client_test.go b/internal/internal_versioning_client_test.go index 4e7994a7f..e36d0146a 100644 --- a/internal/internal_versioning_client_test.go +++ b/internal/internal_versioning_client_test.go @@ -53,7 +53,7 @@ func Test_DetectEnhancedNotSupported_fromProtoResponse(t *testing.T) { func Test_TaskQueueDescription_fromProtoResponse(t *testing.T) { nowProto := timestamppb.Now() - now := nowProto.AsTime() + now := safeAsTime(nowProto) tests := []struct { name string response *workflowservice.DescribeTaskQueueResponse diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index ce66767b6..aa5ddf3ce 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -11,6 +11,7 @@ import ( "go.temporal.io/api/deployment/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -29,6 +30,15 @@ func safeAsTime(timestamp *timestamppb.Timestamp) time.Time { } } +// safeAsDuration ensures that a nil proto duration makes `AsDuration()` return 0. +func safeAsDuration(duration *durationpb.Duration) time.Duration { + if duration == nil { + return time.Duration(0) + } else { + return duration.AsDuration() + } +} + type ( // WorkerDeploymentClient is the client for managing worker deployments. workerDeploymentClient struct { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index ceaa0ebf6..338b81917 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1597,10 +1597,10 @@ func (workflowRun *workflowRunImpl) GetWithOptions( TaskQueueName: attributes.GetTaskQueue().GetName(), } if attributes.WorkflowRunTimeout != nil { - err.WorkflowRunTimeout = attributes.WorkflowRunTimeout.AsDuration() + err.WorkflowRunTimeout = safeAsDuration(attributes.WorkflowRunTimeout) } if attributes.WorkflowTaskTimeout != nil { - err.WorkflowTaskTimeout = attributes.WorkflowTaskTimeout.AsDuration() + err.WorkflowTaskTimeout = safeAsDuration(attributes.WorkflowTaskTimeout) } return err default: @@ -2223,12 +2223,12 @@ func (w *workflowClientInterceptor) DescribeWorkflow( var closeTime *time.Time if info.GetCloseTime().IsValid() { - t := info.GetCloseTime().AsTime() + t := safeAsTime(info.GetCloseTime()) closeTime = &t } var executionTime *time.Time if info.GetExecutionTime().IsValid() { - t := info.GetExecutionTime().AsTime() + t := safeAsTime(info.GetExecutionTime()) executionTime = &t } @@ -2262,7 +2262,7 @@ func (w *workflowClientInterceptor) DescribeWorkflow( Status: info.GetStatus(), ParentWorkflowExecution: parentWorkflowExecution, RootWorkflowExecution: rootWorkflowExecution, - WorkflowStartTime: info.GetStartTime().AsTime(), + WorkflowStartTime: safeAsTime(info.GetStartTime()), ExecutionTime: executionTime, WorkflowCloseTime: closeTime, HistoryLength: int(info.GetHistoryLength()), diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 26923ceb3..18a8507f2 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -1274,7 +1274,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivi } func minDur(a *durationpb.Duration, b *durationpb.Duration) *durationpb.Duration { - ad, bd := a.AsDuration(), b.AsDuration() + ad, bd := safeAsDuration(a), safeAsDuration(b) if ad < bd { return a } @@ -1317,14 +1317,14 @@ func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( } // Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative. - if attributes.GetScheduleToCloseTimeout().AsDuration() < 0 || attributes.GetScheduleToStartTimeout().AsDuration() < 0 || - attributes.GetStartToCloseTimeout().AsDuration() < 0 || attributes.GetHeartbeatTimeout().AsDuration() < 0 { + if safeAsDuration(attributes.GetScheduleToCloseTimeout()) < 0 || safeAsDuration(attributes.GetScheduleToStartTimeout()) < 0 || + safeAsDuration(attributes.GetStartToCloseTimeout()) < 0 || safeAsDuration(attributes.GetHeartbeatTimeout()) < 0 { return serviceerror.NewInvalidArgument("A valid timeout may not be negative.") } - validScheduleToClose := attributes.GetScheduleToCloseTimeout().AsDuration() > 0 - validScheduleToStart := attributes.GetScheduleToStartTimeout().AsDuration() > 0 - validStartToClose := attributes.GetStartToCloseTimeout().AsDuration() > 0 + validScheduleToClose := safeAsDuration(attributes.GetScheduleToCloseTimeout()) > 0 + validScheduleToStart := safeAsDuration(attributes.GetScheduleToStartTimeout()) > 0 + validStartToClose := safeAsDuration(attributes.GetStartToCloseTimeout()) > 0 if validScheduleToClose { if validScheduleToStart { @@ -1349,16 +1349,16 @@ func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( } // ensure activity timeout never larger than workflow timeout if runTimeout > 0 { - if attributes.GetScheduleToCloseTimeout().AsDuration() > runTimeout { + if safeAsDuration(attributes.GetScheduleToCloseTimeout()) > runTimeout { attributes.ScheduleToCloseTimeout = durationpb.New(runTimeout) } - if attributes.GetScheduleToStartTimeout().AsDuration() > runTimeout { + if safeAsDuration(attributes.GetScheduleToStartTimeout()) > runTimeout { attributes.ScheduleToStartTimeout = durationpb.New(runTimeout) } - if attributes.GetStartToCloseTimeout().AsDuration() > runTimeout { + if safeAsDuration(attributes.GetStartToCloseTimeout()) > runTimeout { attributes.StartToCloseTimeout = durationpb.New(runTimeout) } - if attributes.GetHeartbeatTimeout().AsDuration() > runTimeout { + if safeAsDuration(attributes.GetHeartbeatTimeout()) > runTimeout { attributes.HeartbeatTimeout = durationpb.New(runTimeout) } } @@ -1408,16 +1408,16 @@ func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.Ret // rest of the arguments is pointless return nil } - if policy.GetInitialInterval().AsDuration() < 0 { + if safeAsDuration(policy.GetInitialInterval()) < 0 { return serviceerror.NewInvalidArgument("InitialInterval cannot be negative on retry policy.") } if policy.GetBackoffCoefficient() < 1 { return serviceerror.NewInvalidArgument("BackoffCoefficient cannot be less than 1 on retry policy.") } - if policy.GetMaximumInterval().AsDuration() < 0 { + if safeAsDuration(policy.GetMaximumInterval()) < 0 { return serviceerror.NewInvalidArgument("MaximumInterval cannot be negative on retry policy.") } - if policy.GetMaximumInterval().AsDuration() > 0 && policy.GetMaximumInterval().AsDuration() < policy.GetInitialInterval().AsDuration() { + if safeAsDuration(policy.GetMaximumInterval()) > 0 && safeAsDuration(policy.GetMaximumInterval()) < safeAsDuration(policy.GetInitialInterval()) { return serviceerror.NewInvalidArgument("MaximumInterval cannot be less than InitialInterval on retry policy.") } if policy.GetMaximumAttempts() < 0 { @@ -1506,9 +1506,9 @@ func (env *testWorkflowEnvironmentImpl) executeActivityWithRetryForTest( func fromProtoRetryPolicy(p *commonpb.RetryPolicy) *RetryPolicy { return &RetryPolicy{ - InitialInterval: p.GetInitialInterval().AsDuration(), + InitialInterval: safeAsDuration(p.GetInitialInterval()), BackoffCoefficient: p.GetBackoffCoefficient(), - MaximumInterval: p.GetMaximumInterval().AsDuration(), + MaximumInterval: safeAsDuration(p.GetMaximumInterval()), MaximumAttempts: p.GetMaximumAttempts(), NonRetryableErrorTypes: p.NonRetryableErrorTypes, } @@ -1529,10 +1529,10 @@ func ensureDefaultRetryPolicy(parameters *ExecuteActivityParams) { parameters.RetryPolicy = &commonpb.RetryPolicy{} } - if parameters.RetryPolicy.InitialInterval == nil || parameters.RetryPolicy.InitialInterval.AsDuration() == 0 { + if parameters.RetryPolicy.InitialInterval == nil || safeAsDuration(parameters.RetryPolicy.InitialInterval) == 0 { parameters.RetryPolicy.InitialInterval = durationpb.New(time.Second) } - if parameters.RetryPolicy.MaximumInterval == nil || parameters.RetryPolicy.MaximumInterval.AsDuration() == 0 { + if parameters.RetryPolicy.MaximumInterval == nil || safeAsDuration(parameters.RetryPolicy.MaximumInterval) == 0 { parameters.RetryPolicy.MaximumInterval = parameters.RetryPolicy.InitialInterval } if parameters.RetryPolicy.BackoffCoefficient == 0 { diff --git a/internal/worker_versioning_rules.go b/internal/worker_versioning_rules.go index 469e2ebe4..50e2cff86 100644 --- a/internal/worker_versioning_rules.go +++ b/internal/worker_versioning_rules.go @@ -357,7 +357,7 @@ func versioningAssignmentRuleFromProto(rule *taskqueuepb.BuildIdAssignmentRule, } if timestamp != nil { - result.CreateTime = timestamp.AsTime() + result.CreateTime = safeAsTime(timestamp) } return result } @@ -375,7 +375,7 @@ func versioningRedirectRuleFromProto(rule *taskqueuepb.CompatibleBuildIdRedirect } if timestamp != nil { - result.CreateTime = timestamp.AsTime() + result.CreateTime = safeAsTime(timestamp) } return result } diff --git a/internal/worker_versioning_rules_test.go b/internal/worker_versioning_rules_test.go index 68859f621..c8fa4572b 100644 --- a/internal/worker_versioning_rules_test.go +++ b/internal/worker_versioning_rules_test.go @@ -11,7 +11,7 @@ import ( func Test_WorkerVersioningRules_fromProtoGetResponse(t *testing.T) { nowProto := timestamppb.Now() - timestamp := nowProto.AsTime() + timestamp := safeAsTime(nowProto) tests := []struct { name string response *workflowservice.GetWorkerVersioningRulesResponse diff --git a/internal/workflow.go b/internal/workflow.go index cc23bc216..d46c94db6 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -2557,8 +2557,8 @@ func convertFromPBRetryPolicy(retryPolicy *commonpb.RetryPolicy) *RetryPolicy { NonRetryableErrorTypes: retryPolicy.NonRetryableErrorTypes, } - p.MaximumInterval = retryPolicy.MaximumInterval.AsDuration() - p.InitialInterval = retryPolicy.InitialInterval.AsDuration() + p.MaximumInterval = safeAsDuration(retryPolicy.MaximumInterval) + p.InitialInterval = safeAsDuration(retryPolicy.InitialInterval) return &p } From 8abb63c7c9d380f25c5f48f272e9d308090f854a Mon Sep 17 00:00:00 2001 From: K8sCat Date: Wed, 14 May 2025 00:08:01 +0800 Subject: [PATCH 2/3] Remove safeAsDuration --- internal/activity.go | 6 ++-- internal/internal_schedule_client.go | 14 ++++---- internal/internal_task_handlers.go | 8 ++--- internal/internal_versioning_client.go | 2 +- internal/internal_worker_deployment_client.go | 10 ------ internal/internal_workflow_client.go | 4 +-- internal/internal_workflow_testsuite.go | 34 +++++++++---------- internal/workflow.go | 4 +-- 8 files changed, 36 insertions(+), 46 deletions(-) diff --git a/internal/activity.go b/internal/activity.go index 49ae6546f..6f27f3358 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -288,9 +288,9 @@ func WithActivityTask( ) (context.Context, error) { scheduled := safeAsTime(task.GetScheduledTime()) started := safeAsTime(task.GetStartedTime()) - scheduleToCloseTimeout := safeAsDuration(task.GetScheduleToCloseTimeout()) - startToCloseTimeout := safeAsDuration(task.GetStartToCloseTimeout()) - heartbeatTimeout := safeAsDuration(task.GetHeartbeatTimeout()) + scheduleToCloseTimeout := task.GetScheduleToCloseTimeout().AsDuration() + startToCloseTimeout := task.GetStartToCloseTimeout().AsDuration() + heartbeatTimeout := task.GetHeartbeatTimeout().AsDuration() deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout) logger = log.With(logger, diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index 253471dbf..af932b647 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -419,8 +419,8 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS intervals := make([]ScheduleIntervalSpec, len(scheduleSpec.GetInterval())) for i, s := range scheduleSpec.GetInterval() { intervals[i] = ScheduleIntervalSpec{ - Every: safeAsDuration(s.Interval), - Offset: safeAsDuration(s.Phase), + Every: s.Interval.AsDuration(), + Offset: s.Phase.AsDuration(), } } @@ -442,7 +442,7 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS Skip: skip, StartAt: startAt, EndAt: endAt, - Jitter: safeAsDuration(scheduleSpec.GetJitter()), + Jitter: scheduleSpec.GetJitter().AsDuration(), TimeZoneName: scheduleSpec.GetTimezoneName(), } } @@ -488,7 +488,7 @@ func scheduleDescriptionFromPB( Spec: convertFromPBScheduleSpec(describeResponse.Schedule.Spec), Policy: &SchedulePolicies{ Overlap: describeResponse.Schedule.Policies.GetOverlapPolicy(), - CatchupWindow: safeAsDuration(describeResponse.Schedule.Policies.GetCatchupWindow()), + CatchupWindow: describeResponse.Schedule.Policies.GetCatchupWindow().AsDuration(), PauseOnFailure: describeResponse.Schedule.Policies.GetPauseOnFailure(), }, State: &ScheduleState{ @@ -712,9 +712,9 @@ func convertFromPBScheduleAction( Workflow: workflow.WorkflowType.GetName(), Args: args, TaskQueue: workflow.TaskQueue.GetName(), - WorkflowExecutionTimeout: safeAsDuration(workflow.GetWorkflowExecutionTimeout()), - WorkflowRunTimeout: safeAsDuration(workflow.GetWorkflowRunTimeout()), - WorkflowTaskTimeout: safeAsDuration(workflow.GetWorkflowTaskTimeout()), + WorkflowExecutionTimeout: workflow.GetWorkflowExecutionTimeout().AsDuration(), + WorkflowRunTimeout: workflow.GetWorkflowRunTimeout().AsDuration(), + WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(), RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy), Memo: memos, TypedSearchAttributes: searchAttrs, diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index e4b2c7793..e019b2d02 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -727,9 +727,9 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. FirstRunID: attributes.FirstExecutionRunId, WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()}, TaskQueueName: taskQueue.GetName(), - WorkflowExecutionTimeout: safeAsDuration(attributes.GetWorkflowExecutionTimeout()), - WorkflowRunTimeout: safeAsDuration(attributes.GetWorkflowRunTimeout()), - WorkflowTaskTimeout: safeAsDuration(attributes.GetWorkflowTaskTimeout()), + WorkflowExecutionTimeout: attributes.GetWorkflowExecutionTimeout().AsDuration(), + WorkflowRunTimeout: attributes.GetWorkflowRunTimeout().AsDuration(), + WorkflowTaskTimeout: attributes.GetWorkflowTaskTimeout().AsDuration(), Namespace: wth.namespace, Attempt: attributes.GetAttempt(), WorkflowStartTime: safeAsTime(startedEvent.GetEventTime()), @@ -2211,7 +2211,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice canCtx, cancel := context.WithCancelCause(rootCtx) defer cancel(nil) - heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(safeAsDuration(t.GetHeartbeatTimeout())) + heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(t.GetHeartbeatTimeout().AsDuration()) invoker := newServiceInvoker( t.TaskToken, ath.identity, ath.client.workflowService, ath.metricsHandler, cancel, heartbeatThrottleInterval, ath.workerStopCh, ath.namespace) diff --git a/internal/internal_versioning_client.go b/internal/internal_versioning_client.go index ad17da50b..24c4fc730 100644 --- a/internal/internal_versioning_client.go +++ b/internal/internal_versioning_client.go @@ -388,7 +388,7 @@ func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats { return &TaskQueueStats{ ApproximateBacklogCount: stats.GetApproximateBacklogCount(), - ApproximateBacklogAge: safeAsDuration(stats.GetApproximateBacklogAge()), + ApproximateBacklogAge: stats.GetApproximateBacklogAge().AsDuration(), TasksAddRate: stats.TasksAddRate, TasksDispatchRate: stats.TasksDispatchRate, BacklogIncreaseRate: stats.TasksAddRate - stats.TasksDispatchRate, diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index aa5ddf3ce..ce66767b6 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -11,7 +11,6 @@ import ( "go.temporal.io/api/deployment/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -30,15 +29,6 @@ func safeAsTime(timestamp *timestamppb.Timestamp) time.Time { } } -// safeAsDuration ensures that a nil proto duration makes `AsDuration()` return 0. -func safeAsDuration(duration *durationpb.Duration) time.Duration { - if duration == nil { - return time.Duration(0) - } else { - return duration.AsDuration() - } -} - type ( // WorkerDeploymentClient is the client for managing worker deployments. workerDeploymentClient struct { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 338b81917..7bc22f553 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1597,10 +1597,10 @@ func (workflowRun *workflowRunImpl) GetWithOptions( TaskQueueName: attributes.GetTaskQueue().GetName(), } if attributes.WorkflowRunTimeout != nil { - err.WorkflowRunTimeout = safeAsDuration(attributes.WorkflowRunTimeout) + err.WorkflowRunTimeout = attributes.WorkflowRunTimeout.AsDuration() } if attributes.WorkflowTaskTimeout != nil { - err.WorkflowTaskTimeout = safeAsDuration(attributes.WorkflowTaskTimeout) + err.WorkflowTaskTimeout = attributes.WorkflowTaskTimeout.AsDuration() } return err default: diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 18a8507f2..26923ceb3 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -1274,7 +1274,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivi } func minDur(a *durationpb.Duration, b *durationpb.Duration) *durationpb.Duration { - ad, bd := safeAsDuration(a), safeAsDuration(b) + ad, bd := a.AsDuration(), b.AsDuration() if ad < bd { return a } @@ -1317,14 +1317,14 @@ func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( } // Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative. - if safeAsDuration(attributes.GetScheduleToCloseTimeout()) < 0 || safeAsDuration(attributes.GetScheduleToStartTimeout()) < 0 || - safeAsDuration(attributes.GetStartToCloseTimeout()) < 0 || safeAsDuration(attributes.GetHeartbeatTimeout()) < 0 { + if attributes.GetScheduleToCloseTimeout().AsDuration() < 0 || attributes.GetScheduleToStartTimeout().AsDuration() < 0 || + attributes.GetStartToCloseTimeout().AsDuration() < 0 || attributes.GetHeartbeatTimeout().AsDuration() < 0 { return serviceerror.NewInvalidArgument("A valid timeout may not be negative.") } - validScheduleToClose := safeAsDuration(attributes.GetScheduleToCloseTimeout()) > 0 - validScheduleToStart := safeAsDuration(attributes.GetScheduleToStartTimeout()) > 0 - validStartToClose := safeAsDuration(attributes.GetStartToCloseTimeout()) > 0 + validScheduleToClose := attributes.GetScheduleToCloseTimeout().AsDuration() > 0 + validScheduleToStart := attributes.GetScheduleToStartTimeout().AsDuration() > 0 + validStartToClose := attributes.GetStartToCloseTimeout().AsDuration() > 0 if validScheduleToClose { if validScheduleToStart { @@ -1349,16 +1349,16 @@ func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( } // ensure activity timeout never larger than workflow timeout if runTimeout > 0 { - if safeAsDuration(attributes.GetScheduleToCloseTimeout()) > runTimeout { + if attributes.GetScheduleToCloseTimeout().AsDuration() > runTimeout { attributes.ScheduleToCloseTimeout = durationpb.New(runTimeout) } - if safeAsDuration(attributes.GetScheduleToStartTimeout()) > runTimeout { + if attributes.GetScheduleToStartTimeout().AsDuration() > runTimeout { attributes.ScheduleToStartTimeout = durationpb.New(runTimeout) } - if safeAsDuration(attributes.GetStartToCloseTimeout()) > runTimeout { + if attributes.GetStartToCloseTimeout().AsDuration() > runTimeout { attributes.StartToCloseTimeout = durationpb.New(runTimeout) } - if safeAsDuration(attributes.GetHeartbeatTimeout()) > runTimeout { + if attributes.GetHeartbeatTimeout().AsDuration() > runTimeout { attributes.HeartbeatTimeout = durationpb.New(runTimeout) } } @@ -1408,16 +1408,16 @@ func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.Ret // rest of the arguments is pointless return nil } - if safeAsDuration(policy.GetInitialInterval()) < 0 { + if policy.GetInitialInterval().AsDuration() < 0 { return serviceerror.NewInvalidArgument("InitialInterval cannot be negative on retry policy.") } if policy.GetBackoffCoefficient() < 1 { return serviceerror.NewInvalidArgument("BackoffCoefficient cannot be less than 1 on retry policy.") } - if safeAsDuration(policy.GetMaximumInterval()) < 0 { + if policy.GetMaximumInterval().AsDuration() < 0 { return serviceerror.NewInvalidArgument("MaximumInterval cannot be negative on retry policy.") } - if safeAsDuration(policy.GetMaximumInterval()) > 0 && safeAsDuration(policy.GetMaximumInterval()) < safeAsDuration(policy.GetInitialInterval()) { + if policy.GetMaximumInterval().AsDuration() > 0 && policy.GetMaximumInterval().AsDuration() < policy.GetInitialInterval().AsDuration() { return serviceerror.NewInvalidArgument("MaximumInterval cannot be less than InitialInterval on retry policy.") } if policy.GetMaximumAttempts() < 0 { @@ -1506,9 +1506,9 @@ func (env *testWorkflowEnvironmentImpl) executeActivityWithRetryForTest( func fromProtoRetryPolicy(p *commonpb.RetryPolicy) *RetryPolicy { return &RetryPolicy{ - InitialInterval: safeAsDuration(p.GetInitialInterval()), + InitialInterval: p.GetInitialInterval().AsDuration(), BackoffCoefficient: p.GetBackoffCoefficient(), - MaximumInterval: safeAsDuration(p.GetMaximumInterval()), + MaximumInterval: p.GetMaximumInterval().AsDuration(), MaximumAttempts: p.GetMaximumAttempts(), NonRetryableErrorTypes: p.NonRetryableErrorTypes, } @@ -1529,10 +1529,10 @@ func ensureDefaultRetryPolicy(parameters *ExecuteActivityParams) { parameters.RetryPolicy = &commonpb.RetryPolicy{} } - if parameters.RetryPolicy.InitialInterval == nil || safeAsDuration(parameters.RetryPolicy.InitialInterval) == 0 { + if parameters.RetryPolicy.InitialInterval == nil || parameters.RetryPolicy.InitialInterval.AsDuration() == 0 { parameters.RetryPolicy.InitialInterval = durationpb.New(time.Second) } - if parameters.RetryPolicy.MaximumInterval == nil || safeAsDuration(parameters.RetryPolicy.MaximumInterval) == 0 { + if parameters.RetryPolicy.MaximumInterval == nil || parameters.RetryPolicy.MaximumInterval.AsDuration() == 0 { parameters.RetryPolicy.MaximumInterval = parameters.RetryPolicy.InitialInterval } if parameters.RetryPolicy.BackoffCoefficient == 0 { diff --git a/internal/workflow.go b/internal/workflow.go index d46c94db6..cc23bc216 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -2557,8 +2557,8 @@ func convertFromPBRetryPolicy(retryPolicy *commonpb.RetryPolicy) *RetryPolicy { NonRetryableErrorTypes: retryPolicy.NonRetryableErrorTypes, } - p.MaximumInterval = safeAsDuration(retryPolicy.MaximumInterval) - p.InitialInterval = safeAsDuration(retryPolicy.InitialInterval) + p.MaximumInterval = retryPolicy.MaximumInterval.AsDuration() + p.InitialInterval = retryPolicy.InitialInterval.AsDuration() return &p } From 0303efa7e601095adb75b652e87de52e1d23a69f Mon Sep 17 00:00:00 2001 From: K8sCat Date: Wed, 14 May 2025 00:19:32 +0800 Subject: [PATCH 3/3] Remove unnecessary safeAsTime --- internal/internal_nexus_task_poller.go | 4 ++-- internal/internal_schedule_client.go | 11 ++--------- internal/internal_task_pollers.go | 6 +++--- internal/internal_versioning_client.go | 5 +---- internal/internal_versioning_client_test.go | 2 +- internal/internal_workflow_client.go | 4 ++-- internal/worker_versioning_rules.go | 4 ++-- internal/worker_versioning_rules_test.go | 2 +- 8 files changed, 14 insertions(+), 24 deletions(-) diff --git a/internal/internal_nexus_task_poller.go b/internal/internal_nexus_task_poller.go index 14d973816..30a2973e0 100644 --- a/internal/internal_nexus_task_poller.go +++ b/internal/internal_nexus_task_poller.go @@ -123,7 +123,7 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error { // Schedule-to-start (from the time the request hit the frontend). // Note that this metric does not include the service and operation name as they are not relevant when polling from // the Nexus task queue. - scheduleToStartLatency := executionStartTime.Sub(safeAsTime(response.GetRequest().GetScheduledTime())) + scheduleToStartLatency := executionStartTime.Sub(response.GetRequest().GetScheduledTime().AsTime()) ntp.metricsHandler.WithTags(metrics.TaskQueueTags(ntp.taskQueueName)).Timer(metrics.NexusTaskScheduleToStartLatency).Record(scheduleToStartLatency) nctx, handlerErr := ntp.taskHandler.newNexusOperationContext(response) @@ -184,7 +184,7 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error { // E2E latency, from frontend until we finished reporting completion. nctx.metricsHandler. Timer(metrics.NexusTaskEndToEndLatency). - Record(time.Since(safeAsTime(response.GetRequest().GetScheduledTime()))) + Record(time.Since(response.GetRequest().GetScheduledTime().AsTime())) return nil } diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index af932b647..bd22e9517 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -426,15 +426,8 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS skip := convertFromPBScheduleCalendarSpecList(scheduleSpec.GetExcludeStructuredCalendar()) - startAt := time.Time{} - if scheduleSpec.GetStartTime() != nil { - startAt = safeAsTime(scheduleSpec.GetStartTime()) - } - - endAt := time.Time{} - if scheduleSpec.GetEndTime() != nil { - endAt = safeAsTime(scheduleSpec.GetEndTime()) - } + startAt := safeAsTime(scheduleSpec.GetStartTime()) + endAt := safeAsTime(scheduleSpec.GetEndTime()) return &ScheduleSpec{ Calendars: calendars, diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index fba4e1891..fd66de217 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -864,7 +864,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (taskForWorker, error) metricsHandler := wtp.metricsHandler.WithTags(metrics.WorkflowTags(response.WorkflowType.GetName())) metricsHandler.Counter(metrics.WorkflowTaskQueuePollSucceedCounter).Inc(1) - scheduleToStartLatency := safeAsTime(response.GetStartedTime()).Sub(safeAsTime(response.GetScheduledTime())) + scheduleToStartLatency := response.GetStartedTime().AsTime().Sub(response.GetScheduledTime().AsTime()) metricsHandler.Timer(metrics.WorkflowTaskScheduleToStartLatency).Record(scheduleToStartLatency) return task, nil } @@ -1032,7 +1032,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (taskForWorker, error) activityType := response.ActivityType.GetName() metricsHandler := atp.metricsHandler.WithTags(metrics.ActivityTags(workflowType, activityType, atp.taskQueueName)) - scheduleToStartLatency := safeAsTime(response.GetStartedTime()).Sub(safeAsTime(response.GetCurrentAttemptScheduledTime())) + scheduleToStartLatency := response.GetStartedTime().AsTime().Sub(response.GetCurrentAttemptScheduledTime().AsTime()) metricsHandler.Timer(metrics.ActivityScheduleToStartLatency).Record(scheduleToStartLatency) return &activityTask{task: response}, nil @@ -1103,7 +1103,7 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { if _, ok := request.(*workflowservice.RespondActivityTaskCompletedRequest); ok { activityMetricsHandler. Timer(metrics.ActivitySucceedEndToEndLatency). - Record(time.Since(safeAsTime(activityTask.task.GetScheduledTime()))) + Record(time.Since(activityTask.task.GetScheduledTime().AsTime())) } return nil } diff --git a/internal/internal_versioning_client.go b/internal/internal_versioning_client.go index 24c4fc730..fac00dd68 100644 --- a/internal/internal_versioning_client.go +++ b/internal/internal_versioning_client.go @@ -350,10 +350,7 @@ func pollerInfoFromResponse(response *taskqueuepb.PollerInfo) TaskQueuePollerInf return TaskQueuePollerInfo{} } - lastAccessTime := time.Time{} - if response.GetLastAccessTime() != nil { - lastAccessTime = safeAsTime(response.GetLastAccessTime()) - } + lastAccessTime := safeAsTime(response.GetLastAccessTime()) return TaskQueuePollerInfo{ LastAccessTime: lastAccessTime, diff --git a/internal/internal_versioning_client_test.go b/internal/internal_versioning_client_test.go index e36d0146a..4e7994a7f 100644 --- a/internal/internal_versioning_client_test.go +++ b/internal/internal_versioning_client_test.go @@ -53,7 +53,7 @@ func Test_DetectEnhancedNotSupported_fromProtoResponse(t *testing.T) { func Test_TaskQueueDescription_fromProtoResponse(t *testing.T) { nowProto := timestamppb.Now() - now := safeAsTime(nowProto) + now := nowProto.AsTime() tests := []struct { name string response *workflowservice.DescribeTaskQueueResponse diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 7bc22f553..df687e845 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -2223,12 +2223,12 @@ func (w *workflowClientInterceptor) DescribeWorkflow( var closeTime *time.Time if info.GetCloseTime().IsValid() { - t := safeAsTime(info.GetCloseTime()) + t := info.GetCloseTime().AsTime() closeTime = &t } var executionTime *time.Time if info.GetExecutionTime().IsValid() { - t := safeAsTime(info.GetExecutionTime()) + t := info.GetExecutionTime().AsTime() executionTime = &t } diff --git a/internal/worker_versioning_rules.go b/internal/worker_versioning_rules.go index 50e2cff86..469e2ebe4 100644 --- a/internal/worker_versioning_rules.go +++ b/internal/worker_versioning_rules.go @@ -357,7 +357,7 @@ func versioningAssignmentRuleFromProto(rule *taskqueuepb.BuildIdAssignmentRule, } if timestamp != nil { - result.CreateTime = safeAsTime(timestamp) + result.CreateTime = timestamp.AsTime() } return result } @@ -375,7 +375,7 @@ func versioningRedirectRuleFromProto(rule *taskqueuepb.CompatibleBuildIdRedirect } if timestamp != nil { - result.CreateTime = safeAsTime(timestamp) + result.CreateTime = timestamp.AsTime() } return result } diff --git a/internal/worker_versioning_rules_test.go b/internal/worker_versioning_rules_test.go index c8fa4572b..68859f621 100644 --- a/internal/worker_versioning_rules_test.go +++ b/internal/worker_versioning_rules_test.go @@ -11,7 +11,7 @@ import ( func Test_WorkerVersioningRules_fromProtoGetResponse(t *testing.T) { nowProto := timestamppb.Now() - timestamp := safeAsTime(nowProto) + timestamp := nowProto.AsTime() tests := []struct { name string response *workflowservice.GetWorkerVersioningRulesResponse