Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ Object recv(
if (!checkedDBForSleep) {
// Support OAOO sleep
actualTimeout =
StepsDAO.sleep(
dataSource, workflowUuid, timeoutFunctionId, timeout, true, this.schema)
StepsDAO.durableSleepDuration(
dataSource, workflowUuid, timeoutFunctionId, timeout, this.schema)
.toMillis();
checkedDBForSleep = true;
targetTime = nowTime + actualTimeout;
Expand Down Expand Up @@ -426,12 +426,11 @@ Object getEvent(
// Consult DB - part of timeout may have expired if sleep is durable.
if (callerCtx != null & !checkedDBForSleep) {
actualTimeout =
StepsDAO.sleep(
StepsDAO.durableSleepDuration(
dataSource,
callerCtx.workflowId(),
callerCtx.timeoutFunctionId(),
timeout,
true, // skip_sleep
this.schema)
.toMillis();
targetTime = System.currentTimeMillis() + actualTimeout;
Expand Down
46 changes: 16 additions & 30 deletions transact/src/main/java/dev/dbos/transact/database/StepsDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void recordStepResultTxn(
throw new IllegalStateException("Database is closed!");
}

try (Connection connection = dataSource.getConnection(); ) {
try (Connection connection = dataSource.getConnection()) {
recordStepResultTxn(result, startTimeEpochMs, endTimeEpochMs, connection, schema);
}
DebugTriggers.debugTriggerPoint(DebugTriggers.DEBUG_TRIGGER_STEP_COMMIT);
Expand Down Expand Up @@ -270,17 +270,23 @@ List<StepInfo> listWorkflowSteps(String workflowId) throws SQLException {
return steps;
}

Duration sleep(String workflowUuid, int functionId, Duration duration, boolean skipSleep)
throws SQLException {
return StepsDAO.sleep(dataSource, workflowUuid, functionId, duration, skipSleep, this.schema);
void sleep(String workflowUuid, int functionId, Duration duration) throws SQLException {
var sleepDuration =
StepsDAO.durableSleepDuration(dataSource, workflowUuid, functionId, duration, this.schema);
logger.debug("Sleeping for duration {}", sleepDuration);
try {
Thread.sleep(sleepDuration.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Sleep was interrupted for workflow " + workflowUuid, e);
}
}

static Duration sleep(
static Duration durableSleepDuration(
HikariDataSource dataSource,
String workflowUuid,
int functionId,
Duration duration,
boolean skipSleep,
String schema)
throws SQLException {

Expand All @@ -299,16 +305,15 @@ static Duration sleep(
checkStepExecutionTxn(workflowUuid, functionId, functionName, connection, schema);
}

double endTime;

long endTime;
if (recordedOutput != null) {
logger.debug(
"Replaying sleep, workflow {}, id: {}, duration: {}", workflowUuid, functionId, duration);
if (recordedOutput.output() == null) {
throw new IllegalStateException("No recorded timeout for sleep");
}
Object[] dser = JSONUtil.deserializeToArray(recordedOutput.output());
endTime = (Double) dser[0];
endTime = (long) dser[0];
} else {
logger.debug(
"Running sleep, workflow {}, id: {}, duration: {}", workflowUuid, functionId, duration);
Expand All @@ -324,26 +329,7 @@ static Duration sleep(
}
}

double currentTime = System.currentTimeMillis();
double durationms = Math.max(0, endTime - currentTime);

logger.debug(
"sleep, endTime {}, currentTime: {}, durationMS: {}, skip: {}",
endTime,
currentTime,
durationms,
skipSleep);

if (!skipSleep) {
try {
logger.debug("Sleeping for duration {}", duration);
Thread.sleep((long) (durationms));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Sleep interrupted", e);
}
}

return Duration.ofMillis((long) durationms);
var durationms = Math.max(0, endTime - System.currentTimeMillis());
return Duration.ofMillis(durationms);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ public Object getEvent(
});
}

public Duration sleep(String workflowId, int functionId, Duration duration, boolean skipSleep) {
return DbRetry.call(
public void sleep(String workflowId, int functionId, Duration duration) {
DbRetry.run(
() -> {
return stepsDAO.sleep(workflowId, functionId, duration, skipSleep);
stepsDAO.sleep(workflowId, functionId, duration);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public class DBOSExecutor implements AutoCloseable {
private SystemDatabase systemDatabase;
private QueueService queueService;
private SchedulerService schedulerService;
private RecoveryService recoveryService;
private AdminServer adminServer;
private Conductor conductor;
private ExecutorService executorService;
Expand Down Expand Up @@ -157,8 +156,19 @@ public void start(
listener.dbosLaunched();
}

recoveryService = new RecoveryService(this, systemDatabase);
recoveryService.start();
var recoveryTask =
new Runnable() {
@Override
public void run() {
try {
recoverPendingWorkflows(List.of(executorId()));
} catch (Throwable t) {
logger.error("Recovery task failed", t);
}
}
};

executorService.submit(recoveryTask);

String conductorKey = config.conductorKey();
if (conductorKey != null) {
Expand Down Expand Up @@ -201,9 +211,6 @@ public void close() {
conductor = null;
}

recoveryService.stop();
recoveryService = null;

shutdownLifecycleListeners();

queueService.stop();
Expand Down Expand Up @@ -294,47 +301,48 @@ public Optional<Queue> getQueue(String queueName) {
String workflowId = Objects.requireNonNull(output.workflowId(), "workflowId must not be null");
String queue = output.queueName();

logger.debug("Recovery executing workflow {}", workflowId);
logger.debug("recoverWorkflow {}", workflowId);

if (queue != null) {
boolean cleared = systemDatabase.clearQueueAssignment(workflowId);
if (cleared) {
logger.debug(
"recoverWorkflow {} queue assignment {}",
workflowId,
cleared ? "cleared" : "not cleared");
return retrieveWorkflow(workflowId);
}
}
return executeWorkflowById(workflowId, true, false);
}

public List<WorkflowHandle<?, ?>> recoverPendingWorkflows(List<String> executorIDs) {
if (executorIDs == null) {
executorIDs = new ArrayList<>(List.of("local"));
}

public List<WorkflowHandle<?, ?>> recoverPendingWorkflows(List<String> executorIds) {
Objects.requireNonNull(executorIds);
String appVersion = appVersion();

List<WorkflowHandle<?, ?>> handles = new ArrayList<>();
for (String executorId : executorIDs) {
for (String executorId : executorIds) {
List<GetPendingWorkflowsOutput> pendingWorkflows;
try {
pendingWorkflows = systemDatabase.getPendingWorkflows(executorId, appVersion);
pendingWorkflows = systemDatabase.getPendingWorkflows(executorId, appVersion());
} catch (Exception e) {
logger.error(
"Failed to get pending workflows for executor {} and application version {}",
"getPendingWorkflows failed: executor {}, application version {}",
executorId,
appVersion,
e);
return new ArrayList<>();
}
logger.debug(
"Recovering {} workflow(s) for executor {} and application version {}",
logger.info(
"Recovering {} workflows for executor {} app version {}",
pendingWorkflows.size(),
executorId,
appVersion);
for (GetPendingWorkflowsOutput output : pendingWorkflows) {
executorId(),
appVersion());
for (var output : pendingWorkflows) {
try {
handles.add(recoverWorkflow(output));
} catch (Exception e) {
logger.warn("Recovery of workflow {} failed", output.workflowId(), e);
} catch (Throwable t) {
logger.error("Workflow {} recovery failed", output.workflowId(), t);
}
}
}
Expand Down Expand Up @@ -435,7 +443,8 @@ private <T, E extends Exception> T handleExistingResult(StepResult result, Strin
throw new RuntimeException(t.getMessage(), t);
}
} else {
// Note that this shouldn't happen because the result is always wrapped in an array, making
// Note that this shouldn't happen because the result is always wrapped in an
// array, making
// output not null.
throw new IllegalStateException(
String.format("Recorded output and error are both null for %s", functionName));
Expand Down Expand Up @@ -560,8 +569,7 @@ public void sleep(Duration duration) {
throw new IllegalStateException("sleep() must be called from within a workflow");
}

systemDatabase.sleep(
context.getWorkflowId(), context.getAndIncrementFunctionId(), duration, false);
systemDatabase.sleep(context.getWorkflowId(), context.getAndIncrementFunctionId(), duration);
}

public <T, E extends Exception> WorkflowHandle<T, E> resumeWorkflow(String workflowId) {
Expand Down
Loading
Loading