Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.execution.DBOSExecutor;
import dev.dbos.transact.interceptor.AsyncInvocationHandler;
import dev.dbos.transact.interceptor.TransactInvocationHandler;
import dev.dbos.transact.migrations.DatabaseMigrator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,30 +59,43 @@ public <T> WorkflowBuilder<T> Workflow() {
// inner builder class for workflows
public static class WorkflowBuilder<T> {
private Class<T> interfaceClass;
private T implementation;

private Object implementation;
private boolean async ;

public WorkflowBuilder<T> interfaceClass(Class<T> iface) {
this.interfaceClass = iface;
return this;
}

public WorkflowBuilder<T> implementation(T impl) {
public WorkflowBuilder<T> implementation(Object impl) {
this.implementation = impl;
return this;
}

public WorkflowBuilder<T> async() {
this.async = true ;
return this;
}

public T build() {
if (interfaceClass == null || implementation == null) {
throw new IllegalStateException("Interface and implementation must be set");
}

if (async) {
return AsyncInvocationHandler.createProxy(
interfaceClass,
implementation,
DBOS.getInstance().dbosExecutor
);
}

return TransactInvocationHandler.createProxy(
interfaceClass,
implementation,
DBOS.getInstance().dbosExecutor
);
) ;

}
}

Expand All @@ -96,6 +110,7 @@ public void launch() {

public void shutdown() {
dbosExecutor.shutdown();
instance = null ;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ public static void clear() {
contextHolder.remove();
}

public static void set(DBOSContext context) {
contextHolder.set(context);
}

}

106 changes: 84 additions & 22 deletions src/main/java/dev/dbos/transact/database/SystemDatabase.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package dev.dbos.transact.database;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import dev.dbos.transact.Constants;
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.exceptions.DBOSDeadLetterQueueException;
import dev.dbos.transact.exceptions.DBOSException;
import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException;
import dev.dbos.transact.exceptions.DBOSWorkflowConflictException;
import dev.dbos.transact.exceptions.*;
import dev.dbos.transact.json.JSONUtil;
import dev.dbos.transact.workflow.GetWorkflowsInput;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.WorkflowState;
import dev.dbos.transact.workflow.WorkflowStatus;
import dev.dbos.transact.workflow.internal.InsertWorkflowResult;
Expand Down Expand Up @@ -428,7 +424,23 @@ public void updateWorkflowStatus(
}
}

public List<WorkflowStatus> getWorkflows(GetWorkflowsInput input) throws SQLException {
public WorkflowStatus getWorkflow(String workflowId) {

try {
ListWorkflowsInput input = new ListWorkflowsInput();
input.setWorkflowIDs(Arrays.asList(workflowId));
List<WorkflowStatus> output = listWorkflows(input) ;
if (output.size() > 0) {
return output.get(0);
}
} catch (SQLException e) {
logger.error("Error retrieving workflow for "+workflowId, e);
}

throw new NonExistentWorkflowException(workflowId) ;
}

public List<WorkflowStatus> listWorkflows(ListWorkflowsInput input) throws SQLException {

List<WorkflowStatus> workflows = new ArrayList<>();

Expand Down Expand Up @@ -560,12 +572,13 @@ public List<WorkflowStatus> getWorkflows(GetWorkflowsInput input) throws SQLExce
String serializedError = rs.getString("error");

if (serializedInput != null) {
info.setInput(JSONUtil.fromJson(serializedInput, Object[].class));
info.setInput((Object[])JSONUtil.deserialize((serializedInput)) );
}

if (serializedOutput != null) {
info.setOutput(JSONUtil.fromJson(serializedOutput, Object.class));
info.setOutput(JSONUtil.deserialize(serializedOutput));
}

info.setError(serializedError);

info.setWorkflowDeadlineEpochMs(rs.getObject("workflow_deadline_epoch_ms", Long.class));
Expand All @@ -580,27 +593,76 @@ public List<WorkflowStatus> getWorkflows(GetWorkflowsInput input) throws SQLExce
return workflows ;
}

/**
* Helper method for tests
* Should be moved to TestUtils
*/
public void deleteWorkflowsTestHelper() throws SQLException{
/**
* Helper method for tests
* Should be moved to TestUtils
*/
public void deleteWorkflowsTestHelper() throws SQLException{

String sql = "delete from dbos.workflow_status";

try (Connection connection = dataSource.getConnection();
PreparedStatement pstmt = connection.prepareStatement(sql)) {

String sql = "delete from dbos.workflow_status";
int rowsAffected = pstmt.executeUpdate();
logger.info("Cleaned up: Deleted " + rowsAffected + " rows from dbos.workflow_status");

} catch (SQLException e) {
logger.error("Error deleting workflows in test helper: " + e.getMessage());
throw e;
}

}

public Object awaitWorkflowResult(String workflowId) throws Exception {

final String sql = "SELECT status, output, error "+
"FROM dbos.workflow_status " +
"WHERE workflow_uuid = ?" ;

while (true) {

try (Connection connection = dataSource.getConnection();
PreparedStatement pstmt = connection.prepareStatement(sql)) {
PreparedStatement stmt = connection.prepareStatement(sql)) {

int rowsAffected = pstmt.executeUpdate();
logger.info("Cleaned up: Deleted " + rowsAffected + " rows from dbos.workflow_status");
stmt.setString(1, workflowId);

} catch (SQLException e) {
logger.error("Error deleting workflows in test helper: " + e.getMessage());
throw e;
}
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
String status = rs.getString("status");

switch (WorkflowState.valueOf(status.toUpperCase())) {
case SUCCESS:
String output = rs.getString("output");
return output != null ? JSONUtil.deserialize(output) : null;
case ERROR:
String error = rs.getString("error");
// TODO fixException exception = serialization.deserializeException(error);
throw new Exception(error);

case CANCELLED:
throw new AwaitedWorkflowCancelledException(workflowId);

default:
// Status is PENDING or other - continue polling
break;
}
}
// Row not found - workflow hasn't appeared yet, continue polling
}
} catch (SQLException e) {
logger.error("Database error while polling workflow " + workflowId + ": " + e.getMessage());
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Workflow polling interrupted for " + workflowId, e);
}


}
}

private void createDataSource(String dbName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.dbos.transact.exceptions;

public class AwaitedWorkflowCancelledException extends DBOSException {
public AwaitedWorkflowCancelledException(String workflowId) {
super(ErrorCode.WORKFLOW_CONFLICT.getCode(),
String.format("Awaited workflow %s was cancelled.", workflowId));
}
}
4 changes: 3 additions & 1 deletion src/main/java/dev/dbos/transact/exceptions/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ public enum ErrorCode {
UNEXPECTED(1),
WORKFLOW_CONFLICT(2),
QUEUE_DUPLICATED(3) ,
DEAD_LETTER_QUEUE(4) ;
DEAD_LETTER_QUEUE(4) ,
NONEXISTENT_WORKFLOW(5) ,
AWAITED_WORKFLOW_CANCEL(6);

private int code ;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dev.dbos.transact.exceptions;

public class NonExistentWorkflowException extends DBOSException{
private String workflowId ;
public NonExistentWorkflowException(String workflowId) {
super(ErrorCode.NONEXISTENT_WORKFLOW.getCode(),
String.format("Workflow does not exist %s", workflowId));
this.workflowId = workflowId;

}

public String getWorkflowId() {
return workflowId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dev.dbos.transact.execution;

import dev.dbos.transact.context.DBOSContext;
import dev.dbos.transact.context.DBOSContextHolder;

import java.util.concurrent.Callable;

public class ContextAwareCallable<T> implements Callable<T> {
private final Callable<T> task;
private final DBOSContext capturedContext;

public ContextAwareCallable(Callable<T> task) {
this.task = task;
this.capturedContext = DBOSContextHolder.get();
}

@Override
public T call() throws Exception {
DBOSContextHolder.set(capturedContext);
try {
return task.call();
} finally {
DBOSContextHolder.clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dev.dbos.transact.execution;

import dev.dbos.transact.context.DBOSContext;
import dev.dbos.transact.context.DBOSContextHolder;

public class ContextAwareRunnable implements Runnable {
private final Runnable task;
private final DBOSContext capturedContext;

public ContextAwareRunnable(Runnable task) {
this.task = task;
this.capturedContext = DBOSContextHolder.get();
}

@Override
public void run() {
DBOSContextHolder.set(capturedContext);
try {
task.run();
} finally {
DBOSContextHolder.clear();
}
}
}
Loading
Loading