diff --git a/src/main/java/dev/dbos/transact/DBOS.java b/src/main/java/dev/dbos/transact/DBOS.java index 4e8ad5f2..e76c2c3f 100644 --- a/src/main/java/dev/dbos/transact/DBOS.java +++ b/src/main/java/dev/dbos/transact/DBOS.java @@ -3,6 +3,7 @@ import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.execution.DBOSExecutor; +import dev.dbos.transact.interceptor.AsyncInvocationHandler; import dev.dbos.transact.interceptor.TransactInvocationHandler; import dev.dbos.transact.migrations.DatabaseMigrator; import org.slf4j.Logger; @@ -58,30 +59,29 @@ public WorkflowBuilder Workflow() { // inner builder class for workflows public static class WorkflowBuilder { private Class interfaceClass; - private T implementation; - + private Object implementation; public WorkflowBuilder interfaceClass(Class iface) { this.interfaceClass = iface; return this; } - public WorkflowBuilder implementation(T impl) { + public WorkflowBuilder implementation(Object impl) { this.implementation = impl; return this; } - public T build() { if (interfaceClass == null || implementation == null) { throw new IllegalStateException("Interface and implementation must be set"); } - return TransactInvocationHandler.createProxy( - interfaceClass, - implementation, - DBOS.getInstance().dbosExecutor + return AsyncInvocationHandler.createProxy( + interfaceClass, + implementation, + DBOS.getInstance().dbosExecutor ); + } } @@ -96,6 +96,7 @@ public void launch() { public void shutdown() { dbosExecutor.shutdown(); + instance = null ; } diff --git a/src/main/java/dev/dbos/transact/context/DBOSContextHolder.java b/src/main/java/dev/dbos/transact/context/DBOSContextHolder.java index cb497e87..4626b9ae 100644 --- a/src/main/java/dev/dbos/transact/context/DBOSContextHolder.java +++ b/src/main/java/dev/dbos/transact/context/DBOSContextHolder.java @@ -11,5 +11,9 @@ public static void clear() { contextHolder.remove(); } + public static void set(DBOSContext context) { + contextHolder.set(context); + } + } diff --git a/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 7ab7226a..77b5be3d 100644 --- a/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -1,17 +1,13 @@ package dev.dbos.transact.database; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import dev.dbos.transact.Constants; import dev.dbos.transact.config.DBOSConfig; -import dev.dbos.transact.exceptions.DBOSDeadLetterQueueException; -import dev.dbos.transact.exceptions.DBOSException; -import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException; -import dev.dbos.transact.exceptions.DBOSWorkflowConflictException; +import dev.dbos.transact.exceptions.*; import dev.dbos.transact.json.JSONUtil; -import dev.dbos.transact.workflow.GetWorkflowsInput; +import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.WorkflowState; import dev.dbos.transact.workflow.WorkflowStatus; import dev.dbos.transact.workflow.internal.InsertWorkflowResult; @@ -428,7 +424,23 @@ public void updateWorkflowStatus( } } -public List getWorkflows(GetWorkflowsInput input) throws SQLException { +public WorkflowStatus getWorkflow(String workflowId) { + + try { + ListWorkflowsInput input = new ListWorkflowsInput(); + input.setWorkflowIDs(Arrays.asList(workflowId)); + List output = listWorkflows(input) ; + if (output.size() > 0) { + return output.get(0); + } + } catch (SQLException e) { + logger.error("Error retrieving workflow for "+workflowId, e); + } + + throw new NonExistentWorkflowException(workflowId) ; +} + +public List listWorkflows(ListWorkflowsInput input) throws SQLException { List workflows = new ArrayList<>(); @@ -560,12 +572,13 @@ public List getWorkflows(GetWorkflowsInput input) throws SQLExce String serializedError = rs.getString("error"); if (serializedInput != null) { - info.setInput(JSONUtil.fromJson(serializedInput, Object[].class)); + info.setInput((Object[])JSONUtil.deserialize((serializedInput)) ); } if (serializedOutput != null) { - info.setOutput(JSONUtil.fromJson(serializedOutput, Object.class)); + info.setOutput(JSONUtil.deserialize(serializedOutput)); } + info.setError(serializedError); info.setWorkflowDeadlineEpochMs(rs.getObject("workflow_deadline_epoch_ms", Long.class)); diff --git a/src/main/java/dev/dbos/transact/exceptions/ErrorCode.java b/src/main/java/dev/dbos/transact/exceptions/ErrorCode.java index 58279a7a..11944bd4 100644 --- a/src/main/java/dev/dbos/transact/exceptions/ErrorCode.java +++ b/src/main/java/dev/dbos/transact/exceptions/ErrorCode.java @@ -5,7 +5,8 @@ public enum ErrorCode { UNEXPECTED(1), WORKFLOW_CONFLICT(2), QUEUE_DUPLICATED(3) , - DEAD_LETTER_QUEUE(4) ; + DEAD_LETTER_QUEUE(4) , + NONEXISTENT_WORKFLOW(5) ; private int code ; diff --git a/src/main/java/dev/dbos/transact/exceptions/NonExistentWorkflowException.java b/src/main/java/dev/dbos/transact/exceptions/NonExistentWorkflowException.java new file mode 100644 index 00000000..de130733 --- /dev/null +++ b/src/main/java/dev/dbos/transact/exceptions/NonExistentWorkflowException.java @@ -0,0 +1,15 @@ +package dev.dbos.transact.exceptions; + +public class NonExistentWorkflowException extends DBOSException{ + private String workflowId ; + public NonExistentWorkflowException(String workflowId) { + super(ErrorCode.NONEXISTENT_WORKFLOW.getCode(), + String.format("Workflow does not exist %s", workflowId)); + this.workflowId = workflowId; + + } + + public String getWorkflowId() { + return workflowId; + } +} diff --git a/src/main/java/dev/dbos/transact/execution/ContextAwareCallable.java b/src/main/java/dev/dbos/transact/execution/ContextAwareCallable.java new file mode 100644 index 00000000..554c1991 --- /dev/null +++ b/src/main/java/dev/dbos/transact/execution/ContextAwareCallable.java @@ -0,0 +1,26 @@ +package dev.dbos.transact.execution; + +import dev.dbos.transact.context.DBOSContext; +import dev.dbos.transact.context.DBOSContextHolder; + +import java.util.concurrent.Callable; + +public class ContextAwareCallable implements Callable { + private final Callable task; + private final DBOSContext capturedContext; + + public ContextAwareCallable(Callable task) { + this.task = task; + this.capturedContext = DBOSContextHolder.get(); + } + + @Override + public T call() throws Exception { + DBOSContextHolder.set(capturedContext); + try { + return task.call(); + } finally { + DBOSContextHolder.clear(); + } + } +} diff --git a/src/main/java/dev/dbos/transact/execution/ContextAwareRunnable.java b/src/main/java/dev/dbos/transact/execution/ContextAwareRunnable.java new file mode 100644 index 00000000..c37fc502 --- /dev/null +++ b/src/main/java/dev/dbos/transact/execution/ContextAwareRunnable.java @@ -0,0 +1,24 @@ +package dev.dbos.transact.execution; + +import dev.dbos.transact.context.DBOSContext; +import dev.dbos.transact.context.DBOSContextHolder; + +public class ContextAwareRunnable implements Runnable { + private final Runnable task; + private final DBOSContext capturedContext; + + public ContextAwareRunnable(Runnable task) { + this.task = task; + this.capturedContext = DBOSContextHolder.get(); + } + + @Override + public void run() { + DBOSContextHolder.set(capturedContext); + try { + task.run(); + } finally { + DBOSContextHolder.clear(); + } + } +} diff --git a/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 1891fdbb..09680794 100644 --- a/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -6,15 +6,22 @@ import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.exceptions.DBOSException; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.workflow.WorkflowHandle; import dev.dbos.transact.workflow.WorkflowState; import dev.dbos.transact.workflow.WorkflowStatus; +import dev.dbos.transact.workflow.internal.WorkflowHandleFuture; import dev.dbos.transact.workflow.internal.WorkflowStatusInternal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.sql.SQLException; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static dev.dbos.transact.exceptions.ErrorCode.UNEXPECTED; @@ -22,11 +29,13 @@ public class DBOSExecutor { private DBOSConfig config; private SystemDatabase systemDatabase; + private ExecutorService executorService ; Logger logger = LoggerFactory.getLogger(DBOSExecutor.class); public DBOSExecutor(DBOSConfig config, SystemDatabase sysdb) { this.config = config; this.systemDatabase = sysdb ; + this.executorService = Executors.newCachedThreadPool(); } @@ -38,18 +47,12 @@ public SystemDatabase.WorkflowInitResult preInvokeWorkflow(String workflowName, String interfaceName, String className, String methodName, - Object[] inputs) { + Object[] inputs, + String workflowId) { logger.info("In preInvokeWorkflow") ; - DBOSContext ctx = DBOSContextHolder.get(); - String workflowId = ctx.getWorkflowId() ; - - if (workflowId == null) { - workflowId = UUID.randomUUID().toString(); - } - - String inputString = JSONUtil.toJson(inputs); + String inputString = JSONUtil.serialize(inputs) ; WorkflowStatusInternal workflowStatusInternal = new WorkflowStatusInternal(workflowId, @@ -79,6 +82,7 @@ public SystemDatabase.WorkflowInitResult preInvokeWorkflow(String workflowName, try { initResult = systemDatabase.initWorkflowStatus(workflowStatusInternal, 3); } catch (SQLException e) { + logger.error("Error inserting into workflow_status", e); throw new DBOSException(UNEXPECTED.getCode(), e.getMessage(),e) ; } @@ -87,12 +91,9 @@ public SystemDatabase.WorkflowInitResult preInvokeWorkflow(String workflowName, public void postInvokeWorkflow(String workflowId, Object result) { - String resultString = JSONUtil.toJson(result); - - + String resultString = JSONUtil.serialize(result); systemDatabase.recordWorkflowOutput(workflowId, resultString); - logger.info("In post Invoke workflow with result") ; } public void postInvokeWorkflow(String workflowId, Throwable error) { @@ -101,27 +102,41 @@ public void postInvokeWorkflow(String workflowId, Throwable error) { systemDatabase.recordWorkflowError(workflowId, errorString); - logger.info("In post Invoke workflow with error") ; } public T runWorkflow(String workflowName, String targetClassName, String methodName, Object[] args, - DBOSFunction function) throws Throwable { + DBOSFunction function, + String workflowId) throws Throwable { + String wfid = workflowId ; - SystemDatabase.WorkflowInitResult initResult = preInvokeWorkflow(workflowName, null, targetClassName, methodName, args); - if (initResult.getStatus().equals(WorkflowState.SUCCESS.name())) { - return (T) systemDatabase.getWorkflowResult(initResult.getWorkflowId()).get(); - } else if (initResult.getStatus().equals(WorkflowState.ERROR.name())) { - logger.warn("Idempotency check not impl for error"); - } else if (initResult.getStatus().equals(WorkflowState.CANCELLED.name())) { - logger.warn("Idempotency check not impl for cancelled"); - } + if (wfid == null) { + DBOSContext ctx = DBOSContextHolder.get(); + wfid = ctx.getWorkflowId() ; + if (wfid == null) { + wfid = UUID.randomUUID().toString(); + } + } + SystemDatabase.WorkflowInitResult initResult = null; try { + + initResult = preInvokeWorkflow(workflowName, null, + targetClassName, methodName, args, wfid); + + if (initResult.getStatus().equals(WorkflowState.SUCCESS.name())) { + return (T) systemDatabase.getWorkflowResult(initResult.getWorkflowId()).get(); + } else if (initResult.getStatus().equals(WorkflowState.ERROR.name())) { + logger.warn("Idempotency check not impl for error"); + } else if (initResult.getStatus().equals(WorkflowState.CANCELLED.name())) { + logger.warn("Idempotency check not impl for cancelled"); + } + + logger.info("Before executing workflow") ; T result = function.execute(); // invoke the lambda logger.info("After: Workflow completed successfully"); postInvokeWorkflow(initResult.getWorkflowId(), result); @@ -135,6 +150,50 @@ public T runWorkflow(String workflowName, } } + public WorkflowHandle submitWorkflow(String workflowName, + String targetClassName, + String methodName, + Object[] args, + DBOSFunction function) throws Throwable { + DBOSContext ctx = DBOSContextHolder.get(); + String workflowId = ctx.getWorkflowId() ; + + if (workflowId == null) { + workflowId = UUID.randomUUID().toString(); + } + + final String wfId = workflowId ; + + Callable task = () -> { + T result = null ; + logger.info("Callable executing the workflow.. " + wfId); + try { + + result = runWorkflow(workflowName, + targetClassName, + methodName, + args, + function, + wfId); + + + } catch (Throwable e) { + Throwable actual = (e instanceof InvocationTargetException) + ? ((InvocationTargetException) e).getTargetException() + : e; + + logger.error("Error executing workflow", actual); + + } + + return result ; + }; + + Future future = executorService.submit(task); + + return new WorkflowHandleFuture(workflowId, future, systemDatabase); + + } } diff --git a/src/main/java/dev/dbos/transact/interceptor/AsyncInvocationHandler.java b/src/main/java/dev/dbos/transact/interceptor/AsyncInvocationHandler.java new file mode 100644 index 00000000..32b3972e --- /dev/null +++ b/src/main/java/dev/dbos/transact/interceptor/AsyncInvocationHandler.java @@ -0,0 +1,86 @@ +package dev.dbos.transact.interceptor; + +import dev.dbos.transact.execution.DBOSExecutor; +import dev.dbos.transact.workflow.Workflow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +public class AsyncInvocationHandler implements InvocationHandler { + + private static final Logger logger = LoggerFactory.getLogger(AsyncInvocationHandler.class); + + private final Object target; + private final String targetClassName ; + private final DBOSExecutor dbosExecutor ; + + public static T createProxy(Class interfaceClass, Object implementation, DBOSExecutor executor) { + if (!interfaceClass.isInterface()) { + throw new IllegalArgumentException("interfaceClass must be an interface"); + } + + T proxy = (T) Proxy.newProxyInstance( + interfaceClass.getClassLoader(), + new Class[] { interfaceClass }, + new AsyncInvocationHandler(implementation, executor)) ; + + + return proxy; + } + + + public AsyncInvocationHandler(Object target, DBOSExecutor dbosExecutor) { + this.target = target; + this.targetClassName = target.getClass().getName(); + this.dbosExecutor = dbosExecutor ; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + Method targetMethod = findMethodWithSameSignature(target.getClass(), method); + + Workflow wfAnnotation = targetMethod.getAnnotation(Workflow.class) ; + + if (wfAnnotation != null) { + + String workflowName = wfAnnotation.name().isEmpty() ? targetMethod.getName() : wfAnnotation.name(); + + String msg = String.format("Before: Starting workflow '%s' (timeout: %ds)%n", + workflowName, + wfAnnotation.timeout()); + + logger.info(msg); + + return dbosExecutor.submitWorkflow( + workflowName, + targetClassName, + method.getName(), + args, + () -> (Object) targetMethod.invoke(target, args) + ); + + + } else { + throw new RuntimeException("workflow annotation expected on target method"); + } + + } + + private Method findMethodWithSameSignature(Class clazz, Method interfaceMethod) throws NoSuchMethodException { + for (Method m : clazz.getMethods()) { + if (m.getName().equals(interfaceMethod.getName()) && + Arrays.equals(m.getParameterTypes(), interfaceMethod.getParameterTypes())) { + return m; + } + } + throw new NoSuchMethodException("Matching method not found"); + } +} diff --git a/src/main/java/dev/dbos/transact/interceptor/TransactInvocationHandler.java b/src/main/java/dev/dbos/transact/interceptor/TransactInvocationHandler.java index 14ea2999..e3ccecf1 100644 --- a/src/main/java/dev/dbos/transact/interceptor/TransactInvocationHandler.java +++ b/src/main/java/dev/dbos/transact/interceptor/TransactInvocationHandler.java @@ -12,6 +12,11 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; +/** + * + * To be removed: See AsyncInvocationHandler + * + */ public class TransactInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(TransactInvocationHandler.class); @@ -21,7 +26,7 @@ public class TransactInvocationHandler implements InvocationHandler { private final DBOSExecutor dbosExecutor ; @SuppressWarnings("unchecked") - public static T createProxy(Class interfaceClass, T implementation, DBOSExecutor executor) { + public static T createProxy(Class interfaceClass, Object implementation, DBOSExecutor executor) { if (!interfaceClass.isInterface()) { throw new IllegalArgumentException("interfaceClass must be an interface"); } @@ -69,12 +74,14 @@ protected Object handleWorkflow(Method method, Object[] args, Workflow workflow) logger.info(msg); + return dbosExecutor.runWorkflow( workflowName, targetClassName, method.getName(), args, - () -> (Object) method.invoke(target, args) + () -> (Object) method.invoke(target, args), + null ); } diff --git a/src/main/java/dev/dbos/transact/json/JSONUtil.java b/src/main/java/dev/dbos/transact/json/JSONUtil.java index 5af66cec..618922f0 100644 --- a/src/main/java/dev/dbos/transact/json/JSONUtil.java +++ b/src/main/java/dev/dbos/transact/json/JSONUtil.java @@ -9,9 +9,6 @@ public class JSONUtil { - - - private static final ObjectMapper mapper = new ObjectMapper(); static { diff --git a/src/main/java/dev/dbos/transact/workflow/GetWorkflowsInput.java b/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java similarity index 96% rename from src/main/java/dev/dbos/transact/workflow/GetWorkflowsInput.java rename to src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java index 37dcd88b..aabf5162 100644 --- a/src/main/java/dev/dbos/transact/workflow/GetWorkflowsInput.java +++ b/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java @@ -3,7 +3,7 @@ import java.util.List; import java.time.OffsetDateTime; -public class GetWorkflowsInput { +public class ListWorkflowsInput { private List workflowIDs; private String workflowName; private String authenticatedUser; @@ -16,10 +16,10 @@ public class GetWorkflowsInput { private Boolean sortDesc; private String workflowIdPrefix; - public GetWorkflowsInput() { + public ListWorkflowsInput() { } - public GetWorkflowsInput( + public ListWorkflowsInput( List workflowIDs, String workflowName, String authenticatedUser, OffsetDateTime startTime, OffsetDateTime endTime, String status, String applicationVersion, Integer limit, Integer offset, Boolean sortDesc, diff --git a/src/main/java/dev/dbos/transact/workflow/WorkflowHandle.java b/src/main/java/dev/dbos/transact/workflow/WorkflowHandle.java new file mode 100644 index 00000000..24142b95 --- /dev/null +++ b/src/main/java/dev/dbos/transact/workflow/WorkflowHandle.java @@ -0,0 +1,24 @@ +package dev.dbos.transact.workflow; + +public interface WorkflowHandle { + + /** + * Return the applicable workflow ID. + * Corresponds to the 'workflow_id' attribute and 'get_workflow_id' method in Python. + * @return The workflow ID. + */ + String getWorkflowId(); + + /** + * Return the result of the workflow function invocation, waiting if necessary. + * This method blocks until the result is available. + * @return The result of the workflow invocation. + */ + T getResult() ; + + /** + * Return the current workflow function invocation status as `WorkflowStatus`. + * @return The current status of the workflow. + */ + WorkflowStatus getStatus(); +} diff --git a/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java b/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java new file mode 100644 index 00000000..842a713e --- /dev/null +++ b/src/main/java/dev/dbos/transact/workflow/internal/WorkflowHandleFuture.java @@ -0,0 +1,42 @@ +package dev.dbos.transact.workflow.internal; + +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.exceptions.DBOSException; +import dev.dbos.transact.workflow.WorkflowHandle; +import dev.dbos.transact.workflow.WorkflowStatus; + +import java.util.concurrent.Future; + +import static dev.dbos.transact.exceptions.ErrorCode.UNEXPECTED; + +public class WorkflowHandleFuture implements WorkflowHandle { + + private String workflowId; + private Future futureResult; + private SystemDatabase systemDatabase; + + public WorkflowHandleFuture(String workflowId, Future future, SystemDatabase sysdb) { + this.workflowId = workflowId; + this.futureResult = future ; + this.systemDatabase = sysdb; + } + + @Override + public String getWorkflowId() { + return workflowId; + } + + @Override + public T getResult() { + try { + return futureResult.get(); + } catch (Exception e) { + throw new DBOSException(UNEXPECTED.getCode(), e.getMessage()) ; + } + } + + @Override + public WorkflowStatus getStatus() { + return systemDatabase.getWorkflow(workflowId) ; + } +} diff --git a/src/test/java/dev/dbos/transact/interceptor/TransactInvocationHandlerTest.java b/src/test/java/dev/dbos/transact/interceptor/TransactInvocationHandlerTest.java index f567d2e6..95ecf54a 100644 --- a/src/test/java/dev/dbos/transact/interceptor/TransactInvocationHandlerTest.java +++ b/src/test/java/dev/dbos/transact/interceptor/TransactInvocationHandlerTest.java @@ -27,7 +27,7 @@ void invokeWorkflow() throws Throwable { DBOSExecutor executor = mock(DBOSExecutor.class) ; - doReturn("Processed: test-item").when(executor).runWorkflow(anyString(), anyString(), anyString(), any(Object[].class), any()); + doReturn("Processed: test-item").when(executor).runWorkflow(anyString(), anyString(), anyString(), any(Object[].class), any(),any()); TransactInvocationHandler realHandler = new TransactInvocationHandler(impl, executor); @@ -55,7 +55,8 @@ void invokeStep() throws Throwable { OrderServiceImpl impl = new OrderServiceImpl(); DBOSExecutor executor = mock(DBOSExecutor.class) ; - doReturn(new SystemDatabase.WorkflowInitResult("121","PENDING",123L)).when(executor).preInvokeWorkflow(anyString(), anyString(), anyString(), anyString(), any(Object[].class)); + doReturn(new SystemDatabase.WorkflowInitResult("121","PENDING",123L)).when(executor).preInvokeWorkflow(anyString(), + anyString(), anyString(), anyString(), any(Object[].class), any()); doNothing().when(executor).postInvokeWorkflow(anyString(), any()); TransactInvocationHandler realHandler = @@ -84,7 +85,8 @@ void invokeTransaction() throws Throwable { OrderServiceImpl impl = new OrderServiceImpl(); DBOSExecutor executor = mock(DBOSExecutor.class) ; - doReturn(new SystemDatabase.WorkflowInitResult("121","PENDING",123L)).when(executor).preInvokeWorkflow(anyString(), anyString(), anyString(), anyString(), any(Object[].class)); + doReturn(new SystemDatabase.WorkflowInitResult("121","PENDING",123L)).when(executor) + .preInvokeWorkflow(anyString(), anyString(), anyString(), anyString(), any(Object[].class),any()); doNothing().when(executor).postInvokeWorkflow(anyString(), any()); TransactInvocationHandler realHandler = diff --git a/src/test/java/dev/dbos/transact/workflow/BasicWorkflowTest.java b/src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java similarity index 64% rename from src/test/java/dev/dbos/transact/workflow/BasicWorkflowTest.java rename to src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java index 16ab4bcf..e1a5a183 100644 --- a/src/test/java/dev/dbos/transact/workflow/BasicWorkflowTest.java +++ b/src/test/java/dev/dbos/transact/workflow/AsyncWorkflowTest.java @@ -1,6 +1,5 @@ package dev.dbos.transact.workflow; - import dev.dbos.transact.DBOS; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.SetWorkflowID; @@ -17,10 +16,10 @@ import java.sql.Statement; import java.util.List; -import static org.junit.jupiter.api.Assertions.*; - -public class BasicWorkflowTest { +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +public class AsyncWorkflowTest { private static DBOSConfig dbosConfig; private static DBOS dbos ; @@ -30,7 +29,7 @@ public class BasicWorkflowTest { @BeforeAll static void onetimeSetup() throws Exception { - BasicWorkflowTest.dbosConfig = new DBOSConfig + AsyncWorkflowTest.dbosConfig = new DBOSConfig .Builder() .name("systemdbtest") .dbHost("localhost") @@ -73,19 +72,21 @@ void beforeEachTest() throws SQLException { systemDatabase.deleteWorkflowsTestHelper(); } - @Test public void workflowWithOneInput() throws SQLException { SimpleService simpleService = dbos.Workflow() .interfaceClass(SimpleService.class) - .implementation(new SimpleServiceImpl()) + .implementation(new WorkflowImpl()) .build(); - String result = simpleService.workWithString("test-item"); + WorkflowHandle handle = simpleService.workWithString("test-item"); + String result = handle.getResult(); assertEquals("Processed: test-item", result); + assertNotNull(handle.getWorkflowId()); + assertEquals("SUCCESS",handle.getStatus().getStatus()) ; - List wfs = systemDatabase.getWorkflows(new GetWorkflowsInput()) ; + List wfs = systemDatabase.listWorkflows(new ListWorkflowsInput()) ; assertEquals(1, wfs.size()); assertEquals(wfs.get(0).getName(),"workWithString"); assertNotNull(wfs.get(0).getWorkflowId()); @@ -94,92 +95,103 @@ public void workflowWithOneInput() throws SQLException { } - @Test - public void workflowWithError() throws SQLException{ - - SimpleService simpleService = dbos.Workflow() - .interfaceClass(SimpleService.class) - .implementation(new SimpleServiceImpl()) - .build(); - - try { - simpleService.workWithError(); - } catch (Exception e) { - assertEquals("DBOS Test error", e.getMessage()); - } - - List wfs = systemDatabase.getWorkflows(new GetWorkflowsInput()) ; - assertEquals(1, wfs.size()); - assertEquals(wfs.get(0).getName(),"workError"); - assertNotNull(wfs.get(0).getWorkflowId()); - - } - @Test public void setWorkflowId() throws SQLException { SimpleService simpleService = dbos.Workflow() .interfaceClass(SimpleService.class) - .implementation(new SimpleServiceImpl()) + .implementation(new WorkflowImpl()) .build(); - String result = null ; + WorkflowHandle handle = null ; try (SetWorkflowID id = new SetWorkflowID("wf-123")){ - result = simpleService.workWithString("test-item"); + handle = simpleService.workWithString("test-item"); } + String result = handle.getResult(); assertEquals("Processed: test-item", result); + assertEquals("wf-123", handle.getWorkflowId()); + assertEquals("SUCCESS", handle.getStatus().getStatus()) ; - List wfs = systemDatabase.getWorkflows(new GetWorkflowsInput()) ; + List wfs = systemDatabase.listWorkflows(new ListWorkflowsInput()) ; assertEquals(1, wfs.size()); assertEquals(wfs.get(0).getName(),"workWithString"); assertEquals("wf-123",wfs.get(0).getWorkflowId()); } - @Test public void sameWorkflowId() throws SQLException { SimpleService simpleService = dbos.Workflow() .interfaceClass(SimpleService.class) - .implementation(new SimpleServiceImpl()) + .implementation(new WorkflowImpl()) .build(); - String result = null ; - - SimpleServiceImpl.executionCount =0 ; + WorkflowImpl.executionCount =0 ; + WorkflowHandle handle = null; try (SetWorkflowID id = new SetWorkflowID("wf-123")){ - result = simpleService.workWithString("test-item"); + handle = simpleService.workWithString("test-item"); } + String result = handle.getResult() ; assertEquals("Processed: test-item", result); + assertEquals("wf-123",handle.getWorkflowId()); - List wfs = systemDatabase.getWorkflows(new GetWorkflowsInput()) ; + List wfs = systemDatabase.listWorkflows(new ListWorkflowsInput()) ; assertEquals(1, wfs.size()); assertEquals(wfs.get(0).getName(),"workWithString"); assertEquals("wf-123",wfs.get(0).getWorkflowId()); - assertEquals(1, SimpleServiceImpl.executionCount); - try (SetWorkflowID id = new SetWorkflowID("wf-123")){ - result = simpleService.workWithString("test-item"); + handle = simpleService.workWithString("test-item"); } - assertEquals(1, SimpleServiceImpl.executionCount); - wfs = systemDatabase.getWorkflows(new GetWorkflowsInput()) ; + result = handle.getResult(); + assertEquals(1, WorkflowImpl.executionCount); + // TODO fix deser has quotes assertEquals("Processed: test-item", result); + assertEquals("wf-123",handle.getWorkflowId()); + + wfs = systemDatabase.listWorkflows(new ListWorkflowsInput()) ; assertEquals(1, wfs.size()); assertEquals("wf-123",wfs.get(0).getWorkflowId()); try (SetWorkflowID id = new SetWorkflowID("wf-124")){ - result = simpleService.workWithString("test-item"); + handle = simpleService.workWithString("test-item"); } + result = handle.getResult(); + assertEquals("wf-124",handle.getWorkflowId()); - assertEquals(2, SimpleServiceImpl.executionCount); - wfs = systemDatabase.getWorkflows(new GetWorkflowsInput()) ; + assertEquals(2, WorkflowImpl.executionCount); + wfs = systemDatabase.listWorkflows(new ListWorkflowsInput()) ; assertEquals(2, wfs.size()); assertEquals("wf-124",wfs.get(1).getWorkflowId()); } + + @Test + public void workflowWithError() throws SQLException { + + SimpleService simpleService = dbos.Workflow() + .interfaceClass(SimpleService.class) + .implementation(new WorkflowImpl()) + .build(); + + WorkflowHandle handle = null; + try { + handle = simpleService.workWithError(); + } catch (Exception e) { + assertEquals("DBOS Test error", e.getMessage()); + } + + handle.getResult() ; + List wfs = systemDatabase.listWorkflows(new ListWorkflowsInput()) ; + assertEquals(1, wfs.size()); + assertEquals(wfs.get(0).getName(),"workError"); + assertNotNull(wfs.get(0).getWorkflowId()); + assertEquals(wfs.get(0).getWorkflowId(),handle.getWorkflowId()); + assertEquals(WorkflowState.ERROR.name(), handle.getStatus().getStatus()); + + } } diff --git a/src/test/java/dev/dbos/transact/workflow/SimpleService.java b/src/test/java/dev/dbos/transact/workflow/SimpleService.java index 7989b760..3a6b400d 100644 --- a/src/test/java/dev/dbos/transact/workflow/SimpleService.java +++ b/src/test/java/dev/dbos/transact/workflow/SimpleService.java @@ -2,7 +2,7 @@ public interface SimpleService { - public String workWithString(String input); + public WorkflowHandle workWithString(String input); - public void workWithError() throws Exception ; + public WorkflowHandle workWithError() throws Exception ; } diff --git a/src/test/java/dev/dbos/transact/workflow/SimpleServiceImpl.java b/src/test/java/dev/dbos/transact/workflow/WorkflowImpl.java similarity index 64% rename from src/test/java/dev/dbos/transact/workflow/SimpleServiceImpl.java rename to src/test/java/dev/dbos/transact/workflow/WorkflowImpl.java index 92312a57..d259603e 100644 --- a/src/test/java/dev/dbos/transact/workflow/SimpleServiceImpl.java +++ b/src/test/java/dev/dbos/transact/workflow/WorkflowImpl.java @@ -3,16 +3,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SimpleServiceImpl implements SimpleService { +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; - Logger logger = LoggerFactory.getLogger(SimpleServiceImpl.class); +public class WorkflowImpl { + + Logger logger = LoggerFactory.getLogger(WorkflowImpl.class); public static int executionCount = 0 ; @Workflow(name = "workWithString") public String workWithString(String input) { logger.info("Executed workflow workWithString"); - SimpleServiceImpl.executionCount++; + WorkflowImpl.executionCount++; return "Processed: " + input ; } @@ -21,4 +24,7 @@ public void workWithError() throws Exception { throw new Exception("DBOS Test error") ; } + + + } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index db0ff237..bd20a9b3 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -1,8 +1,19 @@ + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + + + + + + + - - - + + \ No newline at end of file