diff --git a/transact/build.gradle.kts b/transact/build.gradle.kts index 34032a30..a33e6202 100644 --- a/transact/build.gradle.kts +++ b/transact/build.gradle.kts @@ -31,6 +31,8 @@ dependencies { implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0") implementation("com.cronutils:cron-utils:9.2.1") // cron for scheduled wf + compileOnly("org.jspecify:jspecify:1.0.0") + testImplementation(platform("org.junit:junit-bom:5.12.1")) testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit-pioneer:junit-pioneer:2.3.0") diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 1865d0a5..eb68560d 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +48,7 @@ private DBOS() {} private static final Logger logger = LoggerFactory.getLogger(DBOS.class); private static final String version = loadVersionFromResources(); - private static String loadVersionFromResources() { + private static @Nullable String loadVersionFromResources() { final String PROPERTIES_FILE = "/dev/dbos/transact/app.properties"; final String VERSION_KEY = "app.version"; Properties props = new Properties(); @@ -69,7 +71,7 @@ private static String loadVersionFromResources() { } } - public static String version() { + public static @Nullable String version() { return version; } @@ -89,7 +91,9 @@ private Instance() { } private void registerClassWorkflows( - Class interfaceClass, Object implementation, String instanceName) { + @NonNull Class interfaceClass, + @NonNull Object implementation, + @Nullable String instanceName) { Objects.requireNonNull(interfaceClass, "interfaceClass must not be null"); Objects.requireNonNull(implementation, "implementation must not be null"); instanceName = Objects.requireNonNullElse(instanceName, ""); @@ -113,8 +117,12 @@ private void registerClassWorkflows( } } - private String registerWorkflowMethod( - Workflow wfTag, Object target, String className, String instanceName, Method method) { + private @NonNull String registerWorkflowMethod( + @NonNull Workflow wfTag, + @NonNull Object target, + @NonNull String className, + @NonNull String instanceName, + @NonNull Method method) { if (dbosExecutor.get() != null) { throw new IllegalStateException("Cannot register workflow after DBOS is launched"); } @@ -125,7 +133,7 @@ private String registerWorkflowMethod( return name; } - void registerLifecycleListener(DBOSLifecycleListener l) { + void registerLifecycleListener(@NonNull DBOSLifecycleListener l) { if (dbosExecutor.get() != null) { throw new IllegalStateException( "Cannot register lifecycle listener after DBOS is launched"); @@ -134,7 +142,7 @@ void registerLifecycleListener(DBOSLifecycleListener l) { lifecycleRegistry.add(l); } - void registerQueue(Queue queue) { + void registerQueue(@NonNull Queue queue) { if (dbosExecutor.get() != null) { throw new IllegalStateException("Cannot build a queue after DBOS is launched"); } @@ -142,11 +150,13 @@ void registerQueue(Queue queue) { queueRegistry.register(queue); } - public T registerWorkflows(Class interfaceClass, T implementation) { + public @NonNull T registerWorkflows( + @NonNull Class interfaceClass, @NonNull T implementation) { return registerWorkflows(interfaceClass, implementation, ""); } - public T registerWorkflows(Class interfaceClass, T implementation, String instanceName) { + public @NonNull T registerWorkflows( + @NonNull Class interfaceClass, @NonNull T implementation, @NonNull String instanceName) { registerClassWorkflows(interfaceClass, implementation, instanceName); return DBOSInvocationHandler.createProxy( @@ -169,11 +179,11 @@ void clearRegistry() { } // package private methods for test purposes - DBOSExecutor getDbosExecutor() { + @Nullable DBOSExecutor getDbosExecutor() { return dbosExecutor.get(); } - public void setConfig(DBOSConfig config) { + public void setConfig(@NonNull DBOSConfig config) { if (this.config != null) { throw new IllegalStateException("DBOS has already been configured"); } @@ -230,7 +240,8 @@ public void shutdown() { * @return A proxy, with interface {@literal }, that provides durability for the workflow * functions */ - public static T registerWorkflows(Class interfaceClass, T implementation) { + public static @NonNull T registerWorkflows( + @NonNull Class interfaceClass, @NonNull T implementation) { return ensureInstance().registerWorkflows(interfaceClass, implementation, ""); } @@ -245,8 +256,8 @@ public static T registerWorkflows(Class interfaceClass, T implementation) * @return A proxy, with interface {@literal }, that provides durability for the workflow * functions */ - public static T registerWorkflows( - Class interfaceClass, T implementation, String instanceName) { + public static @NonNull T registerWorkflows( + @NonNull Class interfaceClass, @NonNull T implementation, @NonNull String instanceName) { return ensureInstance().registerWorkflows(interfaceClass, implementation, instanceName); } @@ -257,7 +268,7 @@ public static T registerWorkflows( * @param queue `Queue` to register * @return input queue */ - public static Queue registerQueue(Queue queue) { + public static @NonNull Queue registerQueue(@NonNull Queue queue) { ensureInstance().registerQueue(queue); return queue; } @@ -267,7 +278,7 @@ public static Queue registerQueue(Queue queue) { * * @param listener */ - public static void registerLifecycleListener(DBOSLifecycleListener listener) { + public static void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) { ensureInstance().registerLifecycleListener(listener); } @@ -316,9 +327,9 @@ public static void shutdown() { if (globalInstance != null) globalInstance.shutdown(); } - private static Instance globalInstance = null; + private static @Nullable Instance globalInstance = null; - public static Instance instance() { + public static @Nullable Instance instance() { return globalInstance; } @@ -346,7 +357,7 @@ static DBOSExecutor executor(String caller) { * * @return Current workflow ID */ - public static String workflowId() { + public static @Nullable String workflowId() { return DBOSContext.workflowId(); } @@ -355,7 +366,7 @@ public static String workflowId() { * * @return Current step ID number */ - public static Integer stepId() { + public static @Nullable Integer stepId() { return DBOSContext.stepId(); } @@ -379,7 +390,7 @@ public static boolean inStep() { * @param queueName Name of the queue * @return Queue definition for given `queueName` */ - public static Optional getQueue(String queueName) { + public static @NonNull Optional getQueue(@NonNull String queueName) { return executor("getQueue").getQueue(queueName); } @@ -389,7 +400,7 @@ public static Optional getQueue(String queueName) { * * @param duration amount of time to sleep */ - public static void sleep(Duration duration) { + public static void sleep(@NonNull Duration duration) { if (!inWorkflow()) { try { Thread.sleep(duration.toMillis()); @@ -416,8 +427,8 @@ public static void sleep(Duration duration) { * @param options Start workflow options * @return A handle to the enqueued or running workflow */ - public static WorkflowHandle startWorkflow( - ThrowingSupplier supplier, StartWorkflowOptions options) { + public static @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingSupplier supplier, @NonNull StartWorkflowOptions options) { return executor("startWorkflow").startWorkflow(supplier, options); } @@ -429,8 +440,8 @@ public static WorkflowHandle startWorkflow( * @param supplier A lambda that calls exactly one workflow function * @return A handle to the enqueued or running workflow */ - public static WorkflowHandle startWorkflow( - ThrowingSupplier supplier) { + public static @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingSupplier supplier) { return startWorkflow(supplier, new StartWorkflowOptions()); } @@ -442,8 +453,8 @@ public static WorkflowHandle startWorkflow( * @param options Start workflow options * @return A handle to the enqueued or running workflow */ - public static WorkflowHandle startWorkflow( - ThrowingRunnable runnable, StartWorkflowOptions options) { + public static @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingRunnable runnable, @NonNull StartWorkflowOptions options) { return startWorkflow( () -> { runnable.execute(); @@ -459,8 +470,8 @@ public static WorkflowHandle startWorkflow( * @param runnable A lambda that calls exactly one workflow function * @return A handle to the enqueued or running workflow */ - public static WorkflowHandle startWorkflow( - ThrowingRunnable runnable) { + public static @NonNull WorkflowHandle startWorkflow( + @NonNull ThrowingRunnable runnable) { return startWorkflow(runnable, new StartWorkflowOptions()); } @@ -473,7 +484,7 @@ public static WorkflowHandle startWorkflow( * @return Return value of the workflow * @throws E if the workflow threw an exception */ - public static T getResult(String workflowId) throws E { + public static T getResult(@NonNull String workflowId) throws E { return executor("getResult").getResult(workflowId); } @@ -483,7 +494,7 @@ public static T getResult(String workflowId) throws E { * @param workflowId ID of the workflow to query * @return Current workflow status for the provided workflowId, or null. */ - public static WorkflowStatus getWorkflowStatus(String workflowId) { + public static @Nullable WorkflowStatus getWorkflowStatus(@NonNull String workflowId) { return executor("getWorkflowStatus").getWorkflowStatus(workflowId); } @@ -496,7 +507,10 @@ public static WorkflowStatus getWorkflowStatus(String workflowId) { * @param idempotencyKey optional idempotency key for exactly-once send */ public static void send( - String destinationId, Object message, String topic, String idempotencyKey) { + @NonNull String destinationId, + @NonNull Object message, + @NonNull String topic, + @Nullable String idempotencyKey) { executor("send") .send(destinationId, message, topic, instance().internalWorkflowsService, idempotencyKey); } @@ -508,7 +522,8 @@ public static void send( * @param message message to be sent * @param topic topic to which the message is send */ - public static void send(String destinationId, Object message, String topic) { + public static void send( + @NonNull String destinationId, @NonNull Object message, @NonNull String topic) { DBOS.send(destinationId, message, topic, null); } @@ -519,7 +534,7 @@ public static void send(String destinationId, Object message, String topic) { * @param timeout duration after which the call times out * @return the message if there is one or else null */ - public static Object recv(String topic, Duration timeout) { + public static @Nullable Object recv(@NonNull String topic, @NonNull Duration timeout) { return executor("recv").recv(topic, timeout); } @@ -529,7 +544,7 @@ public static Object recv(String topic, Duration timeout) { * @param key identifier for published data * @param value data that is published */ - public static void setEvent(String key, Object value) { + public static void setEvent(@NonNull String key, @NonNull Object value) { executor("setEvent").setEvent(key, value); } @@ -541,7 +556,8 @@ public static void setEvent(String key, Object value) { * @param timeout time to wait for data before timing out * @return the published value or null */ - public static Object getEvent(String workflowId, String key, Duration timeout) { + public static @Nullable Object getEvent( + @NonNull String workflowId, @NonNull String key, @NonNull Duration timeout) { logger.debug("Received getEvent for {} {}", workflowId, key); return executor("getEvent").getEvent(workflowId, key, timeout); @@ -556,7 +572,7 @@ public static Object getEvent(String workflowId, String key, Duration timeout) { * @throws E */ public static T runStep( - ThrowingSupplier stepfunc, StepOptions opts) throws E { + @NonNull ThrowingSupplier stepfunc, @NonNull StepOptions opts) throws E { return executor("runStep").runStepInternal(stepfunc, opts, null); } @@ -569,8 +585,8 @@ public static T runStep( * @param name name of the step, for tracing and to record in the system database * @throws E */ - public static T runStep(ThrowingSupplier stepfunc, String name) - throws E { + public static T runStep( + @NonNull ThrowingSupplier stepfunc, @NonNull String name) throws E { return executor("runStep").runStepInternal(stepfunc, new StepOptions(name), null); } @@ -583,8 +599,8 @@ public static T runStep(ThrowingSupplier stepfunc * @param opts step name, and retry options for running the step * @throws E */ - public static void runStep(ThrowingRunnable stepfunc, StepOptions opts) - throws E { + public static void runStep( + @NonNull ThrowingRunnable stepfunc, @NonNull StepOptions opts) throws E { executor("runStep") .runStepInternal( () -> { @@ -603,8 +619,8 @@ public static void runStep(ThrowingRunnable stepfunc, S * @param name Name of the step, for tracing and recording in the system database * @throws E */ - public static void runStep(ThrowingRunnable stepfunc, String name) - throws E { + public static void runStep( + @NonNull ThrowingRunnable stepfunc, @NonNull String name) throws E { runStep(stepfunc, new StepOptions(name)); } @@ -616,7 +632,8 @@ public static void runStep(ThrowingRunnable stepfunc, S * @param workflowId id of the workflow * @return A handle to the workflow */ - public static WorkflowHandle resumeWorkflow(String workflowId) { + public static @NonNull WorkflowHandle resumeWorkflow( + @NonNull String workflowId) { return executor("resumeWorkflow").resumeWorkflow(workflowId); } @@ -627,7 +644,7 @@ public static WorkflowHandle resumeWorkflow(Strin * * @param workflowId ID of the workflow to cancel */ - public static void cancelWorkflow(String workflowId) { + public static void cancelWorkflow(@NonNull String workflowId) { executor("cancelWorkflow").cancelWorkflow(workflowId); } @@ -642,8 +659,8 @@ public static void cancelWorkflow(String workflowId) { * @param options {@link ForkOptions} containing forkedWorkflowId, applicationVersion, timeout * @return handle to the workflow */ - public static WorkflowHandle forkWorkflow( - String workflowId, int startStep, ForkOptions options) { + public static @NonNull WorkflowHandle forkWorkflow( + @NonNull String workflowId, int startStep, @NonNull ForkOptions options) { return executor("forkWorkflow").forkWorkflow(workflowId, startStep, options); } @@ -657,8 +674,8 @@ public static WorkflowHandle forkWorkflow( * @param startStep Start execution from this step. Prior steps copied over * @return handle to the workflow */ - public static WorkflowHandle forkWorkflow( - String workflowId, int startStep) { + public static @NonNull WorkflowHandle forkWorkflow( + @NonNull String workflowId, int startStep) { return forkWorkflow(workflowId, startStep, new ForkOptions()); } @@ -671,7 +688,8 @@ public static WorkflowHandle forkWorkflow( * @param workflowId ID of the workflow to retrieve * @return Workflow handle for the provided workflow ID */ - public static WorkflowHandle retrieveWorkflow(String workflowId) { + public static @NonNull WorkflowHandle retrieveWorkflow( + @NonNull String workflowId) { return executor("retrieveWorkflow").retrieveWorkflow(workflowId); } @@ -681,7 +699,7 @@ public static WorkflowHandle retrieveWorkflow(Str * @param input {@link ListWorkflowsInput} parameters to query workflows * @return a list of workflow status {@link WorkflowStatus} */ - public static List listWorkflows(ListWorkflowsInput input) { + public static @NonNull List listWorkflows(@NonNull ListWorkflowsInput input) { return executor("listWorkflows").listWorkflows(input); } @@ -691,7 +709,7 @@ public static List listWorkflows(ListWorkflowsInput input) { * @param workflowId Id of the workflow whose steps to return * @return list of step information {@link StepInfo} */ - public static List listWorkflowSteps(String workflowId) { + public static @NonNull List listWorkflowSteps(@NonNull String workflowId) { return executor("listWorkflowSteps").listWorkflowSteps(workflowId); } @@ -700,7 +718,7 @@ public static List listWorkflowSteps(String workflowId) { * * @return list of all registered workflow methods */ - public static Collection getRegisteredWorkflows() { + public static @NonNull Collection getRegisteredWorkflows() { return executor("getRegisteredWorkflows").getWorkflows(); } @@ -709,7 +727,7 @@ public static Collection getRegisteredWorkflows() { * * @return list of all class instances containing registered workflow methods */ - public static Collection getRegisteredWorkflowInstances() { + public static @NonNull Collection getRegisteredWorkflowInstances() { return executor("getRegisteredWorkflowInstances").getInstances(); } @@ -767,7 +785,7 @@ public static ExternalState upsertExternalState(ExternalState state) { * @throws RuntimeException if patching is not enabled in DBOS config or if called outside a * workflow */ - public static boolean patch(String patchName) { + public static boolean patch(@NonNull String patchName) { return executor("patch").patch(patchName); } @@ -783,7 +801,7 @@ public static boolean patch(String patchName) { * @throws RuntimeException if patching is not enabled in DBOS config or if called outside a * workflow */ - public static boolean deprecatePatch(String patchName) { + public static boolean deprecatePatch(@NonNull String patchName) { return executor("deprecatePatch").deprecatePatch(patchName); } } diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index 02aeb476..4089b736 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -19,6 +19,8 @@ import java.util.UUID; import com.zaxxer.hikari.HikariDataSource; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; /** * DBOSClient allows external programs to interact with DBOS apps via direct system database access. @@ -27,14 +29,14 @@ */ public class DBOSClient implements AutoCloseable { private class WorkflowHandleClient implements WorkflowHandle { - private String workflowId; + private @NonNull String workflowId; - public WorkflowHandleClient(String workflowId) { + public WorkflowHandleClient(@NonNull String workflowId) { this.workflowId = workflowId; } @Override - public String workflowId() { + public @NonNull String workflowId() { return workflowId; } @@ -44,12 +46,12 @@ public T getResult() throws E { } @Override - public WorkflowStatus getStatus() { + public @Nullable WorkflowStatus getStatus() { return systemDatabase.getWorkflowStatus(workflowId); } } - private final SystemDatabase systemDatabase; + private final @NonNull SystemDatabase systemDatabase; /** * Construct a DBOSClient, by providing system database access credentials @@ -58,7 +60,7 @@ public WorkflowStatus getStatus() { * @param user System database user * @param password System database credential / password */ - public DBOSClient(String url, String user, String password) { + public DBOSClient(@NonNull String url, @NonNull String user, @NonNull String password) { this(url, user, password, null); } @@ -70,7 +72,11 @@ public DBOSClient(String url, String user, String password) { * @param password System database credential / password * @param schema Database schema for DBOS tables */ - public DBOSClient(String url, String user, String password, String schema) { + public DBOSClient( + @NonNull String url, + @NonNull String user, + @NonNull String password, + @Nullable String schema) { var dataSource = SystemDatabase.createDataSource(url, user, password, 0, 0); systemDatabase = new SystemDatabase(dataSource, schema); } @@ -80,7 +86,7 @@ public DBOSClient(String url, String user, String password, String schema) { * * @param dataSource System database data source */ - public DBOSClient(HikariDataSource dataSource) { + public DBOSClient(@NonNull HikariDataSource dataSource) { this(dataSource, null); } @@ -90,7 +96,7 @@ public DBOSClient(HikariDataSource dataSource) { * @param dataSource System database data source * @param schema Database schema for DBOS tables */ - public DBOSClient(HikariDataSource dataSource, String schema) { + public DBOSClient(@NonNull HikariDataSource dataSource, @Nullable String schema) { systemDatabase = new SystemDatabase(dataSource, schema); } @@ -105,17 +111,17 @@ public void close() { * and app version, are optional, and should be set with `with` functions. */ public record EnqueueOptions( - String workflowName, - String queueName, - String className, - String instanceName, - String workflowId, - String appVersion, - Duration timeout, - Instant deadline, - String deduplicationId, - Integer priority, - String queuePartitionKey) { + @NonNull String workflowName, + @NonNull String queueName, + @NonNull String className, + @NonNull String instanceName, + @Nullable String workflowId, + @Nullable String appVersion, + @Nullable Duration timeout, + @Nullable Instant deadline, + @Nullable String deduplicationId, + @Nullable Integer priority, + @Nullable String queuePartitionKey) { public EnqueueOptions { if (Objects.requireNonNull(workflowName, "EnqueueOptions workflowName must not be null") @@ -159,7 +165,8 @@ public record EnqueueOptions( } /** Construct `EnqueueOptions` with a minimum set of required options */ - public EnqueueOptions(String className, String workflowName, String queueName) { + public EnqueueOptions( + @NonNull String className, @NonNull String workflowName, @NonNull String queueName) { this(workflowName, queueName, className, "", null, null, null, null, null, null, null); } @@ -169,7 +176,7 @@ public EnqueueOptions(String className, String workflowName, String queueName) { * @param className Class containing the workflow to enqueue * @return New `EnqueueOptions` with the class name set */ - public EnqueueOptions withClassName(String className) { + public @NonNull EnqueueOptions withClassName(@NonNull String className) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -191,7 +198,7 @@ public EnqueueOptions withClassName(String className) { * @param workflowId Workflow idempotency ID to use * @return New `EnqueueOptions` with the workflow ID set */ - public EnqueueOptions withWorkflowId(String workflowId) { + public @NonNull EnqueueOptions withWorkflowId(@Nullable String workflowId) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -213,7 +220,7 @@ public EnqueueOptions withWorkflowId(String workflowId) { * @param appVersion Application version to use for executing the workflow * @return New `EnqueueOptions` with the app version set */ - public EnqueueOptions withAppVersion(String appVersion) { + public @NonNull EnqueueOptions withAppVersion(@Nullable String appVersion) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -235,7 +242,7 @@ public EnqueueOptions withAppVersion(String appVersion) { * @param timeout Duration of time, from start, before the workflow is canceled. * @return New `EnqueueOptions` with the timeout set */ - public EnqueueOptions withTimeout(Duration timeout) { + public @NonNull EnqueueOptions withTimeout(@Nullable Duration timeout) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -257,7 +264,7 @@ public EnqueueOptions withTimeout(Duration timeout) { * @param deadline Instant after which the workflow will be canceled. * @return New `EnqueueOptions` with the deadline set */ - public EnqueueOptions withDeadline(Instant deadline) { + public @NonNull EnqueueOptions withDeadline(@Nullable Instant deadline) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -279,7 +286,7 @@ public EnqueueOptions withDeadline(Instant deadline) { * @param deduplicationId Queue deduplication ID * @return New `EnqueueOptions` with the deduplication ID set */ - public EnqueueOptions withDeduplicationId(String deduplicationId) { + public @NonNull EnqueueOptions withDeduplicationId(@Nullable String deduplicationId) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -301,7 +308,7 @@ public EnqueueOptions withDeduplicationId(String deduplicationId) { * @param instName Instance name registered within `DBOS.registerWorkflows` * @return New `EnqueueOptions` with the target instance name set */ - public EnqueueOptions withInstanceName(String instName) { + public @NonNull EnqueueOptions withInstanceName(@Nullable String instName) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -322,7 +329,7 @@ public EnqueueOptions withInstanceName(String instName) { * @param priority Queue priority; if `null`, priority '0' will be used. * @return New `EnqueueOptions` with the priority set */ - public EnqueueOptions withPriority(Integer priority) { + public @NonNull EnqueueOptions withPriority(@Nullable Integer priority) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -345,7 +352,7 @@ public EnqueueOptions withPriority(Integer priority) { * @param partitionKey the partition key to use for queue partitioning, can be null * @return a new EnqueueOptions instance with the specified partition key */ - public EnqueueOptions withQueuePartitionKey(String partitionKey) { + public @NonNull EnqueueOptions withQueuePartitionKey(@Nullable String partitionKey) { return new EnqueueOptions( this.workflowName, this.queueName, @@ -366,7 +373,7 @@ public EnqueueOptions withQueuePartitionKey(String partitionKey) { * @return The workflow idemptence ID */ @Override - public String workflowId() { + public @Nullable String workflowId() { return workflowId != null && workflowId.isEmpty() ? null : workflowId; } } @@ -380,8 +387,8 @@ public String workflowId() { * @param args Arguments to pass to the workflow function * @return WorkflowHandle for retrieving workflow ID, status, and results */ - public WorkflowHandle enqueueWorkflow( - EnqueueOptions options, Object[] args) { + public @NonNull WorkflowHandle enqueueWorkflow( + @NonNull EnqueueOptions options, @Nullable Object[] args) { return DBOSExecutor.enqueueWorkflow( Objects.requireNonNull( @@ -416,7 +423,11 @@ public WorkflowHandle enqueueWorkflow( * @param topic Topic for the message * @param idempotencyKey If specified, use the value to ensure exactly-once send semantics */ - public void send(String destinationId, Object message, String topic, String idempotencyKey) { + public void send( + @NonNull String destinationId, + @NonNull Object message, + @NonNull String topic, + @Nullable String idempotencyKey) { if (idempotencyKey == null) { idempotencyKey = UUID.randomUUID().toString(); } @@ -438,7 +449,8 @@ public void send(String destinationId, Object message, String topic, String idem * @param timeout Maximum time duration to wait before returning `null` * @return Workflow event value, or `null` if the timeout is hit. */ - public Object getEvent(String targetId, String key, Duration timeout) { + public @Nullable Object getEvent( + @NonNull String targetId, @NonNull String key, @NonNull Duration timeout) { return systemDatabase.getEvent(targetId, key, timeout, null); } @@ -451,7 +463,8 @@ public Object getEvent(String targetId, String key, Duration timeout) { * @param workflowId ID of the workflow to retrieve * @return A `WorkflowHandle` for the specified worflow ID */ - public WorkflowHandle retrieveWorkflow(String workflowId) { + public @NonNull WorkflowHandle retrieveWorkflow( + @NonNull String workflowId) { return new WorkflowHandleClient(workflowId); } @@ -460,7 +473,7 @@ public WorkflowHandle retrieveWorkflow(String wor * * @param workflowId ID of the workflow to cancel */ - public void cancelWorkflow(String workflowId) { + public void cancelWorkflow(@NonNull String workflowId) { systemDatabase.cancelWorkflow(workflowId); } @@ -472,7 +485,8 @@ public void cancelWorkflow(String workflowId) { * @param workflowId ID of the workflow to resume * @return `WorkflowHandle` for the resumed workflow */ - public WorkflowHandle resumeWorkflow(String workflowId) { + public @NonNull WorkflowHandle resumeWorkflow( + @NonNull String workflowId) { systemDatabase.resumeWorkflow(workflowId); return retrieveWorkflow(workflowId); } @@ -488,8 +502,8 @@ public WorkflowHandle resumeWorkflow(String workf * @param options Options for forking; * @return `WorkflowHandle` for the new workflow */ - public WorkflowHandle forkWorkflow( - String originalWorkflowId, int startStep, ForkOptions options) { + public @NonNull WorkflowHandle forkWorkflow( + @NonNull String originalWorkflowId, int startStep, @NonNull ForkOptions options) { var forkedWorkflowId = systemDatabase.forkWorkflow(originalWorkflowId, startStep, options); return retrieveWorkflow(forkedWorkflowId); } @@ -500,7 +514,7 @@ public WorkflowHandle forkWorkflow( * @param workflowId ID of the workflow to query for status * @return WorkflowStatus of the workflow, or empty if the workflow does not exist */ - public Optional getWorkflowStatus(String workflowId) { + public @NonNull Optional getWorkflowStatus(@NonNull String workflowId) { return Optional.ofNullable(systemDatabase.getWorkflowStatus(workflowId)); } @@ -510,7 +524,7 @@ public Optional getWorkflowStatus(String workflowId) { * @param input Filter criteria to use for listing workflows * @return list of workflows matching the `ListWorkflowsInput` criteria */ - public List listWorkflows(ListWorkflowsInput input) { + public @NonNull List listWorkflows(@NonNull ListWorkflowsInput input) { return systemDatabase.listWorkflows(input); } @@ -520,7 +534,7 @@ public List listWorkflows(ListWorkflowsInput input) { * @param workflowId ID of the workflow to list * @return List of steps executed by the workflow */ - public List listWorkflowSteps(String workflowId) { + public @NonNull List listWorkflowSteps(@NonNull String workflowId) { return systemDatabase.listWorkflowSteps(workflowId); } } diff --git a/transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java b/transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java index 949cedc4..13d574e2 100644 --- a/transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java +++ b/transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java @@ -7,6 +7,9 @@ import java.time.Instant; import java.util.concurrent.TimeUnit; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + /** * Options for starting a workflow, including: Assigning the workflow idempotency ID Enqueuing, with * options Setting a timeout. @@ -27,15 +30,17 @@ * distribute workflows across queue partitions for load balancing and ordered processing. */ public record StartWorkflowOptions( - String workflowId, - Timeout timeout, - Instant deadline, - String queueName, - String deduplicationId, - Integer priority, - String queuePartitionKey) { - - public StartWorkflowOptions { + @Nullable String workflowId, + @Nullable Timeout timeout, + @Nullable Instant deadline, + @Nullable String queueName, + @Nullable String deduplicationId, + @Nullable Integer priority, + @Nullable String queuePartitionKey) { + + public + @NonNull + StartWorkflowOptions { if (timeout instanceof Timeout.Explicit explicit) { if (explicit.value().isNegative() || explicit.value().isZero()) { throw new IllegalArgumentException( @@ -60,17 +65,21 @@ public record StartWorkflowOptions( } /** Construct with default options */ - public StartWorkflowOptions() { + public + @NonNull + StartWorkflowOptions() { this(null, null, null, null, null, null, null); } /** Construct with a specified workflow ID */ - public StartWorkflowOptions(String workflowId) { + public + @NonNull + StartWorkflowOptions(String workflowId) { this(workflowId, null, null, null, null, null, null); } /** Produces a new StartWorkflowOptions that overrides the ID assigned to the started workflow */ - public StartWorkflowOptions withWorkflowId(String workflowId) { + public @NonNull StartWorkflowOptions withWorkflowId(@Nullable String workflowId) { return new StartWorkflowOptions( workflowId, this.timeout, @@ -82,7 +91,7 @@ public StartWorkflowOptions withWorkflowId(String workflowId) { } /** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */ - public StartWorkflowOptions withTimeout(Timeout timeout) { + public @NonNull StartWorkflowOptions withTimeout(@Nullable Timeout timeout) { return new StartWorkflowOptions( this.workflowId, timeout, @@ -94,22 +103,22 @@ public StartWorkflowOptions withTimeout(Timeout timeout) { } /** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */ - public StartWorkflowOptions withTimeout(Duration timeout) { + public @NonNull StartWorkflowOptions withTimeout(@NonNull Duration timeout) { return withTimeout(Timeout.of(timeout)); } /** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */ - public StartWorkflowOptions withTimeout(long value, TimeUnit unit) { + public @NonNull StartWorkflowOptions withTimeout(long value, @NonNull TimeUnit unit) { return withTimeout(Duration.ofNanos(unit.toNanos(value))); } /** Produces a new StartWorkflowOptions that removes the timeout behavior */ - public StartWorkflowOptions withNoTimeout() { + public @NonNull StartWorkflowOptions withNoTimeout() { return withTimeout(Timeout.none()); } /** Produces a new StartWorkflowOptions that overrides deadline value for the started workflow */ - public StartWorkflowOptions withDeadline(Instant deadline) { + public @NonNull StartWorkflowOptions withDeadline(@Nullable Instant deadline) { return new StartWorkflowOptions( this.workflowId, this.timeout, @@ -121,7 +130,7 @@ public StartWorkflowOptions withDeadline(Instant deadline) { } /** Produces a new StartWorkflowOptions that assigns the started workflow to a queue */ - public StartWorkflowOptions withQueue(String queue) { + public @NonNull StartWorkflowOptions withQueue(@Nullable String queue) { return new StartWorkflowOptions( this.workflowId, this.timeout, @@ -133,7 +142,7 @@ public StartWorkflowOptions withQueue(String queue) { } /** Produces a new StartWorkflowOptions that assigns the started workflow to a queue */ - public StartWorkflowOptions withQueue(Queue queue) { + public @NonNull StartWorkflowOptions withQueue(@NonNull Queue queue) { return withQueue(queue.name()); } @@ -141,7 +150,7 @@ public StartWorkflowOptions withQueue(Queue queue) { * Produces a new StartWorkflowOptions that assigns a queue deduplication ID. Note that the queue * must also be specified. */ - public StartWorkflowOptions withDeduplicationId(String deduplicationId) { + public @NonNull StartWorkflowOptions withDeduplicationId(@Nullable String deduplicationId) { return new StartWorkflowOptions( this.workflowId, this.timeout, @@ -156,7 +165,7 @@ public StartWorkflowOptions withDeduplicationId(String deduplicationId) { * Produces a new StartWorkflowOptions that assigns a queue priority. Note that the queue must * also be specified and have prioritization enabled */ - public StartWorkflowOptions withPriority(Integer priority) { + public @NonNull StartWorkflowOptions withPriority(@Nullable Integer priority) { return new StartWorkflowOptions( this.workflowId, this.timeout, @@ -168,7 +177,7 @@ public StartWorkflowOptions withPriority(Integer priority) { } /** Produces a new StartWorkflowOptions that assigns a queue partition key */ - public StartWorkflowOptions withQueuePartitionKey(String queuePartitionKey) { + public @NonNull StartWorkflowOptions withQueuePartitionKey(@Nullable String queuePartitionKey) { return new StartWorkflowOptions( this.workflowId, this.timeout, @@ -181,7 +190,7 @@ public StartWorkflowOptions withQueuePartitionKey(String queuePartitionKey) { /** Get the assigned workflow ID, replacing empty with null */ @Override - public String workflowId() { + public @Nullable String workflowId() { return workflowId != null && workflowId.isEmpty() ? null : workflowId; } } diff --git a/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java b/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java index 311fb428..201dbb61 100644 --- a/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java +++ b/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java @@ -3,23 +3,25 @@ import dev.dbos.transact.Constants; import com.zaxxer.hikari.HikariDataSource; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; public record DBOSConfig( - String appName, - String databaseUrl, - String dbUser, - String dbPassword, + @NonNull String appName, + @Nullable String databaseUrl, + @Nullable String dbUser, + @Nullable String dbPassword, int maximumPoolSize, int connectionTimeout, - HikariDataSource dataSource, + @Nullable HikariDataSource dataSource, boolean adminServer, int adminServerPort, boolean migrate, - String conductorKey, - String conductorDomain, - String appVersion, - String executorId, - String databaseSchema, + @Nullable String conductorKey, + @Nullable String conductorDomain, + @Nullable String appVersion, + @Nullable String executorId, + @Nullable String databaseSchema, boolean enablePatching) { public DBOSConfig { @@ -41,7 +43,7 @@ public record DBOSConfig( } } - public static DBOSConfig defaults(String appName) { + public static @NonNull DBOSConfig defaults(@NonNull String appName) { return new DBOSConfig( appName, null, null, null, 3, // maximumPoolSize default 30000, // connectionTimeout default @@ -51,7 +53,7 @@ public static DBOSConfig defaults(String appName) { null, null, null, null, null, false); } - public static DBOSConfig defaultsFromEnv(String appName) { + public static @NonNull DBOSConfig defaultsFromEnv(@NonNull String appName) { String databaseUrl = System.getenv(Constants.SYSTEM_JDBC_URL_ENV_VAR); String dbUser = System.getenv(Constants.POSTGRES_USER_ENV_VAR); if (dbUser == null || dbUser.isEmpty()) dbUser = "postgres"; @@ -62,7 +64,7 @@ public static DBOSConfig defaultsFromEnv(String appName) { .withDbPassword(dbPassword); } - public DBOSConfig withAppName(String v) { + public @NonNull DBOSConfig withAppName(@NonNull String v) { return new DBOSConfig( v, databaseUrl, @@ -82,7 +84,7 @@ public DBOSConfig withAppName(String v) { enablePatching); } - public DBOSConfig withDatabaseUrl(String v) { + public @NonNull DBOSConfig withDatabaseUrl(@Nullable String v) { return new DBOSConfig( appName, v, @@ -102,7 +104,7 @@ public DBOSConfig withDatabaseUrl(String v) { enablePatching); } - public DBOSConfig withDbUser(String v) { + public @NonNull DBOSConfig withDbUser(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -122,7 +124,7 @@ public DBOSConfig withDbUser(String v) { enablePatching); } - public DBOSConfig withDbPassword(String v) { + public @NonNull DBOSConfig withDbPassword(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -142,7 +144,7 @@ public DBOSConfig withDbPassword(String v) { enablePatching); } - public DBOSConfig withMaximumPoolSize(int v) { + public @NonNull DBOSConfig withMaximumPoolSize(int v) { return new DBOSConfig( appName, databaseUrl, @@ -162,7 +164,7 @@ public DBOSConfig withMaximumPoolSize(int v) { enablePatching); } - public DBOSConfig withConnectionTimeout(int v) { + public @NonNull DBOSConfig withConnectionTimeout(int v) { return new DBOSConfig( appName, databaseUrl, @@ -182,7 +184,7 @@ public DBOSConfig withConnectionTimeout(int v) { enablePatching); } - public DBOSConfig withDataSource(HikariDataSource v) { + public @NonNull DBOSConfig withDataSource(@Nullable HikariDataSource v) { return new DBOSConfig( appName, databaseUrl, @@ -202,7 +204,7 @@ public DBOSConfig withDataSource(HikariDataSource v) { enablePatching); } - public DBOSConfig withAdminServer(boolean v) { + public @NonNull DBOSConfig withAdminServer(boolean v) { return new DBOSConfig( appName, databaseUrl, @@ -222,7 +224,7 @@ public DBOSConfig withAdminServer(boolean v) { enablePatching); } - public DBOSConfig withAdminServerPort(int v) { + public @NonNull DBOSConfig withAdminServerPort(int v) { return new DBOSConfig( appName, databaseUrl, @@ -242,7 +244,7 @@ public DBOSConfig withAdminServerPort(int v) { enablePatching); } - public DBOSConfig withMigrate(boolean v) { + public @NonNull DBOSConfig withMigrate(boolean v) { return new DBOSConfig( appName, databaseUrl, @@ -262,7 +264,7 @@ public DBOSConfig withMigrate(boolean v) { enablePatching); } - public DBOSConfig withConductorKey(String v) { + public @NonNull DBOSConfig withConductorKey(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -282,7 +284,7 @@ public DBOSConfig withConductorKey(String v) { enablePatching); } - public DBOSConfig withConductorDomain(String v) { + public @NonNull DBOSConfig withConductorDomain(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -302,7 +304,7 @@ public DBOSConfig withConductorDomain(String v) { enablePatching); } - public DBOSConfig withAppVersion(String v) { + public @NonNull DBOSConfig withAppVersion(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -322,7 +324,7 @@ public DBOSConfig withAppVersion(String v) { enablePatching); } - public DBOSConfig withExecutorId(String v) { + public @NonNull DBOSConfig withExecutorId(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -342,7 +344,7 @@ public DBOSConfig withExecutorId(String v) { enablePatching); } - public DBOSConfig withDatabaseSchema(String v) { + public @NonNull DBOSConfig withDatabaseSchema(@Nullable String v) { return new DBOSConfig( appName, databaseUrl, @@ -362,15 +364,15 @@ public DBOSConfig withDatabaseSchema(String v) { enablePatching); } - public DBOSConfig withEnablePatching() { + public @NonNull DBOSConfig withEnablePatching() { return this.withEnablePatching(true); } - public DBOSConfig withDisablePatching() { + public @NonNull DBOSConfig withDisablePatching() { return this.withEnablePatching(false); } - public DBOSConfig withEnablePatching(boolean v) { + public @NonNull DBOSConfig withEnablePatching(boolean v) { return new DBOSConfig( appName, databaseUrl, @@ -390,11 +392,11 @@ public DBOSConfig withEnablePatching(boolean v) { v); } - public DBOSConfig enableAdminServer() { + public @NonNull DBOSConfig enableAdminServer() { return withAdminServer(true); } - public DBOSConfig disableAdminServer() { + public @NonNull DBOSConfig disableAdminServer() { return withAdminServer(false); } diff --git a/transact/src/main/java/dev/dbos/transact/context/WorkflowOptions.java b/transact/src/main/java/dev/dbos/transact/context/WorkflowOptions.java index 6983229e..2ec14bed 100644 --- a/transact/src/main/java/dev/dbos/transact/context/WorkflowOptions.java +++ b/transact/src/main/java/dev/dbos/transact/context/WorkflowOptions.java @@ -6,6 +6,9 @@ import java.time.Instant; import java.util.concurrent.TimeUnit; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + /** * The WorkflowOptions class is used to specify options for DBOS workflow functions that are invoked * synchronously. For example, the following construct will run a workflow under id `wfId`, and @@ -16,7 +19,8 @@ * @param timeout The timeout to be assigned to the next workflow in the DBOS context * @param deadline The deadline to be assigned to the next workflow in the DBOS context */ -public record WorkflowOptions(String workflowId, Timeout timeout, Instant deadline) { +public record WorkflowOptions( + @Nullable String workflowId, @Nullable Timeout timeout, @Nullable Instant deadline) { public WorkflowOptions { if (timeout instanceof Timeout.Explicit explicit) { @@ -33,17 +37,21 @@ public record WorkflowOptions(String workflowId, Timeout timeout, Instant deadli } /** Create a WorkflowOptions with no ID and no timout */ - public WorkflowOptions() { + public + @NonNull + WorkflowOptions() { this(null, null, null); } /** Create a WorkflowOptions with a specified workflow ID and no timout */ - public WorkflowOptions(String workflowId) { + public + @NonNull + WorkflowOptions(@Nullable String workflowId) { this(workflowId, null, null); } /** Create a WorkflowOptions like this one, but with the workflowId set */ - public WorkflowOptions withWorkflowId(String workflowId) { + public @NonNull WorkflowOptions withWorkflowId(@Nullable String workflowId) { return new WorkflowOptions(workflowId, this.timeout, this.deadline); } @@ -52,7 +60,7 @@ public WorkflowOptions withWorkflowId(String workflowId) { * * @param timeout timeout to use, expressed as a `dev.dbos.transact.workflow.Timeout` */ - public WorkflowOptions withTimeout(Timeout timeout) { + public @NonNull WorkflowOptions withTimeout(@Nullable Timeout timeout) { return new WorkflowOptions(this.workflowId, timeout, this.deadline); } @@ -61,7 +69,7 @@ public WorkflowOptions withTimeout(Timeout timeout) { * * @param timeout timeout to use, expressed as a `java.util.Duration` */ - public WorkflowOptions withTimeout(Duration timeout) { + public @NonNull WorkflowOptions withTimeout(@NonNull Duration timeout) { return new WorkflowOptions(this.workflowId, Timeout.of(timeout), this.deadline); } @@ -71,7 +79,7 @@ public WorkflowOptions withTimeout(Duration timeout) { * @param value timeout value to use, expressed as a value (see `unit`) * @param unit units to use for interpreting timeout `value` */ - public WorkflowOptions withTimeout(long value, TimeUnit unit) { + public @NonNull WorkflowOptions withTimeout(long value, @NonNull TimeUnit unit) { return withTimeout(Duration.ofNanos(unit.toNanos(value))); } @@ -80,12 +88,12 @@ public WorkflowOptions withTimeout(long value, TimeUnit unit) { * * @param deadline deadline to use, expressed as a `java.util.Instant` */ - public WorkflowOptions withDeadline(Instant deadline) { + public @NonNull WorkflowOptions withDeadline(@Nullable Instant deadline) { return new WorkflowOptions(this.workflowId, this.timeout, deadline); } /** Create a workflow options like this one, but without a timeout */ - public WorkflowOptions withNoTimeout() { + public @NonNull WorkflowOptions withNoTimeout() { return new WorkflowOptions(this.workflowId, Timeout.none(), this.deadline); } @@ -93,7 +101,7 @@ public WorkflowOptions withNoTimeout() { * @return The workflow ID that will be used */ @Override - public String workflowId() { + public @Nullable String workflowId() { return workflowId != null && workflowId.isEmpty() ? null : workflowId; } @@ -102,7 +110,7 @@ public String workflowId() { * Should be called as an AutoCloseable so that the context is restored at the end of the block. * try (var _i = new WorkflowOptions(...).setContext()) { ... } */ - public Guard setContext() { + public @NonNull Guard setContext() { var ctx = DBOSContextHolder.get(); var guard = new Guard(ctx); @@ -126,7 +134,9 @@ public static class Guard implements AutoCloseable { private final Timeout timeout; private final Instant deadline; - private Guard(DBOSContext ctx) { + private + @NonNull + Guard(@NonNull DBOSContext ctx) { this.ctx = ctx; this.nextWorkflowId = ctx.nextWorkflowId; this.timeout = ctx.nextTimeout;