Conversation
```go
if wfState.isWithinStep {
	return 0, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Sleep within a step"))
}
```

Lifted to pre-sysdb invocation.
```go
if wfState.isWithinStep {
	return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Recv within a step"))
}
```

Lifted to pre-sysdb invocation.
```diff
 if err != nil {
 	c.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
-	return err
+	return newWorkflowExecutionError(workflowID, fmt.Errorf("failed to insert workflow status: %w", err))
```
This actually is a small breaking change -- the sysdb error is now wrapped in a yet-more-explanatory workflow error. (Hence the changes to use `errors.Is` to look for a wrapped error anywhere in the error tree.)
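For illustration, a minimal, self-contained sketch of the `errors.Is` pattern this relies on; the sentinel and wrapping function below are hypothetical stand-ins, not the library's real errors:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for an error surfaced by the sysdb layer.
var errSysDB = errors.New("sysdb: unique constraint violated")

// Mirrors the new behavior: wrap the sysdb error (via %w) in a more
// explanatory workflow error instead of returning it verbatim.
func insertWorkflowStatus() error {
	return fmt.Errorf("failed to insert workflow status: %w", errSysDB)
}

func main() {
	err := insertWorkflowStatus()

	fmt.Println(err == errSysDB)          // false: direct comparison breaks once the error is wrapped
	fmt.Println(errors.Is(err, errSysDB)) // true: errors.Is walks the whole error tree
}
```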
dbos/workflow.go
```diff
-err = retry(uncancellableCtx, func() error {
-	return c.systemDB.recordChildWorkflow(uncancellableCtx, childInput)
-}, withRetrierLogger(c.logger))
+err = c.systemDB.recordChildWorkflow(uncancellableCtx, childInput)
```
Unneeded retry within a larger retry, and it was buggy: the retrier ran on the uncancellable context, so cancelling the operation could never stop it.
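A minimal sketch of the failure mode, with a hypothetical `retry` standing in for the library's retrier: because the inner retry runs on an uncancellable context, the caller's cancellation never reaches it, and nesting it inside an outer retry would also multiply the attempt count.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry is a hypothetical stand-in for the library's retrier: it re-runs
// fn until it succeeds, attempts are exhausted, or ctx is cancelled.
func retry(ctx context.Context, attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if ctx.Err() != nil {
			return ctx.Err() // cancellation is only honored via ctx
		}
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return err
}

func main() {
	_, cancel := context.WithCancel(context.Background())

	calls := 0
	done := make(chan error, 1)
	go func() {
		// The retry runs on context.Background() (the analogue of
		// uncancellableCtx), so it cannot observe the caller's cancel.
		done <- retry(context.Background(), 5, func() error {
			calls++
			return errors.New("transient failure")
		})
	}()

	cancel() // the caller gives up immediately...
	<-done   // ...yet the retrier still grinds through all 5 attempts
	fmt.Println("attempts despite cancellation:", calls) // 5
}
```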
```go
if wfState.isWithinStep {
	return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Send within a step"))
}
```

Lifted to pre-sysdb invocation.
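Concretely, "lifted to pre-sysdb invocation" means the guard now runs once, before the system-database call, rather than inside it. A rough sketch of that shape, with minimal stand-in types so it compiles; none of these signatures are the library's real ones:

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins so the sketch compiles; the real types live in the library.
type workflowState struct {
	workflowID   string
	isWithinStep bool
}

func newStepExecutionError(workflowID, functionName string, err error) error {
	return fmt.Errorf("step execution error in %s (workflow %s): %w", functionName, workflowID, err)
}

// send shows the "lifted" shape: the guard runs once, up front, and the
// sysdb invocation (stubbed here) is only reached by validated calls.
func send(wfState *workflowState, functionName string, sysdbSend func() error) error {
	if wfState.isWithinStep {
		return newStepExecutionError(wfState.workflowID, functionName,
			errors.New("cannot call Send within a step"))
	}
	return sysdbSend()
}

func main() {
	st := &workflowState{workflowID: "wf-1", isWithinStep: true}
	err := send(st, "Send", func() error { return nil })
	fmt.Println(err) // the sysdb stub is never invoked
}
```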
```go
fmt.Sprintf(`WITH oldest_entry AS (
	SELECT destination_uuid, topic, message, created_at_epoch_ms
	FROM %s.notifications
	WHERE destination_uuid = $1 AND topic = $2
	ORDER BY created_at_epoch_ms ASC
	LIMIT 1
)
DELETE FROM %s.notifications
WHERE destination_uuid = (SELECT destination_uuid FROM oldest_entry)
AND topic = (SELECT topic FROM oldest_entry)
AND created_at_epoch_ms = (SELECT created_at_epoch_ms FROM oldest_entry)
RETURNING message`, pgx.Identifier{s.schema}.Sanitize(), pgx.Identifier{s.schema}.Sanitize())
```
This query can delete more than one message when several rows share the same `created_at_epoch_ms` millisecond. This was surfaced by 1) no longer running `send()` inside a transaction when it is called outside a workflow, and 2) a recent change to the Golang migration, where we now have:

```sql
created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now())::numeric * 1000)::bigint,
```

instead of

```sql
created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint,
```

The first version casts the result of `EXTRACT(epoch FROM now())` (double precision, float8) to `numeric`, so the `* 1000` multiplication is exact numeric arithmetic and quite stable. The second version (what we have in Python) does the multiplication in double precision and then converts to `bigint`, which can truncate. Because floating-point multiplication often carries rounding error, the truncated result was more volatile, which, I think, helped obscure this bug.
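One way to pin the delete to exactly one row: select the oldest row's `ctid` (PostgreSQL's physical row identifier) and delete by it. A sketch in the same style as the snippet above, shown as an illustration of the idea, not necessarily the fix that shipped:

```go
// Sketch: pin the DELETE to exactly one physical row via ctid, so ties
// on created_at_epoch_ms can no longer delete multiple messages.
// (ctid is PostgreSQL-specific; shown for illustration only.)
query := fmt.Sprintf(`WITH oldest_entry AS (
	SELECT ctid
	FROM %s.notifications
	WHERE destination_uuid = $1 AND topic = $2
	ORDER BY created_at_epoch_ms ASC
	LIMIT 1
)
DELETE FROM %s.notifications
WHERE ctid = (SELECT ctid FROM oldest_entry)
RETURNING message`, pgx.Identifier{s.schema}.Sanitize(), pgx.Identifier{s.schema}.Sanitize())
```

Among rows that tie on the same millisecond, which one is picked is arbitrary, but exactly one row is deleted and returned per call.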
Move `setEvent` and `send` to be run through `runAsTxn`.

Ideally we'd run `getEvent`, `recv`, and `sleep` using `runAsTxn` too (see the sketch after this list), but this is currently very challenging because:

- `getEvent`/`recv` update the stepID on their own, to generate a step ID for `sleep`. This complicates the whole logic of checkpointing.
- `sleep` has its own logic to check whether it already executed, to become durable.

Also:

- `setEvent` to be sent within a step
- `runAsTxn` for CRDB
- `runAsTxn` (default read committed)
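For orientation, here is a rough sketch of the shape `runAsTxn` implies: execute the operation and checkpoint its result in one transaction, so a step either commits together with its checkpoint or not at all. Every name below (the function signature, the `operation_outputs` table and its columns) is an illustrative guess, not the library's actual API. It also hints at why `getEvent`/`recv` are awkward to fit here: this shape wants the step ID handed in, while they generate step IDs on their own.

```go
package main

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// runAsTxnSketch runs op and checkpoints its output in a single
// transaction. All table and column names are hypothetical.
func runAsTxnSketch(ctx context.Context, pool *pgxpool.Pool,
	workflowID string, stepID int, op func(pgx.Tx) (string, error)) (string, error) {

	tx, err := pool.Begin(ctx)
	if err != nil {
		return "", err
	}
	defer tx.Rollback(ctx) // no-op once Commit has succeeded

	// Recovery path: if this step already checkpointed, replay its output
	// instead of re-running the operation.
	var prior string
	err = tx.QueryRow(ctx,
		`SELECT output FROM operation_outputs WHERE workflow_uuid = $1 AND function_id = $2`,
		workflowID, stepID).Scan(&prior)
	if err == nil {
		return prior, tx.Commit(ctx)
	}
	if !errors.Is(err, pgx.ErrNoRows) {
		return "", err
	}

	// First execution: run the operation and record its output atomically.
	out, err := op(tx)
	if err != nil {
		return "", err
	}
	if _, err := tx.Exec(ctx,
		`INSERT INTO operation_outputs (workflow_uuid, function_id, output) VALUES ($1, $2, $3)`,
		workflowID, stepID, out); err != nil {
		return "", err
	}
	return out, tx.Commit(ctx)
}
```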