Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -752,4 +752,38 @@ public static Optional<ExternalState> getExternalState(
public static ExternalState upsertExternalState(ExternalState state) {
return executor("upsertExternalState").upsertExternalState(state);
}

/**
* Marks a breaking change within a workflow. Returns true for new workflows (i.e. workflow sthat
* reach this point in the workflow after the breaking change was created) and false for old
* worklows (i.e. workflows that reached this point in the workflow before the breaking change was
* created). The workflow should execute the new code if this method returns true, otherwise
* execute the old code. Note, patching must be enabled in DBOS configuration and this method must
* be called from within a workflow context.
*
* @param patchName the name of the patch to apply
* @return true for workflows started after the breaking change, false for workflows started
* before the breaking change
* @throws RuntimeException if patching is not enabled in DBOS config or if called outside a
* workflow
*/
public static boolean patch(String patchName) {
return executor("patch").patch(patchName);
}

/**
* Deprecates a previously applied breaking change patch within a workflow. Safely executes
* workflows containing the patch marker, but does not insert the patch marker into new workflows.
* Always returns true (boolean return gives deprecatePatch the same signature as {@link #patch}).
* Like {@link #patch}, patching must be enabled in DBOS configuration and this method must be
* called from within a workflow context.
*
* @param patchName the name of the patch to deprecate
* @return true (always returns true or throws)
* @throws RuntimeException if patching is not enabled in DBOS config or if called outside a
* workflow
*/
public static boolean deprecatePatch(String patchName) {
return executor("deprecatePatch").deprecatePatch(patchName);
}
}
100 changes: 82 additions & 18 deletions transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,27 @@ public record DBOSConfig(
String conductorDomain,
String appVersion,
String executorId,
String databaseSchema) {
String databaseSchema,
boolean enablePatching) {

public DBOSConfig {
if (appName == null || appName.isEmpty()) {
throw new IllegalArgumentException("DBOSConfig.appName must not be null or empty");
}
if (conductorKey != null && conductorKey.isEmpty()) {
throw new IllegalArgumentException("DBOSConfig.conductorKey must not be empty if specified");
}
if (conductorDomain != null && conductorDomain.isEmpty()) {
throw new IllegalArgumentException(
"DBOSConfig.conductorDomain must not be empty if specified");
}
if (appVersion != null && appVersion.isEmpty()) {
throw new IllegalArgumentException("DBOSConfig.appVersion must not be empty if specified");
}
if (executorId != null && executorId.isEmpty()) {
throw new IllegalArgumentException("DBOSConfig.executorId must not be empty if specified");
}
}

public static DBOSConfig defaults(String appName) {
return new DBOSConfig(
Expand All @@ -28,7 +48,7 @@ public static DBOSConfig defaults(String appName) {
null, false, // adminServer
3001, // adminServerPort
true, // migrate
null, null, null, null, null);
null, null, null, null, null, false);
}

public static DBOSConfig defaultsFromEnv(String appName) {
Expand Down Expand Up @@ -58,7 +78,8 @@ public DBOSConfig withAppName(String v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withDatabaseUrl(String v) {
Expand All @@ -77,7 +98,8 @@ public DBOSConfig withDatabaseUrl(String v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withDbUser(String v) {
Expand All @@ -96,7 +118,8 @@ public DBOSConfig withDbUser(String v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withDbPassword(String v) {
Expand All @@ -115,7 +138,8 @@ public DBOSConfig withDbPassword(String v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withMaximumPoolSize(int v) {
Expand All @@ -134,7 +158,8 @@ public DBOSConfig withMaximumPoolSize(int v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withConnectionTimeout(int v) {
Expand All @@ -153,7 +178,8 @@ public DBOSConfig withConnectionTimeout(int v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withDataSource(HikariDataSource v) {
Expand All @@ -172,7 +198,8 @@ public DBOSConfig withDataSource(HikariDataSource v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withAdminServer(boolean v) {
Expand All @@ -191,7 +218,8 @@ public DBOSConfig withAdminServer(boolean v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withAdminServerPort(int v) {
Expand All @@ -210,7 +238,8 @@ public DBOSConfig withAdminServerPort(int v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withMigrate(boolean v) {
Expand All @@ -229,7 +258,8 @@ public DBOSConfig withMigrate(boolean v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withConductorKey(String v) {
Expand All @@ -248,7 +278,8 @@ public DBOSConfig withConductorKey(String v) {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withConductorDomain(String v) {
Expand All @@ -267,7 +298,8 @@ public DBOSConfig withConductorDomain(String v) {
v,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withAppVersion(String v) {
Expand All @@ -286,7 +318,8 @@ public DBOSConfig withAppVersion(String v) {
conductorDomain,
v,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withExecutorId(String v) {
Expand All @@ -305,7 +338,8 @@ public DBOSConfig withExecutorId(String v) {
conductorDomain,
appVersion,
v,
databaseSchema);
databaseSchema,
enablePatching);
}

public DBOSConfig withDatabaseSchema(String v) {
Expand All @@ -324,6 +358,35 @@ public DBOSConfig withDatabaseSchema(String v) {
conductorDomain,
appVersion,
executorId,
v,
enablePatching);
}

public DBOSConfig withEnablePatching() {
return this.withEnablePatching(true);
}

public DBOSConfig withDisablePatching() {
return this.withEnablePatching(false);
}

public DBOSConfig withEnablePatching(boolean v) {
return new DBOSConfig(
appName,
databaseUrl,
dbUser,
dbPassword,
maximumPoolSize,
connectionTimeout,
dataSource,
adminServer,
adminServerPort,
migrate,
conductorKey,
conductorDomain,
appVersion,
executorId,
databaseSchema,
v);
}

Expand All @@ -338,7 +401,7 @@ public DBOSConfig disableAdminServer() {
// Override toString to mask the DB password
@Override
public String toString() {
return "DBOSConfig[appName=%s, databaseUrl=%s, dbUser=%s, dbPassword=***, maximumPoolSize=%d, connectionTimeout=%d, dataSource=%s, adminServer=%s, adminServerPort=%d, migrate=%s, conductorKey=%s, conductorDomain=%s, appVersion=%s, executorId=%s, dbSchema=%s]"
return "DBOSConfig[appName=%s, databaseUrl=%s, dbUser=%s, dbPassword=***, maximumPoolSize=%d, connectionTimeout=%d, dataSource=%s, adminServer=%s, adminServerPort=%d, migrate=%s, conductorKey=%s, conductorDomain=%s, appVersion=%s, executorId=%s, dbSchema=%s, enablePatching=%s]"
.formatted(
appName,
databaseUrl,
Expand All @@ -353,6 +416,7 @@ public String toString() {
conductorDomain,
appVersion,
executorId,
databaseSchema);
databaseSchema,
enablePatching);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public WorkflowInitResult initWorkflowStatus(
boolean isRecoveryRequest,
boolean isDequeuedRequest) {

// This ID will be used to tell if we are the first writer of the record, or if there is an
// existing one
// Note that it is generated outside of the DB retry loop, in case commit acks get lost and we
// do not know if we committed or not
// This ID will be used to tell if we are the first writer of the record, or if
// there is an existing one.
// Note that it is generated outside of the DB retry loop, in case commit acks
// get lost and we do not know if we committed or not
String ownerXid = UUID.randomUUID().toString();
return DbRetry.call(
() -> {
Expand Down Expand Up @@ -442,6 +442,58 @@ SELECT function_name, COUNT(*) as count
});
}

private String getCheckpointName(Connection conn, String workflowId, int functionId)
throws SQLException {
var sql =
"""
SELECT function_name
FROM %s.operation_outputs
WHERE workflow_uuid = ? AND function_id = ?
"""
.formatted(this.schema);

try (var ps = conn.prepareStatement(sql)) {
ps.setString(1, workflowId);
ps.setInt(2, functionId);
try (var rs = ps.executeQuery()) {
if (rs.next()) {
return rs.getString("function_name");
} else {
return null;
}
}
}
}

public boolean patch(String workflowId, int functionId, String patchName) {
Objects.requireNonNull(patchName, "patchName cannot be null");
return DbRetry.call(
() -> {
try (Connection conn = dataSource.getConnection()) {
var checkpointName = getCheckpointName(conn, workflowId, functionId);
if (checkpointName == null) {
var output = new StepResult(workflowId, functionId, patchName);
StepsDAO.recordStepResultTxn(
output, System.currentTimeMillis(), null, conn, this.schema);
return true;
} else {
return patchName.equals(checkpointName);
}
}
});
}

public boolean deprecatePatch(String workflowId, int functionId, String patchName) {
Objects.requireNonNull(patchName, "patchName cannot be null");
return DbRetry.call(
() -> {
try (Connection conn = dataSource.getConnection()) {
var checkpointName = getCheckpointName(conn, workflowId, functionId);
return patchName.equals(checkpointName);
}
});
}

// package public helper for test purposes
Connection getSysDBConnection() throws SQLException {
return dataSource.getConnection();
Expand Down
Loading