@@ -22,6 +22,7 @@ import (
22
22
"fmt"
23
23
"log"
24
24
"net/http"
25
+ "reflect"
25
26
"strconv"
26
27
"sync"
27
28
"time"
@@ -476,6 +477,20 @@ func (f *Flow[In, Out, Stream]) start(ctx context.Context, input In, cb streamin
476
477
return state , nil
477
478
}
478
479
480
+ func isInputMissing (input any ) bool {
481
+ if input == nil {
482
+ return true
483
+ }
484
+ v := reflect .ValueOf (input )
485
+ switch v .Kind () {
486
+ case reflect .Ptr , reflect .Slice , reflect .Map , reflect .Interface , reflect .Chan , reflect .Func :
487
+ return v .IsNil ()
488
+ default :
489
+ // For other types like structs, zero value might be a valid input.
490
+ return false
491
+ }
492
+ }
493
+
479
494
// execute performs one flow execution.
480
495
// Using its flowState argument as a starting point, it runs the flow function until
481
496
// it finishes or is interrupted.
@@ -510,7 +525,22 @@ func (f *Flow[In, Out, Stream]) execute(ctx context.Context, state *flowState[In
510
525
traceID := rootSpanContext .TraceID ().String ()
511
526
exec .TraceIDs = append (exec .TraceIDs , traceID )
512
527
// TODO: Save rootSpanContext in the state.
513
- // TODO: If input is missing, get it from state.input and overwrite metadata.input.
528
+ if isInputMissing (input ) {
529
+ if state == nil {
530
+ return base .Zero [Out ](), errors .New ("input is missing and state is nil" )
531
+ }
532
+ if isInputMissing (state .Input ) {
533
+ return base .Zero [Out ](), errors .New ("input is missing and state.Input is also empty" )
534
+ }
535
+ input = state .Input
536
+
537
+ // Convert input to JSON string for tracing metadata
538
+ bytes , err := json .Marshal (input )
539
+ if err != nil {
540
+ return base .Zero [Out ](), fmt .Errorf ("failed to marshal input for tracing: %w" , err )
541
+ }
542
+ tracing .SetCustomMetadataAttr (ctx , "input" , string (bytes ))
543
+ }
514
544
start := time .Now ()
515
545
var err error
516
546
if err = base .ValidateValue (input , f .inputSchema ); err != nil {
0 commit comments