diff --git a/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java index c139532f..5d08ae22 100644 --- a/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java @@ -190,8 +190,8 @@ Object recv( if (!checkedDBForSleep) { // Support OAOO sleep actualTimeout = - StepsDAO.sleep( - dataSource, workflowUuid, timeoutFunctionId, timeout, true, this.schema) + StepsDAO.durableSleepDuration( + dataSource, workflowUuid, timeoutFunctionId, timeout, this.schema) .toMillis(); checkedDBForSleep = true; targetTime = nowTime + actualTimeout; @@ -426,12 +426,11 @@ Object getEvent( // Consult DB - part of timeout may have expired if sleep is durable. if (callerCtx != null & !checkedDBForSleep) { actualTimeout = - StepsDAO.sleep( + StepsDAO.durableSleepDuration( dataSource, callerCtx.workflowId(), callerCtx.timeoutFunctionId(), timeout, - true, // skip_sleep this.schema) .toMillis(); targetTime = System.currentTimeMillis() + actualTimeout; diff --git a/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java b/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java index d2c3d084..99252960 100644 --- a/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java @@ -41,7 +41,7 @@ static void recordStepResultTxn( throw new IllegalStateException("Database is closed!"); } - try (Connection connection = dataSource.getConnection(); ) { + try (Connection connection = dataSource.getConnection()) { recordStepResultTxn(result, startTimeEpochMs, endTimeEpochMs, connection, schema); } DebugTriggers.debugTriggerPoint(DebugTriggers.DEBUG_TRIGGER_STEP_COMMIT); @@ -270,17 +270,23 @@ List listWorkflowSteps(String workflowId) throws SQLException { return steps; } - Duration sleep(String workflowUuid, int functionId, Duration duration, boolean skipSleep) - throws SQLException { - return StepsDAO.sleep(dataSource, workflowUuid, functionId, duration, skipSleep, this.schema); + void sleep(String workflowUuid, int functionId, Duration duration) throws SQLException { + var sleepDuration = + StepsDAO.durableSleepDuration(dataSource, workflowUuid, functionId, duration, this.schema); + logger.debug("Sleeping for duration {}", sleepDuration); + try { + Thread.sleep(sleepDuration.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Sleep was interrupted for workflow " + workflowUuid, e); + } } - static Duration sleep( + static Duration durableSleepDuration( HikariDataSource dataSource, String workflowUuid, int functionId, Duration duration, - boolean skipSleep, String schema) throws SQLException { @@ -299,8 +305,7 @@ static Duration sleep( checkStepExecutionTxn(workflowUuid, functionId, functionName, connection, schema); } - double endTime; - + long endTime; if (recordedOutput != null) { logger.debug( "Replaying sleep, workflow {}, id: {}, duration: {}", workflowUuid, functionId, duration); @@ -308,7 +313,7 @@ static Duration sleep( throw new IllegalStateException("No recorded timeout for sleep"); } Object[] dser = JSONUtil.deserializeToArray(recordedOutput.output()); - endTime = (Double) dser[0]; + endTime = (long) dser[0]; } else { logger.debug( "Running sleep, workflow {}, id: {}, duration: {}", workflowUuid, functionId, duration); @@ -324,26 +329,7 @@ static Duration sleep( } } - double currentTime = System.currentTimeMillis(); - double durationms = Math.max(0, endTime - currentTime); - - logger.debug( - "sleep, endTime {}, currentTime: {}, durationMS: {}, skip: {}", - endTime, - currentTime, - durationms, - skipSleep); - - if (!skipSleep) { - try { - logger.debug("Sleeping for duration {}", duration); - Thread.sleep((long) (durationms)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Sleep interrupted", e); - } - } - - return Duration.ofMillis((long) durationms); + var durationms = Math.max(0, endTime - System.currentTimeMillis()); + return Duration.ofMillis(durationms); } } diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index c1eb0a8e..655fa091 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -273,10 +273,10 @@ public Object getEvent( }); } - public Duration sleep(String workflowId, int functionId, Duration duration, boolean skipSleep) { - return DbRetry.call( + public void sleep(String workflowId, int functionId, Duration duration) { + DbRetry.run( () -> { - return stepsDAO.sleep(workflowId, functionId, duration, skipSleep); + stepsDAO.sleep(workflowId, functionId, duration); }); } diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index a116e953..e7db5afa 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -78,7 +78,6 @@ public class DBOSExecutor implements AutoCloseable { private SystemDatabase systemDatabase; private QueueService queueService; private SchedulerService schedulerService; - private RecoveryService recoveryService; private AdminServer adminServer; private Conductor conductor; private ExecutorService executorService; @@ -157,8 +156,19 @@ public void start( listener.dbosLaunched(); } - recoveryService = new RecoveryService(this, systemDatabase); - recoveryService.start(); + var recoveryTask = + new Runnable() { + @Override + public void run() { + try { + recoverPendingWorkflows(List.of(executorId())); + } catch (Throwable t) { + logger.error("Recovery task failed", t); + } + } + }; + + executorService.submit(recoveryTask); String conductorKey = config.conductorKey(); if (conductorKey != null) { @@ -201,9 +211,6 @@ public void close() { conductor = null; } - recoveryService.stop(); - recoveryService = null; - shutdownLifecycleListeners(); queueService.stop(); @@ -294,47 +301,48 @@ public Optional getQueue(String queueName) { String workflowId = Objects.requireNonNull(output.workflowId(), "workflowId must not be null"); String queue = output.queueName(); - logger.debug("Recovery executing workflow {}", workflowId); + logger.debug("recoverWorkflow {}", workflowId); if (queue != null) { boolean cleared = systemDatabase.clearQueueAssignment(workflowId); if (cleared) { + logger.debug( + "recoverWorkflow {} queue assignment {}", + workflowId, + cleared ? "cleared" : "not cleared"); return retrieveWorkflow(workflowId); } } return executeWorkflowById(workflowId, true, false); } - public List> recoverPendingWorkflows(List executorIDs) { - if (executorIDs == null) { - executorIDs = new ArrayList<>(List.of("local")); - } - + public List> recoverPendingWorkflows(List executorIds) { + Objects.requireNonNull(executorIds); String appVersion = appVersion(); List> handles = new ArrayList<>(); - for (String executorId : executorIDs) { + for (String executorId : executorIds) { List pendingWorkflows; try { - pendingWorkflows = systemDatabase.getPendingWorkflows(executorId, appVersion); + pendingWorkflows = systemDatabase.getPendingWorkflows(executorId, appVersion()); } catch (Exception e) { logger.error( - "Failed to get pending workflows for executor {} and application version {}", + "getPendingWorkflows failed: executor {}, application version {}", executorId, appVersion, e); return new ArrayList<>(); } - logger.debug( - "Recovering {} workflow(s) for executor {} and application version {}", + logger.info( + "Recovering {} workflows for executor {} app version {}", pendingWorkflows.size(), - executorId, - appVersion); - for (GetPendingWorkflowsOutput output : pendingWorkflows) { + executorId(), + appVersion()); + for (var output : pendingWorkflows) { try { handles.add(recoverWorkflow(output)); - } catch (Exception e) { - logger.warn("Recovery of workflow {} failed", output.workflowId(), e); + } catch (Throwable t) { + logger.error("Workflow {} recovery failed", output.workflowId(), t); } } } @@ -435,7 +443,8 @@ private T handleExistingResult(StepResult result, Strin throw new RuntimeException(t.getMessage(), t); } } else { - // Note that this shouldn't happen because the result is always wrapped in an array, making + // Note that this shouldn't happen because the result is always wrapped in an + // array, making // output not null. throw new IllegalStateException( String.format("Recorded output and error are both null for %s", functionName)); @@ -560,8 +569,7 @@ public void sleep(Duration duration) { throw new IllegalStateException("sleep() must be called from within a workflow"); } - systemDatabase.sleep( - context.getWorkflowId(), context.getAndIncrementFunctionId(), duration, false); + systemDatabase.sleep(context.getWorkflowId(), context.getAndIncrementFunctionId(), duration); } public WorkflowHandle resumeWorkflow(String workflowId) { diff --git a/transact/src/main/java/dev/dbos/transact/execution/QueueService.java b/transact/src/main/java/dev/dbos/transact/execution/QueueService.java index fd570d70..9ef7a819 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/QueueService.java +++ b/transact/src/main/java/dev/dbos/transact/execution/QueueService.java @@ -1,14 +1,17 @@ package dev.dbos.transact.execution; -import static java.lang.Math.max; -import static java.lang.Math.min; - import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.workflow.Queue; +import java.time.Duration; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,14 +20,11 @@ public class QueueService { private static final Logger logger = LoggerFactory.getLogger(QueueService.class); - private List queues; + private final AtomicReference scheduler = new AtomicReference<>(); + private final AtomicBoolean paused = new AtomicBoolean(false); private SystemDatabase systemDatabase; private DBOSExecutor dbosExecutor; - private volatile boolean running = false; - private volatile boolean paused = false; - private Thread workerThread; - private CountDownLatch shutdownLatch; private double speedup = 1.0; public QueueService(DBOSExecutor dbosExecutor, SystemDatabase systemDatabase) { @@ -36,146 +36,123 @@ public void setSpeedupForTest() { speedup = 0.01; } - @SuppressWarnings("deprecation") // Thread.currentThread().getId() - private void pollForWorkflows() { - logger.debug("PollQueuesThread started {}", Thread.currentThread().getId()); - - double pollingIntervalSec = 1.0; - double minPollingIntervalSec = 1.0; - double maxPollingIntervalSec = 120.0; - double randomSleepFactor = 0; - - try { - - while (running) { - - randomSleepFactor = (0.95 + ThreadLocalRandom.current().nextDouble(0.1)); - - try { - Thread.sleep((long) (randomSleepFactor * pollingIntervalSec * 1000 * speedup)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("QueuesPollThread interrupted while sleeping"); - running = false; - } - - if (!running) { - // check again after sleep - break; - } - - if (paused) { - continue; - } - - for (Queue queue : queues) { - try { - if (queue.partitionedEnabled()) { - var partitions = systemDatabase.getQueuePartitions(queue.name()); - for (var partition : partitions) { - var workflowIds = - systemDatabase.getAndStartQueuedWorkflows( - queue, dbosExecutor.executorId(), dbosExecutor.appVersion(), partition); - for (var workflowId : workflowIds) { - logger.debug( - "Starting workflow {} from partition {} of queue {}", - workflowId, - partition, - queue.name()); - dbosExecutor.executeWorkflowById(workflowId, false, true); - } - } - } else { - var workflowIds = - systemDatabase.getAndStartQueuedWorkflows( - queue, dbosExecutor.executorId(), dbosExecutor.appVersion(), null); - for (var workflowId : workflowIds) { - logger.debug("Starting workflow {} from queue {}", workflowId, queue.name()); - dbosExecutor.executeWorkflowById(workflowId, false, true); - } - } - } catch (Exception e) { - pollingIntervalSec = min(maxPollingIntervalSec, pollingIntervalSec * 2); - logger.error("Error executing queued workflow", e); - } - } + public synchronized void pause() { + paused.set(true); + } - pollingIntervalSec = max(minPollingIntervalSec, pollingIntervalSec * 0.9); - } + public synchronized void unpause() { + paused.set(false); + } - } finally { - shutdownLatch.countDown(); - logger.debug("QueuesPollThread {} has ended. Exiting", Thread.currentThread().getId()); + public void start(List queues) { + if (this.scheduler.get() == null) { + var scheduler = Executors.newScheduledThreadPool(4); + if (this.scheduler.compareAndSet(null, scheduler)) { + startQueueListeners(queues); + } } } - public synchronized void pause() { - this.paused = true; + public void stop() { + var scheduler = this.scheduler.getAndSet(null); + if (scheduler != null) { + var notRun = scheduler.shutdownNow(); + logger.debug("Shutting down queue service. Tasks not run {}", notRun.size()); + } } - public synchronized void unpause() { - this.paused = false; + public boolean isStopped() { + return this.scheduler.get() == null; } - public synchronized void start(List queues) { + private void startQueueListeners(List queues) { + logger.debug("startQueueListeners"); - this.queues = queues; + final var executorId = dbosExecutor.executorId(); + final var appVersion = dbosExecutor.appVersion(); + final Duration minPollingInterval = Duration.ofSeconds(1); + final Duration maxPollingInterval = Duration.ofSeconds(120); - if (running) { - logger.warn("QueuesPollThread is already running."); - return; - } + for (var _queue : queues) { - running = true; - shutdownLatch = new CountDownLatch(1); - workerThread = new Thread(this::pollForWorkflows, "QueuesPollThread"); - workerThread.setDaemon(true); - workerThread.start(); - logger.debug("QueuesPollThread started."); - } + var task = + new Runnable() { + final Queue queue = _queue; + Duration pollingInterval = Duration.ofSeconds(1); - public synchronized void stop() { - logger.debug("stop() called"); + public void schedule() { + var randomSleepFactor = 0.95 + ThreadLocalRandom.current().nextDouble(0.1); + var delayMs = (long) (randomSleepFactor * pollingInterval.toMillis() * speedup); + var localScheduler = scheduler.get(); + if (localScheduler != null) { + localScheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS); + } + } - if (!running) { - logger.warn("QueuesPollThread is not running."); - return; - } - running = false; - if (workerThread != null) { - try { - workerThread.join(100); - // Adding a latch so stop is absolute and there is no race condition for - // tests - shutdownLatch.await(); // timeout ? - if (workerThread.isAlive()) { - logger.warn( - "QueuePollThread did not stop gracefully. It might be stuck. Interrupting..."); - workerThread.interrupt(); // Interrupt if it's still alive after join - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); // Restore interrupt status - logger.warn("Interrupted QueuesPollThread", e); - } finally { - workerThread = null; - } - } + private void processPartition(String partition) { + var partitionLog = Objects.requireNonNullElse(partition, ""); + var workflowIds = + systemDatabase.getAndStartQueuedWorkflows( + queue, executorId, appVersion, partition); + if (workflowIds.size() > 0) { + logger.debug( + "Retrieved {} workflows from {} partition of queue {}", + workflowIds.size(), + partitionLog, + queue.name()); + } + for (var workflowId : workflowIds) { + logger.debug( + "Starting workflow {} from {} partition of queue {}", + workflowId, + partitionLog, + queue.name()); + dbosExecutor.executeWorkflowById(workflowId, false, true); + } + } - this.queues = null; - logger.debug("QueuePollThread stopped."); - } + @Override + public void run() { + // if scheduler service isn't running, the queue service was stopped so don't start + // the workflow or schedule the next execution + if (scheduler.get() == null) { + return; + } + + try { + if (paused.get()) { + pollingInterval = minPollingInterval; + return; + } + + if (queue.partitionedEnabled()) { + var partitions = systemDatabase.getQueuePartitions(queue.name()); + for (var partition : partitions) { + processPartition(partition); + } + } else { + processPartition(null); + } + + pollingInterval = Duration.ofMillis((long) (pollingInterval.toMillis() * 0.9)); + pollingInterval = + pollingInterval.compareTo(minPollingInterval) >= 0 + ? pollingInterval + : minPollingInterval; + } catch (Exception e) { + logger.error("Error executing queued workflow(s) for queue {}", queue.name(), e); + pollingInterval = pollingInterval.multipliedBy(2); + pollingInterval = + pollingInterval.compareTo(maxPollingInterval) <= 0 + ? pollingInterval + : maxPollingInterval; + } finally { + this.schedule(); + } + } + }; - public synchronized boolean isStopped() { - // If the workerThread reference is null, it implies it hasn't started or has - // been fully cleaned - // up. - if (workerThread == null) { - return true; + task.schedule(); } - // The most definitive check: if the latch has counted down to zero, the - // worker's run() method - // has completed. - // We also check !workerThread.isAlive() as a final confirmation. - return shutdownLatch != null && shutdownLatch.getCount() == 0 && !workerThread.isAlive(); } } diff --git a/transact/src/main/java/dev/dbos/transact/execution/RecoveryService.java b/transact/src/main/java/dev/dbos/transact/execution/RecoveryService.java deleted file mode 100644 index 37ee2ff4..00000000 --- a/transact/src/main/java/dev/dbos/transact/execution/RecoveryService.java +++ /dev/null @@ -1,128 +0,0 @@ -package dev.dbos.transact.execution; - -import dev.dbos.transact.database.SystemDatabase; -import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException; -import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RecoveryService { - - private final SystemDatabase systemDatabase; - private final DBOSExecutor dbosExecutor; - private static final Logger logger = LoggerFactory.getLogger(RecoveryService.class); - - private volatile boolean stopRequested = false; - private Thread recoveryThread; - - public RecoveryService(DBOSExecutor dbosExecutor, SystemDatabase systemDatabase) { - this.systemDatabase = systemDatabase; - this.dbosExecutor = dbosExecutor; - } - - /** - * Starts the background recovery thread for startup workflow recovery. This method will attempt - * to recover pending workflows in a separate thread. - */ - public void start() { - if (recoveryThread != null && recoveryThread.isAlive()) { - logger.warn("Recovery thread is already running"); - return; - } - - List workflows = - systemDatabase.getPendingWorkflows(dbosExecutor.executorId(), dbosExecutor.appVersion()); - - if (workflows.size() > 0) { - logger.info( - "Recovering {} workflows for application version {}", - workflows.size(), - dbosExecutor.appVersion()); - } else { - logger.info("No workflows to recover for application version {}", dbosExecutor.appVersion()); - } - - final List toRecover = workflows; - stopRequested = false; - recoveryThread = new Thread(() -> startupRecoveryThread(toRecover), "RecoveryService-Thread"); - recoveryThread.setDaemon(true); - recoveryThread.start(); - logger.debug("Recovery service started"); - } - - /** - * Stops the background recovery thread. This method will signal the thread to stop and wait for - * it to complete. - */ - public void stop() { - logger.debug("stop() called"); - - stopRequested = true; - if (recoveryThread != null) { - try { - recoveryThread.join(5000); // Wait up to 5 seconds for thread to stop - if (recoveryThread.isAlive()) { - logger.warn("Recovery thread did not stop within timeout"); - recoveryThread.interrupt(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("Interrupted while stopping recovery thread", e); - } - } - logger.debug("Recovery service stopped"); - } - - /** - * Background thread method that attempts to recover local pending workflows on startup. This - * method runs continuously until stop is requested or all workflows are recovered. - */ - private void startupRecoveryThread(List wToRecover) { - try { - List pendingWorkflows = new CopyOnWriteArrayList<>(wToRecover); - - logger.debug("Starting recovery thread {} ", pendingWorkflows.size()); - - while (!stopRequested && !pendingWorkflows.isEmpty()) { - try { - // Create a copy to iterate over to avoid concurrent modification - List currentPending = new ArrayList<>(pendingWorkflows); - - for (GetPendingWorkflowsOutput pendingWorkflow : currentPending) { - if (stopRequested) { - break; - } - dbosExecutor.recoverWorkflow(pendingWorkflow); - pendingWorkflows.remove(pendingWorkflow); - } - } catch (DBOSWorkflowFunctionNotFoundException e) { - logger.debug("Workflow function not found during recovery, retrying in 1 second", e); - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - logger.warn("Recovery thread interrupted during sleep"); - break; - } - } catch (Exception e) { - logger.error("Exception encountered when recovering workflows", e); - throw e; - } - } - - if (!stopRequested && pendingWorkflows.isEmpty()) { - logger.debug("All pending workflows recovered successfully"); - } - - } catch (Exception e) { - logger.error("Unexpected error during workflow recovery", e); - } finally { - logger.debug("Exiting recovery thread "); - } - } -} diff --git a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java index ce6111c7..462628e4 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java +++ b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import com.cronutils.model.Cron; import com.cronutils.model.CronType; @@ -29,14 +30,12 @@ public class SchedulerService implements DBOSLifecycleListener { - private ScheduledExecutorService scheduler; - private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class); private static final CronParser cronParser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.SPRING53)); - private volatile boolean stop = false; private final String schedulerQueueName; + private final AtomicReference scheduler = new AtomicReference<>(); public SchedulerService(String defSchedulerQueue) { this.schedulerQueueName = Objects.requireNonNull(defSchedulerQueue); @@ -61,52 +60,56 @@ public static void validateScheduledWorkflow(RegisteredWorkflow workflow) { } public void dbosLaunched() { - scheduler = Executors.newScheduledThreadPool(4); - startScheduledWorkflows(); - stop = false; + if (this.scheduler.get() == null) { + var scheduler = Executors.newScheduledThreadPool(4); + if (this.scheduler.compareAndSet(null, scheduler)) { + startScheduledWorkflows(); + } + } } public void dbosShutDown() { - stop = true; - List notRun = scheduler.shutdownNow(); - logger.debug("Shutting down scheduler service. Tasks not run {}", notRun.size()); - scheduler = null; + var scheduler = this.scheduler.getAndSet(null); + if (scheduler != null) { + List notRun = scheduler.shutdownNow(); + logger.debug("Shutting down scheduler service. Tasks not run {}", notRun.size()); + } } record ScheduledWorkflow( RegisteredWorkflow workflow, Cron cron, String queue, boolean ignoreMissed) {} - private ZonedDateTime getNextTime(ScheduledWorkflow swf) { - ZonedDateTime now = null; - if (swf.ignoreMissed()) { - now = ZonedDateTime.now().withNano(0); - } else { - var extstate = + private ZonedDateTime getLastTime(ScheduledWorkflow swf) { + if (!swf.ignoreMissed()) { + var state = DBOS.getExternalState( "DBOS.SchedulerService", swf.workflow().fullyQualifiedName(), "lastTime"); - if (extstate.isPresent()) { - now = ZonedDateTime.parse(extstate.get().value()); - } else { - now = ZonedDateTime.now(ZoneOffset.UTC).withNano(0); + if (state.isPresent()) { + return ZonedDateTime.parse(state.get().value()); } } - return now; + return ZonedDateTime.now(ZoneOffset.UTC).withNano(0); } private ZonedDateTime setLastTime(ScheduledWorkflow swf, ZonedDateTime lastTime) { - var nes = - new ExternalState( - "DBOS.SchedulerService", - swf.workflow().fullyQualifiedName(), - "lastTime", - lastTime.toString(), - null, - BigInteger.valueOf(lastTime.toInstant().toEpochMilli())); - var upstate = DBOS.upsertExternalState(nes); - return ZonedDateTime.parse(upstate.value()).plus(1, ChronoUnit.MILLIS); + if (swf.ignoreMissed()) { + return ZonedDateTime.now(ZoneOffset.UTC).withNano(0); + } + + var state = + DBOS.upsertExternalState( + new ExternalState( + "DBOS.SchedulerService", + swf.workflow().fullyQualifiedName(), + "lastTime", + lastTime.toString(), + null, + BigInteger.valueOf(lastTime.toInstant().toEpochMilli()))); + return ZonedDateTime.parse(state.value()).plus(1, ChronoUnit.MILLIS); } private void startScheduledWorkflows() { + logger.debug("startScheduledWorkflows"); var expectedParams = new Class[] {Instant.class, Instant.class}; @@ -150,69 +153,66 @@ private void startScheduledWorkflows() { } } - for (var swf : scheduledWorkflows) { - ExecutionTime executionTime = ExecutionTime.forCron(swf.cron()); - - var wf = swf.workflow(); - - // Kick off the first run (but only scheduled at the next proper time) - ZonedDateTime cnow = getNextTime(swf); + for (var _swf : scheduledWorkflows) { var task = new Runnable() { - ZonedDateTime curNow = cnow; + final ScheduledWorkflow swf = _swf; + final ExecutionTime executionTime = ExecutionTime.forCron(swf.cron()); + final String workflowName = swf.workflow().fullyQualifiedName(); + + ZonedDateTime nextTime = getLastTime(swf); + + public void schedule() { + executionTime + .nextExecution(nextTime) + .ifPresent( + nextTime -> { + this.nextTime = nextTime; + long initialDelayMs = + Duration.between(ZonedDateTime.now(ZoneOffset.UTC), nextTime) + .toMillis(); + // ensure scheduler hasn't been shutdown before scheduling + var localScheduler = scheduler.get(); + if (localScheduler != null) { + logger.debug("Scheduling {} @ {}", workflowName, nextTime); + + localScheduler.schedule( + this, initialDelayMs < 0 ? 0 : initialDelayMs, TimeUnit.MILLISECONDS); + } + }); + } @Override public void run() { + // if scheduler service isn't running, the scheduler service was shut down so don't + // start the workflow or schedule the next execution + if (scheduler.get() == null) { + return; + } - ZonedDateTime scheduledTime = curNow; + ZonedDateTime scheduledTime = nextTime; try { Object[] args = new Object[2]; args[0] = scheduledTime.toInstant(); args[1] = ZonedDateTime.now(ZoneOffset.UTC).toInstant(); - logger.debug("submitting to dbos Executor {}", wf.fullyQualifiedName()); + + logger.debug("starting scheduled workflow {} at {}", workflowName, args[1]); + String workflowId = - String.format("sched-%s-%s", wf.fullyQualifiedName(), scheduledTime.toString()); + String.format("sched-%s-%s", workflowName, scheduledTime.toString()); var options = new StartWorkflowOptions(workflowId).withQueue(swf.queue()); - DBOS.startWorkflow(wf, args, options); - + DBOS.startWorkflow(swf.workflow(), args, options); + nextTime = setLastTime(swf, scheduledTime); } catch (Exception e) { - logger.error("Scheduled task exception {}", wf.fullyQualifiedName(), e); - } - - if (!stop) { - ZonedDateTime now = - swf.ignoreMissed() - ? ZonedDateTime.now(ZoneOffset.UTC).withNano(0) - : setLastTime(swf, scheduledTime); - logger.debug( - "Scheduling the next execution {} {}", wf.fullyQualifiedName(), now.toString()); - executionTime - .nextExecution(now) - .ifPresent( - nextTime -> { - logger.debug("Next execution time {}", nextTime); - curNow = nextTime; - long delayMs = - Duration.between(ZonedDateTime.now(ZoneOffset.UTC), nextTime) - .toMillis(); - scheduler.schedule( - this, delayMs < 0 ? 0 : delayMs, TimeUnit.MILLISECONDS); - }); + logger.error("Scheduled task exception {}", workflowName, e); + } finally { + schedule(); } } }; - executionTime - .nextExecution(task.curNow) - .ifPresent( - nextTime -> { - task.curNow = nextTime; - long initialDelayMs = - Duration.between(ZonedDateTime.now(ZoneOffset.UTC), nextTime).toMillis(); - scheduler.schedule( - task, initialDelayMs < 0 ? 0 : initialDelayMs, TimeUnit.MILLISECONDS); - }); + task.schedule(); } } } diff --git a/transact/src/test/java/dev/dbos/transact/database/DisruptiveServiceImpl.java b/transact/src/test/java/dev/dbos/transact/database/DisruptiveServiceImpl.java index 38f8b3af..a88ae382 100644 --- a/transact/src/test/java/dev/dbos/transact/database/DisruptiveServiceImpl.java +++ b/transact/src/test/java/dev/dbos/transact/database/DisruptiveServiceImpl.java @@ -1,9 +1,11 @@ package dev.dbos.transact.database; import dev.dbos.transact.DBOS; -import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.workflow.Workflow; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; import java.time.Duration; import javax.sql.DataSource; @@ -34,7 +36,7 @@ public String dbLossBetweenSteps() { return "B"; }, "B"); - DBUtils.causeChaos(ds); + causeChaos(ds); DBOS.runStep( () -> { return "C"; @@ -51,31 +53,47 @@ public String dbLossBetweenSteps() { @Override @Workflow() public String runChildWf() { - DBUtils.causeChaos(ds); + causeChaos(ds); var wfh = DBOS.startWorkflow(() -> self.dbLossBetweenSteps()); - DBUtils.causeChaos(ds); + causeChaos(ds); return wfh.getResult(); } @Override @Workflow() public String wfPart1() { - DBUtils.causeChaos(ds); + causeChaos(ds); var r = (String) DBOS.recv("topic", Duration.ofSeconds(5)); - DBUtils.causeChaos(ds); + causeChaos(ds); DBOS.setEvent("key", "v1"); - DBUtils.causeChaos(ds); + causeChaos(ds); return "Part1" + r; } @Override @Workflow() public String wfPart2(String id1) { - DBUtils.causeChaos(ds); + causeChaos(ds); DBOS.send(id1, "hello1", "topic"); - DBUtils.causeChaos(ds); + causeChaos(ds); var v1 = (String) DBOS.getEvent(id1, "key", Duration.ofSeconds(5)); - DBUtils.causeChaos(ds); + causeChaos(ds); return "Part2" + v1; } + + static void causeChaos(DataSource ds) { + try (Connection conn = ds.getConnection(); + Statement st = conn.createStatement()) { + + st.execute( + """ + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE pid <> pg_backend_pid() + AND datname = current_database(); + """); + } catch (SQLException e) { + throw new RuntimeException("Could not cause chaos, credentials insufficient?", e); + } + } } diff --git a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java index b6184ddd..9468357b 100644 --- a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java @@ -107,7 +107,7 @@ public void testDedupeId() throws Exception { assertTrue(before.equals(after)); } - void logWorkflowDetails(String wfid, String name) { + void logWorkflowDetails(String wfid, String name) throws Exception { var wfstat = DBOS.getWorkflowStatus(wfid); System.out.println( String.format("Workflow (%s) ID: %s. Status %s", name, wfid, wfstat.status())); diff --git a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java index 38b17815..17db5e49 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java @@ -294,7 +294,7 @@ public void sleepRecovery() throws Exception { // let us set the state to PENDING and increase the sleep time DBUtils.setWorkflowState(dataSource, wfid, WorkflowState.PENDING.name()); long currenttime = System.currentTimeMillis(); - double newEndtime = (currenttime + 2000); + long newEndtime = (currenttime + 2000); String endTimeAsJson = JSONUtil.serialize(newEndtime); diff --git a/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java b/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java index 6c0f1c10..06b9091f 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java @@ -136,7 +136,8 @@ void recoverPendingWorkflows() throws Exception { setWorkflowStateToPending(dataSource); - List> pending = dbosExecutor.recoverPendingWorkflows(null); + List> pending = + dbosExecutor.recoverPendingWorkflows(List.of(dbosExecutor.executorId())); assertEquals(5, pending.size()); for (var handle : pending) { diff --git a/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java b/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java index cfacd928..bde91a92 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java @@ -60,7 +60,7 @@ void afterEachTest() throws Exception { } @Test - void directInvoke() { + void directInvoke() throws Exception { var result = proxy.simpleWorkflow(); assertEquals(localDate, result); @@ -79,7 +79,7 @@ void startWorkflow() throws Exception { validateWorkflow(); } - void validateWorkflow() { + void validateWorkflow() throws SQLException { var rows = DBUtils.getWorkflowRows(dataSource, schema); assertEquals(1, rows.size()); var row = rows.get(0); diff --git a/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java b/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java index 1d936bc5..0134987e 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java @@ -64,7 +64,7 @@ void afterEachTest() throws Exception { } @Test - void directInvoke() { + void directInvoke() throws Exception { var result = proxy.simpleWorkflow(); assertEquals(localDate, result); @@ -83,7 +83,7 @@ void directInvoke() { } @Test - void directInvokeSetWorkflowId() { + void directInvokeSetWorkflowId() throws Exception { String workflowId = "directInvokeSetWorkflowId"; try (var _o = new WorkflowOptions(workflowId).setContext()) { @@ -98,7 +98,7 @@ void directInvokeSetWorkflowId() { } @Test - void directInvokeSetTimeout() { + void directInvokeSetTimeout() throws Exception { var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); try (var _o = options.setContext()) { @@ -114,7 +114,7 @@ void directInvokeSetTimeout() { } @Test - void directInvokeSetZeroTimeout() { + void directInvokeSetZeroTimeout() throws Exception { var options = new WorkflowOptions().withTimeout(Timeout.none()); try (var _o = options.setContext()) { @@ -130,7 +130,7 @@ void directInvokeSetZeroTimeout() { } @Test - void directInvokeSetWorkflowIdAndTimeout() { + void directInvokeSetWorkflowIdAndTimeout() throws Exception { String workflowId = "directInvokeSetWorkflowIdAndTimeout"; var options = new WorkflowOptions(workflowId).withTimeout(Duration.ofSeconds(10)); @@ -148,7 +148,7 @@ void directInvokeSetWorkflowIdAndTimeout() { } @Test - void directInvokeTimeoutCancellation() { + void directInvokeTimeoutCancellation() throws Exception { var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(1)); try (var _o = options.setContext()) { @@ -164,7 +164,7 @@ void directInvokeTimeoutCancellation() { } @Test - void directInvokeTimeoutDeadline() { + void directInvokeTimeoutDeadline() throws Exception { var options = new WorkflowOptions().withDeadline(Instant.ofEpochMilli(System.currentTimeMillis() + 1000)); @@ -181,7 +181,7 @@ void directInvokeTimeoutDeadline() { } @Test - void directInvokeSetWorkflowIdTimeoutCancellation() { + void directInvokeSetWorkflowIdTimeoutCancellation() throws Exception { var workflowId = "directInvokeSetWorkflowIdTimeoutCancellation"; var options = new WorkflowOptions(workflowId).withTimeout(Duration.ofSeconds(1)); @@ -199,7 +199,7 @@ void directInvokeSetWorkflowIdTimeoutCancellation() { } @Test - void directInvokeParent() { + void directInvokeParent() throws Exception { var result = proxy.parentWorkflow(); assertEquals(localDate, result); @@ -266,7 +266,7 @@ void directInvokeParentStartWorkflow() throws Exception { } @Test - void directInvokeParentSetWorkflowId() { + void directInvokeParentSetWorkflowId() throws Exception { String workflowId = "directInvokeParentSetWorkflowId"; try (var _o = new WorkflowOptions(workflowId).setContext()) { @@ -283,7 +283,7 @@ void directInvokeParentSetWorkflowId() { } @Test - void directInvokeParentSetTimeout() { + void directInvokeParentSetTimeout() throws Exception { var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); try (var _o = options.setContext()) { @@ -304,7 +304,7 @@ void directInvokeParentSetTimeout() { } @Test - void directInvokeParentSetTimeoutParent() { + void directInvokeParentSetTimeoutParent() throws Exception { var result = proxy.parentSleepWorkflow(5L, 1); assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); @@ -320,7 +320,7 @@ void directInvokeParentSetTimeoutParent() { } @Test - void directInvokeParentSetTimeoutParent2() { + void directInvokeParentSetTimeoutParent2() throws Exception { var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); try (var _o = options.setContext()) { @@ -341,7 +341,7 @@ void directInvokeParentSetTimeoutParent2() { } @Test - void directInvokeParentSetTimeoutParent3() { + void directInvokeParentSetTimeoutParent3() throws Exception { var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); try (var _o = options.setContext()) { @@ -362,7 +362,7 @@ void directInvokeParentSetTimeoutParent3() { } @Test - void invokeWorkflowFromStepThrows() { + void invokeWorkflowFromStepThrows() throws Exception { var ise = assertThrows(IllegalStateException.class, () -> proxy.illegalWorkflow()); assertEquals("cannot invoke a workflow from a step", ise.getMessage()); @@ -382,7 +382,7 @@ void invokeWorkflowFromStepThrows() { } @Test - void directInvokeStep() { + void directInvokeStep() throws Exception { var result = proxy.stepWorkflow(); assertNotNull(result); @@ -400,40 +400,23 @@ void directInvokeStep() { assertEquals("nowStep", step.functionName()); } - // @Test - // void directInvokeParentSetParentTimeout() { - - // var options = new WorkflowOptions(Duration.ofSeconds(10)); - // try (var _o = options.setContext()) { - // var result = proxy.parentWorkflow(); - // assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); - // } - - // var table = DBUtils.dumpWfStatus(dataSource); - // assertEquals(2, table.size()); - // var row0 = table.get(0); - // var row1 = table.get(1); - // assertEquals(10000L, row0.get("workflow_timeout_ms")); - // assertEquals(10000L, row1.get("workflow_timeout_ms")); - // assertNotNull(row0.get("workflow_deadline_epoch_ms")); - // assertNotNull(row1.get("workflow_deadline_epoch_ms")); - // assertEquals(row0.get("workflow_deadline_epoch_ms"), row1.get("workflow_deadline_epoch_ms")); - // } - - // @Test - // void directInvokeParentTimeout() { - - // var impl = new HawkServiceImpl(); - // var proxy = - // - // dbos.Workflow().interfaceClass(HawkService.class).implementation(impl).build(); - // impl.setProxy(proxy); - - // DBOS.launch(); - - // var options = new WorkflowOptions(Duration.ofSeconds(1)); - // try (var _o = options.setContext()) { - // assertThrows(CancellationException.class, () -> proxy.parentSleepWorkflow(null, 10L)); - // } - // } + @Test + void directInvokeParentSetParentTimeout() throws Exception { + + var options = new WorkflowOptions().withTimeout(Duration.ofSeconds(10)); + try (var _o = options.setContext()) { + var result = proxy.parentWorkflow(); + assertEquals(LocalDate.now().format(DateTimeFormatter.ISO_DATE), result); + } + + var table = DBUtils.getWorkflowRows(dataSource); + assertEquals(2, table.size()); + var row0 = table.get(0); + var row1 = table.get(1); + assertEquals(10000L, row0.timeoutMs()); + assertEquals(10000L, row1.timeoutMs()); + assertNotNull(row0.deadlineEpochMs()); + assertNotNull(row1.deadlineEpochMs()); + assertEquals(row0.deadlineEpochMs(), row1.deadlineEpochMs()); + } } diff --git a/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java b/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java index 0f8ab811..800fa9fc 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java @@ -8,11 +8,19 @@ import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.exceptions.DBOSUnexpectedStepException; +import dev.dbos.transact.execution.RegisteredWorkflow; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.workflow.Workflow; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import javax.sql.DataSource; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; interface PatchService { int workflow(); @@ -75,9 +83,11 @@ public int workflow() { } } -// @org.junit.jupiter.api.Timeout(value = 2, unit = TimeUnit.MINUTES) +@org.junit.jupiter.api.Timeout(value = 2, unit = TimeUnit.MINUTES) public class PatchTest { + private static final Logger logger = LoggerFactory.getLogger(PatchTest.class); + @AfterEach void afterEachTest() throws Exception { DBOS.shutdown(); @@ -142,7 +152,7 @@ public void testPatch() throws Exception { // Verify an old execution runs the pre-patch workflow and does not store a patch marker queueService.pause(); var h2Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); - DBUtils.updateWorkflowName(dataSource, h2.workflowId(), h2Fork1.workflowId()); + updateWorkflowName(dataSource, h2.workflowId(), h2Fork1.workflowId()); queueService.unpause(); assertEquals(3, h2Fork1.getResult()); assertEquals(2, DBOS.listWorkflowSteps(h2Fork1.workflowId()).size()); @@ -170,7 +180,7 @@ public void testPatch() throws Exception { // Verify an execution containing the v2 patch marker recovers to v2 queueService.pause(); var h3Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); - DBUtils.updateWorkflowName(dataSource, h3.workflowId(), h3Fork2.workflowId()); + updateWorkflowName(dataSource, h3.workflowId(), h3Fork2.workflowId()); queueService.unpause(); assertEquals(5, h3Fork2.getResult()); steps = DBOS.listWorkflowSteps(h3Fork2.workflowId()); @@ -180,7 +190,7 @@ public void testPatch() throws Exception { // Verify a v1 execution recovers the pre-patch workflow and does not store a patch marker queueService.pause(); var h3Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); - DBUtils.updateWorkflowName(dataSource, h3.workflowId(), h3Fork1.workflowId()); + updateWorkflowName(dataSource, h3.workflowId(), h3Fork1.workflowId()); queueService.unpause(); assertEquals(3, h3Fork1.getResult()); assertEquals(2, DBOS.listWorkflowSteps(h3Fork1.workflowId()).size()); @@ -204,7 +214,7 @@ public void testPatch() throws Exception { // Verify an execution containing the v3 patch marker recovers to v3 queueService.pause(); var h4Fork3 = DBOS.forkWorkflow(h3.workflowId(), 3); - DBUtils.updateWorkflowName(dataSource, h4.workflowId(), h4Fork3.workflowId()); + updateWorkflowName(dataSource, h4.workflowId(), h4Fork3.workflowId()); queueService.unpause(); assertEquals(4, h4Fork3.getResult()); steps = DBOS.listWorkflowSteps(h4Fork3.workflowId()); @@ -214,14 +224,14 @@ public void testPatch() throws Exception { // Verify an execution containing the v2 patch marker cleanly fails queueService.pause(); var h4Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); - DBUtils.updateWorkflowName(dataSource, h4.workflowId(), h4Fork2.workflowId()); + updateWorkflowName(dataSource, h4.workflowId(), h4Fork2.workflowId()); queueService.unpause(); assertThrows(DBOSUnexpectedStepException.class, () -> h4Fork2.getResult()); // Verify a v1 execution cleanly fails queueService.pause(); var h4Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); - DBUtils.updateWorkflowName(dataSource, h4.workflowId(), h4Fork1.workflowId()); + updateWorkflowName(dataSource, h4.workflowId(), h4Fork1.workflowId()); queueService.unpause(); assertThrows(DBOSUnexpectedStepException.class, () -> h4Fork1.getResult()); @@ -239,7 +249,7 @@ public void testPatch() throws Exception { // Verify an execution from the deprecated patch works sans patch marker queueService.pause(); var h5Fork4 = DBOS.forkWorkflow(h4.workflowId(), 3); - DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork4.workflowId()); + updateWorkflowName(dataSource, h5.workflowId(), h5Fork4.workflowId()); queueService.unpause(); assertEquals(4, h5Fork4.getResult()); assertEquals(2, DBOS.listWorkflowSteps(h5Fork4.workflowId()).size()); @@ -247,25 +257,59 @@ public void testPatch() throws Exception { // Verify an execution containing the v3 patch marker cleanly fails queueService.pause(); var h5Fork3 = DBOS.forkWorkflow(h3.workflowId(), 3); - DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork3.workflowId()); + updateWorkflowName(dataSource, h5.workflowId(), h5Fork3.workflowId()); queueService.unpause(); assertThrows(DBOSUnexpectedStepException.class, () -> h5Fork3.getResult()); // Verify an execution containing the v2 patch marker cleanly fails queueService.pause(); var h5Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); - DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork2.workflowId()); + updateWorkflowName(dataSource, h5.workflowId(), h5Fork2.workflowId()); queueService.unpause(); assertThrows(DBOSUnexpectedStepException.class, () -> h5Fork2.getResult()); // Verify a v1 execution cleanly fails queueService.pause(); var h5Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); - DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork1.workflowId()); + updateWorkflowName(dataSource, h5.workflowId(), h5Fork1.workflowId()); queueService.unpause(); assertThrows(DBOSUnexpectedStepException.class, () -> h5Fork1.getResult()); } + void updateWorkflowName(DataSource dataSource, String sourceId, String destinationId) + throws SQLException { + + var row = DBUtils.getWorkflowRow(dataSource, sourceId); + if (row == null) { + throw new RuntimeException("Source workflow %s not found".formatted(sourceId)); + } + + logger.info( + "updateWorkflowName {} workflow to {}", + destinationId, + RegisteredWorkflow.fullyQualifiedName(row.className(), row.instanceName(), row.name())); + + var sql = + """ + UPDATE %s.workflow_status + SET name = ?, class_name = ?, config_name = ? + WHERE workflow_uuid = ? + """ + .formatted(SystemDatabase.sanitizeSchema(null)); + + try (var conn = dataSource.getConnection(); + var ps = conn.prepareStatement(sql)) { + ps.setString(1, row.name()); + ps.setString(2, row.className()); + ps.setString(3, row.instanceName()); + ps.setString(4, destinationId); + + if (ps.executeUpdate() == 0) { + logger.warn("updateWorkflowName {} workflow updated 0 rows", destinationId); + } + } + } + @Test public void patchThrowsNotConfigured() throws Exception { var dbosConfig = diff --git a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java index ee4d3dac..ae9c2983 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java +++ b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java @@ -208,11 +208,12 @@ public static Connection getConnection(DBOSConfig config) throws SQLException { return DriverManager.getConnection(config.databaseUrl(), config.dbUser(), config.dbPassword()); } - public static List getWorkflowRows(DataSource ds) { + public static List getWorkflowRows(DataSource ds) throws SQLException { return getWorkflowRows(ds, null); } - public static List getWorkflowRows(DataSource ds, String schema) { + public static List getWorkflowRows(DataSource ds, String schema) + throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); String sql = "SELECT * FROM %s.workflow_status ORDER BY created_at".formatted(schema); try (var conn = ds.getConnection(); @@ -223,16 +224,16 @@ public static List getWorkflowRows(DataSource ds, String sche rows.add(new WorkflowStatusRow(rs)); } return rows; - } catch (SQLException e) { - throw new RuntimeException(e); } } - public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId) { + public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId) + throws SQLException { return getWorkflowRow(ds, workflowId, null); } - public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId, String schema) { + public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId, String schema) + throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); var sql = "SELECT * FROM %s.workflow_status WHERE workflow_uuid = ?".formatted(schema); try (var conn = ds.getConnection(); @@ -245,17 +246,16 @@ public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId, return null; } } - } catch (SQLException e) { - throw new RuntimeException(e); } } - public static List getStepRows(DataSource ds, String workflowId) { + public static List getStepRows(DataSource ds, String workflowId) + throws SQLException { return getStepRows(ds, workflowId, null); } public static List getStepRows( - DataSource ds, String workflowId, String schema) { + DataSource ds, String workflowId, String schema) throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); var sql = "SELECT * FROM %s.operation_outputs WHERE workflow_uuid = ? ORDER BY function_id" @@ -272,18 +272,18 @@ public static List getStepRows( } return rows; } - } catch (SQLException e) { - throw new RuntimeException(e); } } public record Event(String key, String value) {} - public static List getWorkflowEvents(DataSource ds, String workflowId) { + public static List getWorkflowEvents(DataSource ds, String workflowId) + throws SQLException { return getWorkflowEvents(ds, workflowId, null); } - public static List getWorkflowEvents(DataSource ds, String workflowId, String schema) { + public static List getWorkflowEvents(DataSource ds, String workflowId, String schema) + throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); try (var conn = ds.getConnection(); ) { var stmt = @@ -300,19 +300,18 @@ public static List getWorkflowEvents(DataSource ds, String workflowId, St } return rows; - } catch (SQLException e) { - throw new RuntimeException(e); } } public record EventHistory(int stepId, String key, String value) {} - public static List getWorkflowEventHistory(DataSource ds, String workflowId) { + public static List getWorkflowEventHistory(DataSource ds, String workflowId) + throws SQLException { return getWorkflowEventHistory(ds, workflowId, null); } public static List getWorkflowEventHistory( - DataSource ds, String workflowId, String schema) { + DataSource ds, String workflowId, String schema) throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); try (var conn = ds.getConnection(); ) { var stmt = @@ -330,32 +329,14 @@ public static List getWorkflowEventHistory( } return rows; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public static void causeChaos(DataSource ds) { - try (Connection conn = ds.getConnection(); - Statement st = conn.createStatement()) { - - st.execute( - """ - SELECT pg_terminate_backend(pid) - FROM pg_stat_activity - WHERE pid <> pg_backend_pid() - AND datname = current_database(); - """); - } catch (SQLException e) { - throw new RuntimeException("Could not cause chaos, credentials insufficient?", e); } } - public static boolean queueEntriesCleanedUp(DataSource ds) { + public static boolean queueEntriesCleanedUp(DataSource ds) throws SQLException { return queueEntriesCleanedUp(ds, null); } - public static boolean queueEntriesCleanedUp(DataSource ds, String schema) { + public static boolean queueEntriesCleanedUp(DataSource ds, String schema) throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); var sql = """ @@ -381,44 +362,8 @@ AND status IN ('ENQUEUED', 'PENDING') break; } } - } catch (SQLException e) { - throw new RuntimeException(e); } } return success; } - - public static void updateWorkflowName(DataSource ds, String sourceId, String destinationId) { - updateWorkflowName(ds, sourceId, destinationId, null); - } - - public static void updateWorkflowName( - DataSource ds, String sourceId, String destinationId, String schema) { - var row = getWorkflowRow(ds, sourceId, schema); - if (row == null) { - throw new RuntimeException("Workflow %s not found".formatted(sourceId)); - } - - schema = SystemDatabase.sanitizeSchema(schema); - - var sql = - """ - UPDATE %s.workflow_status - SET name = ?, class_name = ?, config_name = ? - WHERE workflow_uuid = ? - """ - .formatted(schema); - try (var conn = ds.getConnection(); - var ps = conn.prepareStatement(sql)) { - - ps.setString(1, row.name()); - ps.setString(2, row.className()); - ps.setString(3, row.instanceName()); - ps.setString(4, destinationId); - - ps.executeUpdate(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } }