Skip to content

Commit a10f9ae

Browse files
authored
Unwrap System Nexus Operations in event history (#1017)
## What was changed - `workflow show` (table and detailed modes, not `-o json`) now displays the actual operation name instead of the generic `NexusOperationScheduled`/`NexusOperationCompleted` event type when the Nexus endpoint is `__temporal_system`. - `workflow describe` no longer includes pending Nexus operations on the `__temporal_system` endpoint in the "Pending Nexus Operations" list or count. ## Why? System Nexus operations (endpoint `__temporal_system`) are implementation details of high-level SDK operations like `SignalWithStartWorkflowExecution`. Surfacing them as `NexusOperationScheduled`/`NexusOperationCompleted` events is confusing. By unwrapping the operation name, the history and describe output reflect what the workflow actually did at a semantic level. Similarly, pending system operations in describe are noise the user has no actionable interest in. ## Checklist <!--- add/delete as needed ---> 1. Closes NA 2. How was this tested: - Unit tests added in `commands.workflow_show_test.go` covering all six NexusOperation event types, the non-system-endpoint no-op path, and the missing-scheduled-event (reverse traversal / orphaned completion) path. - Functional tests added to `commands.workflow_view_test.go` to show system operations are unwrapped but user defined are not - Manual validation: ran `temporal workflow show -w <id>` against a live workflow that executes a `SignalWithStartWorkflowExecution` system nexus operation and confirmed events 5 and 6 now show as `SignalWithStartWorkflowExecutionScheduled` and `SignalWithStartWorkflowExecutionCompleted`. 4. Any docs updates needed? No
1 parent 7e5ba38 commit a10f9ae

12 files changed

Lines changed: 1398 additions & 341 deletions

cliext/client.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ type ClientOptionsBuilder struct {
2929
// Logger is the slog logger to use for the client. If set, it will be
3030
// wrapped with the SDK's structured logger adapter.
3131
Logger *slog.Logger
32+
33+
// PayloadCodec is populated by Build when a remote payload codec is
34+
// configured. Callers can use it to decode payloads outside the gRPC
35+
// interceptor chain (e.g. payloads nested inside opaque proto bytes).
36+
PayloadCodec converter.PayloadCodec
3237
}
3338

3439
type oauthCredentials struct {
@@ -248,13 +253,18 @@ func (b *ClientOptionsBuilder) Build(ctx context.Context) (client.Options, error
248253
if err != nil {
249254
return client.Options{}, fmt.Errorf("invalid codec headers: %w", err)
250255
}
251-
interceptor, err := newPayloadCodecInterceptor(
256+
payloadCodec := newRemotePayloadCodec(
252257
profile.Namespace, profile.Codec.Endpoint, profile.Codec.Auth, codecHeaders)
258+
interceptor, err := converter.NewPayloadCodecGRPCClientInterceptor(
259+
converter.PayloadCodecGRPCClientInterceptorOptions{
260+
Codecs: []converter.PayloadCodec{payloadCodec},
261+
})
253262
if err != nil {
254263
return client.Options{}, fmt.Errorf("failed creating payload codec interceptor: %w", err)
255264
}
256265
clientOpts.ConnectionOptions.DialOptions = append(
257266
clientOpts.ConnectionOptions.DialOptions, grpc.WithChainUnaryInterceptor(interceptor))
267+
b.PayloadCodec = payloadCodec
258268
}
259269

260270
// Set connect timeout for GetSystemInfo if provided.
@@ -278,16 +288,17 @@ func parseKeyValuePairs(pairs []string) (map[string]string, error) {
278288
return result, nil
279289
}
280290

281-
// newPayloadCodecInterceptor creates a gRPC interceptor for remote payload codec.
282-
func newPayloadCodecInterceptor(
291+
// newRemotePayloadCodec constructs a remote payload codec from the configured endpoint,
292+
// auth, and headers. The returned codec can be used both inside a gRPC interceptor and
293+
// to decode payloads nested inside opaque proto bytes (e.g. system Nexus operation inputs).
294+
func newRemotePayloadCodec(
283295
namespace string,
284296
codecEndpoint string,
285297
codecAuth string,
286298
codecHeaders map[string]string,
287-
) (grpc.UnaryClientInterceptor, error) {
299+
) converter.PayloadCodec {
288300
codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace)
289-
290-
payloadCodec := converter.NewRemotePayloadCodec(
301+
return converter.NewRemotePayloadCodec(
291302
converter.RemotePayloadCodecOptions{
292303
Endpoint: codecEndpoint,
293304
ModifyRequest: func(req *http.Request) error {
@@ -302,11 +313,6 @@ func newPayloadCodecInterceptor(
302313
},
303314
},
304315
)
305-
return converter.NewPayloadCodecGRPCClientInterceptor(
306-
converter.PayloadCodecGRPCClientInterceptorOptions{
307-
Codecs: []converter.PayloadCodec{payloadCodec},
308-
},
309-
)
310316
}
311317

312318
func (c *oauthCredentials) getToken(ctx context.Context) (string, error) {

go.mod

Lines changed: 103 additions & 92 deletions
Large diffs are not rendered by default.

go.sum

Lines changed: 238 additions & 223 deletions
Large diffs are not rendered by default.

internal/temporalcli/client.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,17 @@ import (
2121
// so often used by callers after this call to know the currently configured
2222
// namespace.
2323
func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, error) {
24+
cl, _, err := dialClientWithCodec(cctx, c)
25+
return cl, err
26+
}
27+
28+
// dialClientWithCodec is like [dialClient] but also returns the configured remote
29+
// payload codec, or nil if no codec is configured. The codec is the same instance
30+
// used by the gRPC interceptor; callers can use it to decode payloads nested inside
31+
// opaque proto bytes (e.g. the request/response of a system Nexus operation).
32+
func dialClientWithCodec(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, converter.PayloadCodec, error) {
2433
if cctx.RootCommand == nil {
25-
return nil, fmt.Errorf("root command unexpectedly missing when dialing client")
34+
return nil, nil, fmt.Errorf("root command unexpectedly missing when dialing client")
2635
}
2736

2837
// Set default identity if not provided
@@ -47,7 +56,7 @@ func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, e
4756
}
4857
clientOpts, err := builder.Build(cctx)
4958
if err != nil {
50-
return nil, err
59+
return nil, nil, err
5160
}
5261

5362
// We do not put codec on data converter here, it is applied via
@@ -78,14 +87,14 @@ func dialClient(cctx *CommandContext, c *cliext.ClientOptions) (client.Client, e
7887

7988
cl, err := client.DialContext(dialCtx, clientOpts)
8089
if err != nil {
81-
return nil, err
90+
return nil, nil, err
8291
}
8392

8493
// Since this namespace value is used by many commands after this call,
8594
// we are mutating it to be the derived one
8695
c.Namespace = clientOpts.Namespace
8796

88-
return cl, nil
97+
return cl, builder.PayloadCodec, nil
8998
}
9099

91100
func fixedHeaderOverrideInterceptor(

internal/temporalcli/commands.schedule_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (s *SharedServerSuite) TestSchedule_List() {
236236
out = res.Stdout.String()
237237
assert.Contains(t, out, schedId)
238238
}, 10*time.Second, time.Second)
239-
s.ContainsOnSameLine(out, schedId, "DevWorkflow", "0s" /*jitter*/, "false", "nil" /*memo*/)
239+
s.ContainsOnSameLine(out, schedId, "DevWorkflow", "0s" /*jitter*/, "false", "{}" /*memo*/)
240240
s.ContainsOnSameLine(out, "TestSchedule_List")
241241

242242
// table
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package temporalcli
2+
3+
import (
4+
"context"
5+
6+
commonpb "go.temporal.io/api/common/v1"
7+
"go.temporal.io/api/proxy"
8+
"go.temporal.io/api/workflowservice/v1"
9+
"go.temporal.io/api/workflowservice/v1/workflowservicenexus"
10+
"go.temporal.io/sdk/converter"
11+
"google.golang.org/protobuf/proto"
12+
)
13+
14+
// systemNexusOpKey identifies a system Nexus operation by its (endpoint, operation) pair.
15+
type systemNexusOpKey struct {
16+
Endpoint string
17+
Operation string
18+
}
19+
20+
// systemNexusOpTypes maps a system Nexus operation to the proto request and response types
21+
// whose bytes are serialized in NexusOperationScheduled.Input and NexusOperationCompleted.Result.
22+
type systemNexusOpTypes struct {
23+
// NewRequest returns a fresh, zero-valued instance of the request proto.
24+
NewRequest func() proto.Message
25+
// NewResponse returns a fresh, zero-valued instance of the response proto.
26+
NewResponse func() proto.Message
27+
}
28+
29+
// systemNexusOps is the global registry of known system Nexus operations on the
30+
// __temporal_system endpoint. Add new entries here as the server adds support for more
31+
// system operations. The keys' Operation values must match what the server records in
32+
// NexusOperationScheduledEventAttributes.Operation.
33+
// NOTE seankane: Part 2 of the System Operations work is to code generate this map from the
34+
// go.temporal.io/api/workflowservice/v1/workflowservicenexus package.
35+
var systemNexusOps = map[systemNexusOpKey]systemNexusOpTypes{
36+
{
37+
Endpoint: temporalSystemNexusEndpoint,
38+
Operation: workflowservicenexus.TemporalAPIWorkflowserviceV1WorkflowService.SignalWithStartWorkflowExecution.Name(),
39+
}: {
40+
NewRequest: func() proto.Message { return &workflowservice.SignalWithStartWorkflowExecutionRequest{} },
41+
NewResponse: func() proto.Message { return &workflowservice.SignalWithStartWorkflowExecutionResponse{} },
42+
},
43+
}
44+
45+
// decodePayloadsInProto walks a proto message and applies codec.Decode to every Payload
46+
// found inside it (including nested messages). The message is mutated in place.
47+
func decodePayloadsInProto(ctx context.Context, msg proto.Message, codec converter.PayloadCodec) error {
48+
return proxy.VisitPayloads(ctx, msg, proxy.VisitPayloadsOptions{
49+
SkipSearchAttributes: true,
50+
Visitor: func(_ *proxy.VisitPayloadsContext, payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
51+
return codec.Decode(payloads)
52+
},
53+
})
54+
}

0 commit comments

Comments
 (0)