diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 53d86cf3..1865d0a5 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -752,4 +752,38 @@ public static Optional getExternalState( public static ExternalState upsertExternalState(ExternalState state) { return executor("upsertExternalState").upsertExternalState(state); } + + /** + * Marks a breaking change within a workflow. Returns true for new workflows (i.e. workflow sthat + * reach this point in the workflow after the breaking change was created) and false for old + * worklows (i.e. workflows that reached this point in the workflow before the breaking change was + * created). The workflow should execute the new code if this method returns true, otherwise + * execute the old code. Note, patching must be enabled in DBOS configuration and this method must + * be called from within a workflow context. + * + * @param patchName the name of the patch to apply + * @return true for workflows started after the breaking change, false for workflows started + * before the breaking change + * @throws RuntimeException if patching is not enabled in DBOS config or if called outside a + * workflow + */ + public static boolean patch(String patchName) { + return executor("patch").patch(patchName); + } + + /** + * Deprecates a previously applied breaking change patch within a workflow. Safely executes + * workflows containing the patch marker, but does not insert the patch marker into new workflows. + * Always returns true (boolean return gives deprecatePatch the same signature as {@link #patch}). + * Like {@link #patch}, patching must be enabled in DBOS configuration and this method must be + * called from within a workflow context. + * + * @param patchName the name of the patch to deprecate + * @return true (always returns true or throws) + * @throws RuntimeException if patching is not enabled in DBOS config or if called outside a + * workflow + */ + public static boolean deprecatePatch(String patchName) { + return executor("deprecatePatch").deprecatePatch(patchName); + } } diff --git a/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java b/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java index ba4614f0..311fb428 100644 --- a/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java +++ b/transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java @@ -19,7 +19,27 @@ public record DBOSConfig( String conductorDomain, String appVersion, String executorId, - String databaseSchema) { + String databaseSchema, + boolean enablePatching) { + + public DBOSConfig { + if (appName == null || appName.isEmpty()) { + throw new IllegalArgumentException("DBOSConfig.appName must not be null or empty"); + } + if (conductorKey != null && conductorKey.isEmpty()) { + throw new IllegalArgumentException("DBOSConfig.conductorKey must not be empty if specified"); + } + if (conductorDomain != null && conductorDomain.isEmpty()) { + throw new IllegalArgumentException( + "DBOSConfig.conductorDomain must not be empty if specified"); + } + if (appVersion != null && appVersion.isEmpty()) { + throw new IllegalArgumentException("DBOSConfig.appVersion must not be empty if specified"); + } + if (executorId != null && executorId.isEmpty()) { + throw new IllegalArgumentException("DBOSConfig.executorId must not be empty if specified"); + } + } public static DBOSConfig defaults(String appName) { return new DBOSConfig( @@ -28,7 +48,7 @@ public static DBOSConfig defaults(String appName) { null, false, // adminServer 3001, // adminServerPort true, // migrate - null, null, null, null, null); + null, null, null, null, null, false); } public static DBOSConfig defaultsFromEnv(String appName) { @@ -58,7 +78,8 @@ public DBOSConfig withAppName(String v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withDatabaseUrl(String v) { @@ -77,7 +98,8 @@ public DBOSConfig withDatabaseUrl(String v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withDbUser(String v) { @@ -96,7 +118,8 @@ public DBOSConfig withDbUser(String v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withDbPassword(String v) { @@ -115,7 +138,8 @@ public DBOSConfig withDbPassword(String v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withMaximumPoolSize(int v) { @@ -134,7 +158,8 @@ public DBOSConfig withMaximumPoolSize(int v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withConnectionTimeout(int v) { @@ -153,7 +178,8 @@ public DBOSConfig withConnectionTimeout(int v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withDataSource(HikariDataSource v) { @@ -172,7 +198,8 @@ public DBOSConfig withDataSource(HikariDataSource v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withAdminServer(boolean v) { @@ -191,7 +218,8 @@ public DBOSConfig withAdminServer(boolean v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withAdminServerPort(int v) { @@ -210,7 +238,8 @@ public DBOSConfig withAdminServerPort(int v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withMigrate(boolean v) { @@ -229,7 +258,8 @@ public DBOSConfig withMigrate(boolean v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withConductorKey(String v) { @@ -248,7 +278,8 @@ public DBOSConfig withConductorKey(String v) { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withConductorDomain(String v) { @@ -267,7 +298,8 @@ public DBOSConfig withConductorDomain(String v) { v, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withAppVersion(String v) { @@ -286,7 +318,8 @@ public DBOSConfig withAppVersion(String v) { conductorDomain, v, executorId, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withExecutorId(String v) { @@ -305,7 +338,8 @@ public DBOSConfig withExecutorId(String v) { conductorDomain, appVersion, v, - databaseSchema); + databaseSchema, + enablePatching); } public DBOSConfig withDatabaseSchema(String v) { @@ -324,6 +358,35 @@ public DBOSConfig withDatabaseSchema(String v) { conductorDomain, appVersion, executorId, + v, + enablePatching); + } + + public DBOSConfig withEnablePatching() { + return this.withEnablePatching(true); + } + + public DBOSConfig withDisablePatching() { + return this.withEnablePatching(false); + } + + public DBOSConfig withEnablePatching(boolean v) { + return new DBOSConfig( + appName, + databaseUrl, + dbUser, + dbPassword, + maximumPoolSize, + connectionTimeout, + dataSource, + adminServer, + adminServerPort, + migrate, + conductorKey, + conductorDomain, + appVersion, + executorId, + databaseSchema, v); } @@ -338,7 +401,7 @@ public DBOSConfig disableAdminServer() { // Override toString to mask the DB password @Override public String toString() { - return "DBOSConfig[appName=%s, databaseUrl=%s, dbUser=%s, dbPassword=***, maximumPoolSize=%d, connectionTimeout=%d, dataSource=%s, adminServer=%s, adminServerPort=%d, migrate=%s, conductorKey=%s, conductorDomain=%s, appVersion=%s, executorId=%s, dbSchema=%s]" + return "DBOSConfig[appName=%s, databaseUrl=%s, dbUser=%s, dbPassword=***, maximumPoolSize=%d, connectionTimeout=%d, dataSource=%s, adminServer=%s, adminServerPort=%d, migrate=%s, conductorKey=%s, conductorDomain=%s, appVersion=%s, executorId=%s, dbSchema=%s, enablePatching=%s]" .formatted( appName, databaseUrl, @@ -353,6 +416,7 @@ public String toString() { conductorDomain, appVersion, executorId, - databaseSchema); + databaseSchema, + enablePatching); } } diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 9e7c6141..c1eb0a8e 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -100,10 +100,10 @@ public WorkflowInitResult initWorkflowStatus( boolean isRecoveryRequest, boolean isDequeuedRequest) { - // This ID will be used to tell if we are the first writer of the record, or if there is an - // existing one - // Note that it is generated outside of the DB retry loop, in case commit acks get lost and we - // do not know if we committed or not + // This ID will be used to tell if we are the first writer of the record, or if + // there is an existing one. + // Note that it is generated outside of the DB retry loop, in case commit acks + // get lost and we do not know if we committed or not String ownerXid = UUID.randomUUID().toString(); return DbRetry.call( () -> { @@ -442,6 +442,58 @@ SELECT function_name, COUNT(*) as count }); } + private String getCheckpointName(Connection conn, String workflowId, int functionId) + throws SQLException { + var sql = + """ + SELECT function_name + FROM %s.operation_outputs + WHERE workflow_uuid = ? AND function_id = ? + """ + .formatted(this.schema); + + try (var ps = conn.prepareStatement(sql)) { + ps.setString(1, workflowId); + ps.setInt(2, functionId); + try (var rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getString("function_name"); + } else { + return null; + } + } + } + } + + public boolean patch(String workflowId, int functionId, String patchName) { + Objects.requireNonNull(patchName, "patchName cannot be null"); + return DbRetry.call( + () -> { + try (Connection conn = dataSource.getConnection()) { + var checkpointName = getCheckpointName(conn, workflowId, functionId); + if (checkpointName == null) { + var output = new StepResult(workflowId, functionId, patchName); + StepsDAO.recordStepResultTxn( + output, System.currentTimeMillis(), null, conn, this.schema); + return true; + } else { + return patchName.equals(checkpointName); + } + } + }); + } + + public boolean deprecatePatch(String workflowId, int functionId, String patchName) { + Objects.requireNonNull(patchName, "patchName cannot be null"); + return DbRetry.call( + () -> { + try (Connection conn = dataSource.getConnection()) { + var checkpointName = getCheckpointName(conn, workflowId, functionId); + return patchName.equals(checkpointName); + } + }); + } + // package public helper for test purposes Connection getSysDBConnection() throws SQLException { return dataSource.getConnection(); diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 1e979b0c..a116e953 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -65,6 +65,7 @@ public class DBOSExecutor implements AutoCloseable { private final DBOSConfig config; + private boolean dbosCloud; private String appVersion; private String executorId; @@ -86,6 +87,28 @@ public class DBOSExecutor implements AutoCloseable { public DBOSExecutor(DBOSConfig config) { this.config = config; + + if (config.conductorKey() != null && config.executorId() != null) { + throw new IllegalArgumentException( + "DBOSConfig.executorId cannot be specified when using Conductor"); + } + + appVersion = Objects.requireNonNullElse(System.getenv("DBOS__APPVERSION"), ""); + executorId = Objects.requireNonNullElse(System.getenv("DBOS__VMID"), "local"); + dbosCloud = Objects.requireNonNullElse(System.getenv("DBOS__CLOUD"), "").equals("true"); + + if (!dbosCloud) { + if (config.enablePatching()) { + appVersion = "PATCHING_ENABLED"; + } + + if (config.appVersion() != null) { + appVersion = config.appVersion(); + } + if (config.executorId() != null) { + executorId = config.executorId(); + } + } } public void start( @@ -101,19 +124,6 @@ public void start( this.queues = Collections.unmodifiableList(queues); this.listeners = listenerSet; - this.executorId = System.getenv("DBOS__VMID"); - if (this.executorId == null || this.executorId.isEmpty()) { - this.executorId = config.executorId(); - } - if (this.executorId == null || this.executorId.isEmpty()) { - this.executorId = config.conductorKey() == null ? "local" : UUID.randomUUID().toString(); - } - - this.appVersion = System.getenv("DBOS__APPVERSION"); - if (this.appVersion == null || this.appVersion.isEmpty()) { - this.appVersion = config.appVersion(); - } - if (this.appVersion == null || this.appVersion.isEmpty()) { List> registeredClasses = workflowMap.values().stream() @@ -122,6 +132,10 @@ public void start( this.appVersion = AppVersionComputer.computeAppVersion(registeredClasses); } + if (config.conductorKey() != null) { + this.executorId = UUID.randomUUID().toString(); + } + logger.info("System Database: {}", this.config.databaseUrl()); logger.info("System Database User name: {}", this.config.dbUser()); logger.info("Executor ID: {}", this.executorId); @@ -738,6 +752,46 @@ public T getResult(String workflowId) throws E { workflowId); } + public boolean patch(String patchName) { + if (!config.enablePatching()) { + throw new IllegalStateException("Patching must be enabled in DBOS Config"); + } + + DBOSContext ctx = DBOSContextHolder.get(); + if (ctx == null || !ctx.isInWorkflow()) { + throw new IllegalStateException("DBOS.patch must be called from a workflow"); + } + + var workflowId = ctx.getWorkflowId(); + var functionId = ctx.getCurrentFunctionId(); + patchName = "DBOS.patch-%s".formatted(patchName); + var patched = systemDatabase.patch(workflowId, functionId, patchName); + if (patched) { + ctx.getAndIncrementFunctionId(); + } + return patched; + } + + public boolean deprecatePatch(String patchName) { + if (!config.enablePatching()) { + throw new IllegalStateException("Patching must be enabled in DBOS Config"); + } + + DBOSContext ctx = DBOSContextHolder.get(); + if (ctx == null || !ctx.isInWorkflow()) { + throw new IllegalStateException("DBOS.deprecatePatch must be called from a workflow"); + } + + var workflowId = ctx.getWorkflowId(); + var functionId = ctx.getCurrentFunctionId(); + patchName = "DBOS.patch-%s".formatted(patchName); + var patchExists = systemDatabase.deprecatePatch(workflowId, functionId, patchName); + if (patchExists) { + ctx.getAndIncrementFunctionId(); + } + return true; + } + private static Invocation captureInvocation( ThrowingSupplier supplier) { AtomicReference capturedInvocation = new AtomicReference<>(); diff --git a/transact/src/main/java/dev/dbos/transact/internal/AppVersionComputer.java b/transact/src/main/java/dev/dbos/transact/internal/AppVersionComputer.java index e6fcc802..39dc02a7 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/AppVersionComputer.java +++ b/transact/src/main/java/dev/dbos/transact/internal/AppVersionComputer.java @@ -1,5 +1,7 @@ package dev.dbos.transact.internal; +import dev.dbos.transact.DBOS; + import java.io.InputStream; import java.security.MessageDigest; import java.util.*; @@ -30,11 +32,10 @@ public static String computeAppVersion(List> registeredClasses) { hasher.update((clazz.getName() + ":" + classHash).getBytes("UTF-8")); } - // Add DBOS version - // hasher.update(("dbos:" + getDbosVersion()).getBytes("UTF-8")); + // Different DBOS versions should produce different app versions + hasher.update(DBOS.version().getBytes("UTF-8")); return bytesToHex(hasher.digest()); - } catch (Exception e) { logger.warn("Failed to compute simplified app version", e); return getFallbackVersion(); diff --git a/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java b/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java index 851aac76..196cefb4 100644 --- a/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java +++ b/transact/src/test/java/dev/dbos/transact/config/ConfigTest.java @@ -4,19 +4,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import dev.dbos.transact.Constants; import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSTestAccess; import dev.dbos.transact.database.DBTestAccess; +import dev.dbos.transact.internal.AppVersionComputer; import dev.dbos.transact.invocation.HawkService; import dev.dbos.transact.invocation.HawkServiceImpl; import dev.dbos.transact.utils.DBUtils; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -33,7 +37,10 @@ public class ConfigTest { @SystemStub private EnvironmentVariables envVars = new EnvironmentVariables(); @Test - public void setExecutorAndAppVersionViaConfig() throws Exception { + public void configOverridesEnvAppVerAndExecutor() throws Exception { + envVars.set("DBOS__VMID", "test-env-executor-id"); + envVars.set("DBOS__APPVERSION", "test-env-app-version"); + var config = DBOSConfig.defaults("config-test") .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") @@ -54,11 +61,35 @@ public void setExecutorAndAppVersionViaConfig() throws Exception { } @Test - public void setExecutorAndAppVersionViaEnv() throws Exception { + public void envAppVerAndExecutor() throws Exception { envVars.set("DBOS__VMID", "test-env-executor-id"); envVars.set("DBOS__APPVERSION", "test-env-app-version"); + var config = + DBOSConfig.defaults("config-test") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") + .withDbUser("postgres") + .withDbPassword(System.getenv("PGPASSWORD")); + + DBOS.reinitialize(config); + try { + DBOS.launch(); + var dbosExecutor = DBOSTestAccess.getDbosExecutor(); + assertEquals("test-env-app-version", dbosExecutor.appVersion()); + assertEquals("test-env-executor-id", dbosExecutor.executorId()); + } finally { + DBOS.shutdown(); + } + } + + @Test + public void dbosCloudEnvOverridesConfigAppVerAndExecutor() throws Exception { + + envVars.set("DBOS__CLOUD", "true"); + envVars.set("DBOS__VMID", "test-env-executor-id"); + envVars.set("DBOS__APPVERSION", "test-env-app-version"); + var config = DBOSConfig.defaults("config-test") .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") @@ -114,6 +145,41 @@ public void conductorExecutorId() throws Exception { } } + @Test + public void cantSetExecutorIdWhenUsingConductor() throws Exception { + var config = + DBOSConfig.defaultsFromEnv("config-test") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") + .withConductorKey("test-conductor-key") + .withExecutorId("test-executor-id"); + + DBOS.reinitialize(config); + try { + assertThrows(IllegalArgumentException.class, () -> DBOS.launch()); + } finally { + DBOS.shutdown(); + } + } + + @Test + public void cantSetEmptyConfigFields() throws Exception { + assertThrows(IllegalArgumentException.class, () -> DBOSConfig.defaults(null)); + assertThrows(IllegalArgumentException.class, () -> DBOSConfig.defaults("")); + + final var config = DBOSConfig.defaults("app-name"); + assertThrows(IllegalArgumentException.class, () -> config.withAppName("")); + assertThrows(IllegalArgumentException.class, () -> config.withAppName(null)); + + assertThrows(IllegalArgumentException.class, () -> config.withConductorKey("")); + assertDoesNotThrow(() -> config.withConductorKey(null)); + assertThrows(IllegalArgumentException.class, () -> config.withConductorDomain("")); + assertDoesNotThrow(() -> config.withConductorDomain(null)); + assertThrows(IllegalArgumentException.class, () -> config.withExecutorId("")); + assertDoesNotThrow(() -> config.withExecutorId(null)); + assertThrows(IllegalArgumentException.class, () -> config.withAppVersion("")); + assertDoesNotThrow(() -> config.withAppVersion(null)); + } + @Test public void calcAppVersion() throws Exception { var config = @@ -126,9 +192,12 @@ public void calcAppVersion() throws Exception { try { DBOS.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(); - // If we change the internally registered workflows, the expected value will change - var expected = "6482a0dde9a452189b20c5f5e0d00a661ea8f160d58244cfc0a99cc5f13dbcad"; - assertEquals(expected, dbosExecutor.appVersion()); + List> workflowClasses = + dbosExecutor.getWorkflows().stream() + .map(r -> r.target().getClass()) + .collect(Collectors.toList()); + var version = assertDoesNotThrow(() -> AppVersionComputer.computeAppVersion(workflowClasses)); + assertEquals(version, dbosExecutor.appVersion()); } finally { DBOS.shutdown(); } diff --git a/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java b/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java new file mode 100644 index 00000000..0f8ab811 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/invocation/PatchTest.java @@ -0,0 +1,302 @@ +package dev.dbos.transact.invocation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.exceptions.DBOSUnexpectedStepException; +import dev.dbos.transact.utils.DBUtils; +import dev.dbos.transact.workflow.Workflow; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +interface PatchService { + int workflow(); +} + +class PatchServiceImplOne implements PatchService { + @Override + @Workflow + public int workflow() { + var a = DBOS.runStep(() -> 1, "stepOne"); + var b = DBOS.runStep(() -> 2, "stepTwo"); + return a + b; + } +} + +class PatchServiceImplTwo implements PatchService { + @Override + @Workflow + public int workflow() { + var a = + DBOS.patch("v2") ? DBOS.runStep(() -> 3, "stepThree") : DBOS.runStep(() -> 1, "stepOne"); + var b = DBOS.runStep(() -> 2, "stepTwo"); + return a + b; + } +} + +class PatchServiceImplThree implements PatchService { + @Override + @Workflow + public int workflow() { + var a = + DBOS.patch("v3") + ? DBOS.runStep(() -> 2, "stepTwo") + : DBOS.patch("v2") + ? DBOS.runStep(() -> 3, "stepThree") + : DBOS.runStep(() -> 1, "stepOne"); + var b = DBOS.runStep(() -> 2, "stepTwo"); + return a + b; + } +} + +class PatchServiceImplFour implements PatchService { + @Override + @Workflow + public int workflow() { + DBOS.deprecatePatch("v3"); + var a = DBOS.runStep(() -> 2, "stepTwo"); + var b = DBOS.runStep(() -> 2, "stepTwo"); + return a + b; + } +} + +class PatchServiceImplFive implements PatchService { + @Override + @Workflow + public int workflow() { + var a = DBOS.runStep(() -> 2, "stepTwo"); + var b = DBOS.runStep(() -> 2, "stepTwo"); + return a + b; + } +} + +// @org.junit.jupiter.api.Timeout(value = 2, unit = TimeUnit.MINUTES) +public class PatchTest { + + @AfterEach + void afterEachTest() throws Exception { + DBOS.shutdown(); + } + + @Test + public void testPatch() throws Exception { + + // Note, for this test we have to manually update the workflow name when forking across + // versions. This requires pausing and unpausing the queue service to ensure the forked + // workflow isn't executed until the workflow name is updated. + + // This hack is required because we can't have multiple service implementations with the same + // name the way you can in a dynamic programming language like python. + + // In production, developers would be expected to be updating services in place, so they would + // have the same workflow name across deployed versions. + + var dbosConfig = + DBOSConfig.defaultsFromEnv("systemdbtest") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") + .withMaximumPoolSize(2) + .withEnablePatching() + .withAppVersion("test-version"); + var dataSource = SystemDatabase.createDataSource(dbosConfig); + + DBUtils.recreateDB(dbosConfig); + DBOS.reinitialize(dbosConfig); + + var proxy1 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplOne()); + DBOS.launch(); + + assertEquals("test-version", DBOSTestAccess.getDbosExecutor().appVersion()); + var queueService = DBOSTestAccess.getQueueService(); + + // Register and run the first version of a workflow + var h1 = DBOS.startWorkflow(() -> proxy1.workflow()); + assertEquals(3, h1.getResult()); + var steps = DBOS.listWorkflowSteps(h1.workflowId()); + assertEquals(2, steps.size()); + + // Recreate DBOS with a new (patched) version of a workflow + DBOS.shutdown(); + DBOS.reinitialize(dbosConfig); + var proxy2 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplTwo()); + DBOS.launch(); + + // Verify a new execution runs the post-patch workflow and stores a patch marker + var h2 = DBOS.startWorkflow(() -> proxy2.workflow()); + assertEquals(5, h2.getResult()); + steps = DBOS.listWorkflowSteps(h2.workflowId()); + assertEquals(3, steps.size()); + assertEquals("DBOS.patch-v2", steps.get(0).functionName()); + + // Verify an execution containing the patch marker can recover past the patch marker + var h2Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); + assertEquals(5, h2Fork2.getResult()); + steps = DBOS.listWorkflowSteps(h2Fork2.workflowId()); + assertEquals(3, steps.size()); + assertEquals("DBOS.patch-v2", steps.get(0).functionName()); + + // Verify an old execution runs the pre-patch workflow and does not store a patch marker + queueService.pause(); + var h2Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); + DBUtils.updateWorkflowName(dataSource, h2.workflowId(), h2Fork1.workflowId()); + queueService.unpause(); + assertEquals(3, h2Fork1.getResult()); + assertEquals(2, DBOS.listWorkflowSteps(h2Fork1.workflowId()).size()); + + // Recreate DBOS with another new (patched) version of a workflow + DBOS.shutdown(); + DBOS.reinitialize(dbosConfig); + var proxy3 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplThree()); + DBOS.launch(); + + // Verify a new execution runs the post-patch workflow and stores a patch marker + var h3 = DBOS.startWorkflow(() -> proxy3.workflow()); + assertEquals(4, h3.getResult()); + steps = DBOS.listWorkflowSteps(h3.workflowId()); + assertEquals(3, steps.size()); + assertEquals("DBOS.patch-v3", steps.get(0).functionName()); + + // Verify an execution containing the v3 patch marker recovers to v3 + var h3Fork3 = DBOS.forkWorkflow(h3.workflowId(), 3); + assertEquals(4, h3Fork3.getResult()); + steps = DBOS.listWorkflowSteps(h3Fork3.workflowId()); + assertEquals(3, steps.size()); + assertEquals("DBOS.patch-v3", steps.get(0).functionName()); + + // Verify an execution containing the v2 patch marker recovers to v2 + queueService.pause(); + var h3Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); + DBUtils.updateWorkflowName(dataSource, h3.workflowId(), h3Fork2.workflowId()); + queueService.unpause(); + assertEquals(5, h3Fork2.getResult()); + steps = DBOS.listWorkflowSteps(h3Fork2.workflowId()); + assertEquals(3, steps.size()); + assertEquals("DBOS.patch-v2", steps.get(0).functionName()); + + // Verify a v1 execution recovers the pre-patch workflow and does not store a patch marker + queueService.pause(); + var h3Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); + DBUtils.updateWorkflowName(dataSource, h3.workflowId(), h3Fork1.workflowId()); + queueService.unpause(); + assertEquals(3, h3Fork1.getResult()); + assertEquals(2, DBOS.listWorkflowSteps(h3Fork1.workflowId()).size()); + + // Now, let's deprecate the patch + DBOS.shutdown(); + DBOS.reinitialize(dbosConfig); + var proxy4 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFour()); + DBOS.launch(); + + // Verify a new execution runs the final workflow but does not store a patch marker + var h4 = DBOS.startWorkflow(() -> proxy4.workflow()); + assertEquals(4, h4.getResult()); + assertEquals(2, DBOS.listWorkflowSteps(h4.workflowId()).size()); + + // Verify an execution sans patch marker recovers correctly + var h4Fork4 = DBOS.forkWorkflow(h4.workflowId(), 3); + assertEquals(4, h4Fork4.getResult()); + assertEquals(2, DBOS.listWorkflowSteps(h4Fork4.workflowId()).size()); + + // Verify an execution containing the v3 patch marker recovers to v3 + queueService.pause(); + var h4Fork3 = DBOS.forkWorkflow(h3.workflowId(), 3); + DBUtils.updateWorkflowName(dataSource, h4.workflowId(), h4Fork3.workflowId()); + queueService.unpause(); + assertEquals(4, h4Fork3.getResult()); + steps = DBOS.listWorkflowSteps(h4Fork3.workflowId()); + assertEquals(3, steps.size()); + assertEquals("DBOS.patch-v3", steps.get(0).functionName()); + + // Verify an execution containing the v2 patch marker cleanly fails + queueService.pause(); + var h4Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); + DBUtils.updateWorkflowName(dataSource, h4.workflowId(), h4Fork2.workflowId()); + queueService.unpause(); + assertThrows(DBOSUnexpectedStepException.class, () -> h4Fork2.getResult()); + + // Verify a v1 execution cleanly fails + queueService.pause(); + var h4Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); + DBUtils.updateWorkflowName(dataSource, h4.workflowId(), h4Fork1.workflowId()); + queueService.unpause(); + assertThrows(DBOSUnexpectedStepException.class, () -> h4Fork1.getResult()); + + // Now, let's deprecate the patch + DBOS.shutdown(); + DBOS.reinitialize(dbosConfig); + var proxy5 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFive()); + DBOS.launch(); + + // Verify a new execution runs the final workflow but does not store a patch marker + var h5 = DBOS.startWorkflow(() -> proxy5.workflow()); + assertEquals(4, h5.getResult()); + assertEquals(2, DBOS.listWorkflowSteps(h5.workflowId()).size()); + + // Verify an execution from the deprecated patch works sans patch marker + queueService.pause(); + var h5Fork4 = DBOS.forkWorkflow(h4.workflowId(), 3); + DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork4.workflowId()); + queueService.unpause(); + assertEquals(4, h5Fork4.getResult()); + assertEquals(2, DBOS.listWorkflowSteps(h5Fork4.workflowId()).size()); + + // Verify an execution containing the v3 patch marker cleanly fails + queueService.pause(); + var h5Fork3 = DBOS.forkWorkflow(h3.workflowId(), 3); + DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork3.workflowId()); + queueService.unpause(); + assertThrows(DBOSUnexpectedStepException.class, () -> h5Fork3.getResult()); + + // Verify an execution containing the v2 patch marker cleanly fails + queueService.pause(); + var h5Fork2 = DBOS.forkWorkflow(h2.workflowId(), 3); + DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork2.workflowId()); + queueService.unpause(); + assertThrows(DBOSUnexpectedStepException.class, () -> h5Fork2.getResult()); + + // Verify a v1 execution cleanly fails + queueService.pause(); + var h5Fork1 = DBOS.forkWorkflow(h1.workflowId(), 2); + DBUtils.updateWorkflowName(dataSource, h5.workflowId(), h5Fork1.workflowId()); + queueService.unpause(); + assertThrows(DBOSUnexpectedStepException.class, () -> h5Fork1.getResult()); + } + + @Test + public void patchThrowsNotConfigured() throws Exception { + var dbosConfig = + DBOSConfig.defaultsFromEnv("systemdbtest") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") + .withMaximumPoolSize(2) + .withAppVersion("test-version"); + + DBUtils.recreateDB(dbosConfig); + DBOS.reinitialize(dbosConfig); + + var proxy2 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplTwo()); + DBOS.launch(); + + assertThrows(IllegalStateException.class, () -> proxy2.workflow()); + } + + @Test + public void deprecatePatchThrowsNotConfigured() throws Exception { + var dbosConfig = + DBOSConfig.defaultsFromEnv("systemdbtest") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys") + .withMaximumPoolSize(2) + .withAppVersion("test-version"); + + DBUtils.recreateDB(dbosConfig); + DBOS.reinitialize(dbosConfig); + + var proxy4 = DBOS.registerWorkflows(PatchService.class, new PatchServiceImplFour()); + DBOS.launch(); + + assertThrows(IllegalStateException.class, () -> proxy4.workflow()); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java index f9f45ab9..ee4d3dac 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java +++ b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java @@ -387,4 +387,38 @@ AND status IN ('ENQUEUED', 'PENDING') } return success; } + + public static void updateWorkflowName(DataSource ds, String sourceId, String destinationId) { + updateWorkflowName(ds, sourceId, destinationId, null); + } + + public static void updateWorkflowName( + DataSource ds, String sourceId, String destinationId, String schema) { + var row = getWorkflowRow(ds, sourceId, schema); + if (row == null) { + throw new RuntimeException("Workflow %s not found".formatted(sourceId)); + } + + schema = SystemDatabase.sanitizeSchema(schema); + + var sql = + """ + UPDATE %s.workflow_status + SET name = ?, class_name = ?, config_name = ? + WHERE workflow_uuid = ? + """ + .formatted(schema); + try (var conn = ds.getConnection(); + var ps = conn.prepareStatement(sql)) { + + ps.setString(1, row.name()); + ps.setString(2, row.className()); + ps.setString(3, row.instanceName()); + ps.setString(4, destinationId); + + ps.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } }