Skip to content
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
f40c0db
refuse sleep/recv execution within a step before calling sysdb
maxdml Jan 31, 2026
a71f0bd
missing retries
maxdml Jan 31, 2026
7723cbf
return proper error
maxdml Jan 31, 2026
a435f58
remove superfluous retry
maxdml Jan 31, 2026
ea6a2ec
nit
maxdml Jan 31, 2026
23d93d8
propagate stepID down a workflow context tree
maxdml Jan 31, 2026
ef4ce90
run setEvent from within runAsTxn
maxdml Jan 31, 2026
1f7bf00
run send with runAsTxn if within a workflow
maxdml Jan 31, 2026
f48cccb
revert
maxdml Jan 31, 2026
4239a22
adjust error parsing
maxdml Feb 2, 2026
4901aaa
update test
maxdml Feb 2, 2026
726015c
handle 40001 when sending during debounce
maxdml Feb 2, 2026
cc0f286
debug
maxdml Feb 2, 2026
8ba8b88
revert
maxdml Feb 2, 2026
a350c45
tests nits
maxdml Feb 2, 2026
3af6da0
retry transaction management in runAsTxn
maxdml Feb 3, 2026
27fdb77
debug
maxdml Feb 3, 2026
c90b34a
should not timeout so use small timeout
maxdml Feb 3, 2026
3891a07
try always using a txn for send
maxdml Feb 3, 2026
5699b9f
more lenient timeout
maxdml Feb 3, 2026
12bab3a
Revert "try always using a txn for send"
maxdml Feb 3, 2026
0102e5c
remove nested retries
maxdml Feb 3, 2026
9543d91
cleanup
maxdml Feb 3, 2026
1844aee
fix
maxdml Feb 3, 2026
dadd479
nit
maxdml Feb 3, 2026
c847897
debug
maxdml Feb 3, 2026
7707b1c
walk the full error tree
maxdml Feb 3, 2026
0b7f444
remove nested retry
maxdml Feb 3, 2026
2d22b04
wrap runAsTxn in retries + add missing retries
maxdml Feb 3, 2026
14dac98
try reverting send changes
maxdml Feb 3, 2026
2ea00d7
increase timeout
maxdml Feb 3, 2026
3d47f31
Revert "try reverting send changes"
maxdml Feb 3, 2026
80344eb
revert the retry changes
maxdml Feb 3, 2026
5055aa4
debug
maxdml Feb 3, 2026
27d714f
REVERT ME
maxdml Feb 3, 2026
f95b92e
DEBUG
maxdml Feb 3, 2026
4a2f40e
DEBUG
maxdml Feb 3, 2026
4ec0450
...
maxdml Feb 3, 2026
8a453fe
DEBUG
maxdml Feb 3, 2026
c838e60
debug
maxdml Feb 3, 2026
36a8ef0
debug
maxdml Feb 3, 2026
1cde6a9
revert
maxdml Feb 3, 2026
7bfa02e
retry must be within runAsTxn so we keep deterministic step IDs
maxdml Feb 3, 2026
e727113
run as step must run a retried function, not be retried
maxdml Feb 3, 2026
adb9cc0
fix
maxdml Feb 3, 2026
3329d59
fix
maxdml Feb 3, 2026
595b1de
cleanup
maxdml Feb 3, 2026
9d6c307
Merge branch 'fix-txn-retries' into refactor-old-special-steps
maxdml Feb 3, 2026
7268e9f
fix
maxdml Feb 3, 2026
15698bb
fix post-merge
maxdml Feb 3, 2026
4763419
reinstate post-merge loss
maxdml Feb 3, 2026
054a0d4
simply retry
maxdml Feb 3, 2026
313f34e
debug
maxdml Feb 3, 2026
a85d175
debug
maxdml Feb 3, 2026
b3f300c
debug
maxdml Feb 3, 2026
24e3d80
debug
maxdml Feb 3, 2026
c9045cc
try always running in a txn
maxdml Feb 3, 2026
61670d7
debug
maxdml Feb 3, 2026
2190665
no txn
maxdml Feb 3, 2026
6356b12
debug
maxdml Feb 4, 2026
9f3876f
less debug
maxdml Feb 4, 2026
2177d6b
prevent multiple deletion collision due to timestamp
maxdml Feb 4, 2026
8451566
cleanup
maxdml Feb 4, 2026
aed8664
Merge branch 'main' into refactor-old-special-steps
maxdml Feb 4, 2026
a28ab94
allow chain of retry conditions + add one for 40001
maxdml Feb 4, 2026
46be9b5
only use repeatable read for resumeWorkflow
maxdml Feb 4, 2026
45d19a2
set isolation level when beginning tx
maxdml Feb 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chaos_tests/chaos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestChaosRecv(t *testing.T) {
signals[index].Set()

// Receive from topic with timeout
value, err := dbos.Recv[string](ctx, topic, 10*time.Second)
value, err := dbos.Recv[string](ctx, topic, 10*time.Minute)
if err != nil {
return "", fmt.Errorf("failed to receive: %w", err)
}
Expand Down
11 changes: 9 additions & 2 deletions dbos/debouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/google/uuid"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
)

const _DEBOUNCER_TOPIC = "_dbos_debouncer_topic"
Expand Down Expand Up @@ -177,8 +179,7 @@ func (d *Debouncer[P, R]) Debounce(ctx DBOSContext, key string, delay time.Durat
return newWorkflowPollingHandle[R](ctx, dInput.TargetWorkflowID), nil
}
// A dedup error means the internal debouncer workflow was already started, in which case we should send it the new input
var dbosErr *DBOSError
if errors.As(err, &dbosErr) && dbosErr.Code == QueueDeduplicated {
if errors.Is(err, &DBOSError{Code: QueueDeduplicated}) {
// Identify the ID of the internal debouncer workflow from the dedup error
debouncerWorkflowStatus, err := ListWorkflows(ctx, WithFilterDeduplicationID(key))
if err != nil {
Expand All @@ -196,6 +197,12 @@ func (d *Debouncer[P, R]) Debounce(ctx DBOSContext, key string, delay time.Durat
ID: messageID,
}, _DEBOUNCER_TOPIC)
if err != nil {
// On serialization failure (e.g. REPEATABLE READ conflict with FK on workflow_status),
// re-check debouncer status; if it is no longer PENDING, the debouncer may have finished
// and we can retry the loop (same as when the workflow already exited).
if errors.Is(err, &pgconn.PgError{Code: pgerrcode.SerializationFailure}) {
continue
}
return nil, err
}

Expand Down
9 changes: 2 additions & 7 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,7 @@ func TestWorkflowQueues(t *testing.T) {
require.Error(t, err, "expected ConflictingWorkflowError when enqueueing same workflow ID on different queue, but got none")

// Check that it's the correct error type
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)

assert.Equal(t, ConflictingWorkflowError, dbosErr.Code, "expected error code to be ConflictingWorkflowError")
require.True(t, errors.Is(err, &DBOSError{Code: ConflictingWorkflowError}), "expected error to be ConflictingWorkflowError, got %T", err)

// Check that the error message contains queue information
expectedMsgPart := "Workflow already exists in a different queue"
Expand Down Expand Up @@ -440,9 +437,7 @@ func TestWorkflowQueues(t *testing.T) {
require.Error(t, err, "expected error when enqueueing workflow with same deduplication ID")

// Check that it's the correct error type and message
var dbosErr *DBOSError
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
assert.Equal(t, QueueDeduplicated, dbosErr.Code, "expected error code to be QueueDeduplicated")
require.True(t, errors.Is(err, &DBOSError{Code: QueueDeduplicated}), "expected error to be QueueDeduplicated, got %T", err)

expectedMsgPart := fmt.Sprintf("Workflow %s was deduplicated due to an existing workflow in queue %s with deduplication ID %s", wfid2, dedupQueue.Name, dedupID)
assert.Contains(t, err.Error(), expectedMsgPart, "expected error message to contain deduplication information")
Expand Down
184 changes: 33 additions & 151 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,10 +1680,6 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
return 0, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

if wfState.isWithinStep {
return 0, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Sleep within a step"))
}

Comment on lines -1683 to -1686
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lifted to pre-sysdb invocation

// Determine step ID
var stepID int
if input.stepID != nil && *input.stepID >= 0 {
Expand Down Expand Up @@ -2042,98 +2038,38 @@ type WorkflowSendInput struct {
DestinationID string
Message any
Topic string
tx pgx.Tx
}

// Send is a special type of step that sends a message to another workflow.
// Can be called both within a workflow (as a step) or outside a workflow (directly).
// When called within a workflow: durability and the function run in the same transaction, and we forbid nested step execution
func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
functionName := "DBOS.send"

// Get workflow state from context (optional for Send as we can send from outside a workflow)
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
var stepID int
var isInWorkflow bool

if ok && wfState != nil {
isInWorkflow = true
if wfState.isWithinStep {
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Send within a step"))
}
Comment on lines -2060 to -2062
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lifted to pre-sysdb invocation

stepID = wfState.nextStepID()
}

if _, ok := input.Message.(*string); !ok {
return fmt.Errorf("message must be a pointer to a string")
}

tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx)

startTime := time.Now()

// Check if operation was already executed and do nothing if so (only if in workflow)
if isInWorkflow {
checkInput := checkOperationExecutionDBInput{
workflowID: wfState.workflowID,
stepID: stepID,
stepName: functionName,
tx: tx,
}
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
if err != nil {
return err
}
if recordedResult != nil {
// when hitting this case, recordedResult will be &{<nil> <nil>}
return nil
}
}

// Set default topic if not provided
topic := _DBOS_NULL_TOPIC
if len(input.Topic) > 0 {
topic = input.Topic
}

insertQuery := fmt.Sprintf(`INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3)`, pgx.Identifier{s.schema}.Sanitize())
_, err = tx.Exec(ctx, insertQuery, input.DestinationID, topic, input.Message)
var err error
if input.tx != nil {
_, err = input.tx.Exec(ctx, insertQuery, input.DestinationID, topic, input.Message)
} else {
_, err = s.pool.Exec(ctx, insertQuery, input.DestinationID, topic, input.Message)
}
if err != nil {
s.logger.Error("failed to insert notification", "error", err, "query", insertQuery, "destination_id", input.DestinationID, "topic", topic, "message", input.Message)
// Check for foreign key violation (destination workflow doesn't exist)
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_FOREIGN_KEY_VIOLATION {
return newNonExistentWorkflowError(input.DestinationID)
}
return fmt.Errorf("failed to insert notification: %w", err)
}

// Record the operation result if this is called within a workflow
if isInWorkflow {
completedTime := time.Now()
recordInput := recordOperationResultDBInput{
workflowID: wfState.workflowID,
stepID: stepID,
stepName: functionName,
output: nil,
err: nil,
tx: tx,
startedAt: startTime,
completedAt: completedTime,
}

err = s.recordOperationResult(ctx, recordInput)
if err != nil {
return err
}
}

// Commit transaction
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

Expand All @@ -2147,10 +2083,6 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (*string, error) {
return nil, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

if wfState.isWithinStep {
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Recv within a step"))
}
Comment on lines -2150 to -2152
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lifted to pre-sysdb invocation


stepID := wfState.nextStepID()
sleepStepID := wfState.nextStepID() // We will use a sleep step to implement the timeout
destinationID := wfState.workflowID
Expand Down Expand Up @@ -2261,19 +2193,18 @@ loop:
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx)
// Use message_uuid so we delete exactly one row; created_at_epoch_ms can match multiple rows when inserts occur in the same millisecond.
query = fmt.Sprintf(`
WITH oldest_entry AS (
SELECT destination_uuid, topic, message, created_at_epoch_ms
FROM %s.notifications
WHERE destination_uuid = $1 AND topic = $2
ORDER BY created_at_epoch_ms ASC
LIMIT 1
)
DELETE FROM %s.notifications
WHERE destination_uuid = (SELECT destination_uuid FROM oldest_entry)
AND topic = (SELECT topic FROM oldest_entry)
AND created_at_epoch_ms = (SELECT created_at_epoch_ms FROM oldest_entry)
RETURNING message`, pgx.Identifier{s.schema}.Sanitize(), pgx.Identifier{s.schema}.Sanitize())
Comment on lines -2265 to -2276
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query can delete more than one message when several rows share the same created_at_epoch_ms value (i.e. they were inserted within the same millisecond). This was surfaced by 1) no longer running send() inside a transaction when it is called outside of a workflow and 2) a recent change to the Golang migration, where we now have:

created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now())::numeric * 1000)::bigint,

instead of

created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000.0)::bigint,

The first line converts the epoch extracted from now() (double precision, float8) to a numeric, which results in the *1000 multiplication being done in exact numeric arithmetic and being quite stable.

The second line (what we have in Python) does the multiplication in double precision, then converts to bigint, which can truncate the value. Because floating-point multiplication can (often) introduce errors, this meant more volatility in the truncation, which, I think, could have contributed to obscuring this bug.

WITH oldest_entry AS (
SELECT message_uuid, message
FROM %s.notifications
WHERE destination_uuid = $1 AND topic = $2
ORDER BY created_at_epoch_ms ASC
LIMIT 1
)
DELETE FROM %s.notifications
WHERE message_uuid = (SELECT message_uuid FROM oldest_entry)
RETURNING message`, pgx.Identifier{s.schema}.Sanitize(), pgx.Identifier{s.schema}.Sanitize())

var messageString *string
err = tx.QueryRow(ctx, query, destinationID, topic).Scan(&messageString)
Expand Down Expand Up @@ -2316,61 +2247,35 @@ loop:
type WorkflowSetEventInput struct {
Key string
Message any
tx pgx.Tx
}

func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error {
functionName := "DBOS.setEvent"

// Get workflow state from context
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
if !ok || wfState == nil {
return newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
return newStepExecutionError("", "DBOS.setEvent", fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
}

if _, ok := input.Message.(*string); !ok {
return fmt.Errorf("message must be a pointer to a string")
}

if wfState.isWithinStep {
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call SetEvent within a step"))
}

stepID := wfState.nextStepID()

startTime := time.Now()

tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx)

// Check if operation was already executed and do nothing if so
checkInput := checkOperationExecutionDBInput{
workflowID: wfState.workflowID,
stepID: stepID,
stepName: functionName,
tx: tx,
}
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
if err != nil {
return err
}
if recordedResult != nil {
// when hitting this case, recordedResult will be &{<nil> <nil>}
return nil
}

// input.Message is already encoded *string from the typed layer
// Insert or update the event using UPSERT
insertQuery := fmt.Sprintf(`INSERT INTO %s.workflow_events (workflow_uuid, key, value)
VALUES ($1, $2, $3)
ON CONFLICT (workflow_uuid, key)
DO UPDATE SET value = EXCLUDED.value`, pgx.Identifier{s.schema}.Sanitize())

_, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, input.Message)
var err error
if input.tx != nil {
_, err = input.tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, input.Message)
} else {
_, err = s.pool.Exec(ctx, insertQuery, wfState.workflowID, input.Key, input.Message)
}
if err != nil {
return fmt.Errorf("failed to insert/update workflow event: %w", err)
return fmt.Errorf("failed to insert event: %w", err)
}

// Record event in workflow_events_history
Expand All @@ -2379,35 +2284,12 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
ON CONFLICT (workflow_uuid, function_id, key)
DO UPDATE SET value = EXCLUDED.value`, pgx.Identifier{s.schema}.Sanitize())

_, err = tx.Exec(ctx, insertHistoryQuery, wfState.workflowID, stepID, input.Key, input.Message)
if err != nil {
return fmt.Errorf("failed to insert workflow event history: %w", err)
}

// Record the operation result
completedTime := time.Now()
recordInput := recordOperationResultDBInput{
workflowID: wfState.workflowID,
stepID: stepID,
stepName: functionName,
output: nil,
err: nil,
tx: tx,
startedAt: startTime,
completedAt: completedTime,
}

err = s.recordOperationResult(ctx, recordInput)
if err != nil {
return err
}

// Commit transaction
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
if input.tx != nil {
_, err = input.tx.Exec(ctx, insertHistoryQuery, wfState.workflowID, wfState.stepID, input.Key, input.Message)
} else {
_, err = s.pool.Exec(ctx, insertHistoryQuery, wfState.workflowID, wfState.stepID, input.Key, input.Message)
}

return nil
return err
}

func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (*string, error) {
Expand Down
Loading
Loading