Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.execution.DBOSExecutor;
import dev.dbos.transact.interceptor.AsyncInvocationHandler;
import dev.dbos.transact.interceptor.TransactInvocationHandler;
import dev.dbos.transact.migrations.DatabaseMigrator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,30 +59,29 @@ public <T> WorkflowBuilder<T> Workflow() {
// inner builder class for workflows
public static class WorkflowBuilder<T> {
private Class<T> interfaceClass;
private T implementation;

private Object implementation;

public WorkflowBuilder<T> interfaceClass(Class<T> iface) {
this.interfaceClass = iface;
return this;
}

public WorkflowBuilder<T> implementation(T impl) {
public WorkflowBuilder<T> implementation(Object impl) {
this.implementation = impl;
return this;
}


public T build() {
if (interfaceClass == null || implementation == null) {
throw new IllegalStateException("Interface and implementation must be set");
}

return TransactInvocationHandler.createProxy(
interfaceClass,
implementation,
DBOS.getInstance().dbosExecutor
return AsyncInvocationHandler.createProxy(
interfaceClass,
implementation,
DBOS.getInstance().dbosExecutor
);

}
}

Expand All @@ -96,6 +96,7 @@ public void launch() {

public void shutdown() {
dbosExecutor.shutdown();
instance = null ;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ public static void clear() {
contextHolder.remove();
}

public static void set(DBOSContext context) {
contextHolder.set(context);
}

}

31 changes: 22 additions & 9 deletions src/main/java/dev/dbos/transact/database/SystemDatabase.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package dev.dbos.transact.database;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import dev.dbos.transact.Constants;
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.exceptions.DBOSDeadLetterQueueException;
import dev.dbos.transact.exceptions.DBOSException;
import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException;
import dev.dbos.transact.exceptions.DBOSWorkflowConflictException;
import dev.dbos.transact.exceptions.*;
import dev.dbos.transact.json.JSONUtil;
import dev.dbos.transact.workflow.GetWorkflowsInput;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.WorkflowState;
import dev.dbos.transact.workflow.WorkflowStatus;
import dev.dbos.transact.workflow.internal.InsertWorkflowResult;
Expand Down Expand Up @@ -428,7 +424,23 @@ public void updateWorkflowStatus(
}
}

public List<WorkflowStatus> getWorkflows(GetWorkflowsInput input) throws SQLException {
public WorkflowStatus getWorkflow(String workflowId) {

try {
ListWorkflowsInput input = new ListWorkflowsInput();
input.setWorkflowIDs(Arrays.asList(workflowId));
List<WorkflowStatus> output = listWorkflows(input) ;
if (output.size() > 0) {
return output.get(0);
}
} catch (SQLException e) {
logger.error("Error retrieving workflow for "+workflowId, e);
}

throw new NonExistentWorkflowException(workflowId) ;
}

public List<WorkflowStatus> listWorkflows(ListWorkflowsInput input) throws SQLException {

List<WorkflowStatus> workflows = new ArrayList<>();

Expand Down Expand Up @@ -560,12 +572,13 @@ public List<WorkflowStatus> getWorkflows(GetWorkflowsInput input) throws SQLExce
String serializedError = rs.getString("error");

if (serializedInput != null) {
info.setInput(JSONUtil.fromJson(serializedInput, Object[].class));
info.setInput((Object[])JSONUtil.deserialize((serializedInput)) );
}

if (serializedOutput != null) {
info.setOutput(JSONUtil.fromJson(serializedOutput, Object.class));
info.setOutput(JSONUtil.deserialize(serializedOutput));
}

info.setError(serializedError);

info.setWorkflowDeadlineEpochMs(rs.getObject("workflow_deadline_epoch_ms", Long.class));
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/dev/dbos/transact/exceptions/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ public enum ErrorCode {
UNEXPECTED(1),
WORKFLOW_CONFLICT(2),
QUEUE_DUPLICATED(3) ,
DEAD_LETTER_QUEUE(4) ;
DEAD_LETTER_QUEUE(4) ,
NONEXISTENT_WORKFLOW(5) ;

private int code ;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dev.dbos.transact.exceptions;

public class NonExistentWorkflowException extends DBOSException{
private String workflowId ;
public NonExistentWorkflowException(String workflowId) {
super(ErrorCode.NONEXISTENT_WORKFLOW.getCode(),
String.format("Workflow does not exist %s", workflowId));
this.workflowId = workflowId;

}

public String getWorkflowId() {
return workflowId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dev.dbos.transact.execution;

import dev.dbos.transact.context.DBOSContext;
import dev.dbos.transact.context.DBOSContextHolder;

import java.util.concurrent.Callable;

public class ContextAwareCallable<T> implements Callable<T> {
private final Callable<T> task;
private final DBOSContext capturedContext;

public ContextAwareCallable(Callable<T> task) {
this.task = task;
this.capturedContext = DBOSContextHolder.get();
}

@Override
public T call() throws Exception {
DBOSContextHolder.set(capturedContext);
try {
return task.call();
} finally {
DBOSContextHolder.clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dev.dbos.transact.execution;

import dev.dbos.transact.context.DBOSContext;
import dev.dbos.transact.context.DBOSContextHolder;

public class ContextAwareRunnable implements Runnable {
private final Runnable task;
private final DBOSContext capturedContext;

public ContextAwareRunnable(Runnable task) {
this.task = task;
this.capturedContext = DBOSContextHolder.get();
}

@Override
public void run() {
DBOSContextHolder.set(capturedContext);
try {
task.run();
} finally {
DBOSContextHolder.clear();
}
}
}
105 changes: 82 additions & 23 deletions src/main/java/dev/dbos/transact/execution/DBOSExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,36 @@
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.exceptions.DBOSException;
import dev.dbos.transact.json.JSONUtil;
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowState;
import dev.dbos.transact.workflow.WorkflowStatus;
import dev.dbos.transact.workflow.internal.WorkflowHandleFuture;
import dev.dbos.transact.workflow.internal.WorkflowStatusInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static dev.dbos.transact.exceptions.ErrorCode.UNEXPECTED;

public class DBOSExecutor {

private DBOSConfig config;
private SystemDatabase systemDatabase;
private ExecutorService executorService ;
Logger logger = LoggerFactory.getLogger(DBOSExecutor.class);

public DBOSExecutor(DBOSConfig config, SystemDatabase sysdb) {
this.config = config;
this.systemDatabase = sysdb ;
this.executorService = Executors.newCachedThreadPool();

}

Expand All @@ -38,18 +47,12 @@ public SystemDatabase.WorkflowInitResult preInvokeWorkflow(String workflowName,
String interfaceName,
String className,
String methodName,
Object[] inputs) {
Object[] inputs,
String workflowId) {

logger.info("In preInvokeWorkflow") ;

DBOSContext ctx = DBOSContextHolder.get();
String workflowId = ctx.getWorkflowId() ;

if (workflowId == null) {
workflowId = UUID.randomUUID().toString();
}

String inputString = JSONUtil.toJson(inputs);
String inputString = JSONUtil.serialize(inputs) ;

WorkflowStatusInternal workflowStatusInternal =
new WorkflowStatusInternal(workflowId,
Expand Down Expand Up @@ -79,6 +82,7 @@ public SystemDatabase.WorkflowInitResult preInvokeWorkflow(String workflowName,
try {
initResult = systemDatabase.initWorkflowStatus(workflowStatusInternal, 3);
} catch (SQLException e) {
logger.error("Error inserting into workflow_status", e);
throw new DBOSException(UNEXPECTED.getCode(), e.getMessage(),e) ;
}

Expand All @@ -87,12 +91,9 @@ public SystemDatabase.WorkflowInitResult preInvokeWorkflow(String workflowName,

public void postInvokeWorkflow(String workflowId, Object result) {

String resultString = JSONUtil.toJson(result);


String resultString = JSONUtil.serialize(result);
systemDatabase.recordWorkflowOutput(workflowId, resultString);

logger.info("In post Invoke workflow with result") ;
}

public void postInvokeWorkflow(String workflowId, Throwable error) {
Expand All @@ -101,27 +102,41 @@ public void postInvokeWorkflow(String workflowId, Throwable error) {

systemDatabase.recordWorkflowError(workflowId, errorString);

logger.info("In post Invoke workflow with error") ;
}

public <T> T runWorkflow(String workflowName,
String targetClassName,
String methodName,
Object[] args,
DBOSFunction<T> function) throws Throwable {
DBOSFunction<T> function,
String workflowId) throws Throwable {

String wfid = workflowId ;

SystemDatabase.WorkflowInitResult initResult = preInvokeWorkflow(workflowName, null, targetClassName, methodName, args);
if (initResult.getStatus().equals(WorkflowState.SUCCESS.name())) {
return (T) systemDatabase.getWorkflowResult(initResult.getWorkflowId()).get();
} else if (initResult.getStatus().equals(WorkflowState.ERROR.name())) {
logger.warn("Idempotency check not impl for error");
} else if (initResult.getStatus().equals(WorkflowState.CANCELLED.name())) {
logger.warn("Idempotency check not impl for cancelled");
}
if (wfid == null) {
DBOSContext ctx = DBOSContextHolder.get();
wfid = ctx.getWorkflowId() ;

if (wfid == null) {
wfid = UUID.randomUUID().toString();
}
}

SystemDatabase.WorkflowInitResult initResult = null;
try {

initResult = preInvokeWorkflow(workflowName, null,
targetClassName, methodName, args, wfid);

if (initResult.getStatus().equals(WorkflowState.SUCCESS.name())) {
return (T) systemDatabase.getWorkflowResult(initResult.getWorkflowId()).get();
} else if (initResult.getStatus().equals(WorkflowState.ERROR.name())) {
logger.warn("Idempotency check not impl for error");
} else if (initResult.getStatus().equals(WorkflowState.CANCELLED.name())) {
logger.warn("Idempotency check not impl for cancelled");
}

logger.info("Before executing workflow") ;
T result = function.execute(); // invoke the lambda
logger.info("After: Workflow completed successfully");
postInvokeWorkflow(initResult.getWorkflowId(), result);
Expand All @@ -135,6 +150,50 @@ public <T> T runWorkflow(String workflowName,
}
}

public <T> WorkflowHandle<T> submitWorkflow(String workflowName,
String targetClassName,
String methodName,
Object[] args,
DBOSFunction<T> function) throws Throwable {

DBOSContext ctx = DBOSContextHolder.get();
String workflowId = ctx.getWorkflowId() ;

if (workflowId == null) {
workflowId = UUID.randomUUID().toString();
}

final String wfId = workflowId ;

Callable<T> task = () -> {
T result = null ;
logger.info("Callable executing the workflow.. " + wfId);
try {

result = runWorkflow(workflowName,
targetClassName,
methodName,
args,
function,
wfId);


} catch (Throwable e) {
Throwable actual = (e instanceof InvocationTargetException)
? ((InvocationTargetException) e).getTargetException()
: e;

logger.error("Error executing workflow", actual);

}

return result ;
};

Future<T> future = executorService.submit(task);

return new WorkflowHandleFuture<T>(workflowId, future, systemDatabase);

}

}
Loading
Loading