diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java index 7f8791803..b08ffcc53 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskClient.java @@ -14,6 +14,7 @@ package io.dapr.durabletask; import javax.annotation.Nullable; + import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -235,32 +236,6 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( */ public abstract void terminate(String instanceId, @Nullable Object output); - /** - * Fetches orchestration instance metadata from the configured durable store using a status query filter. - * - * @param query filter criteria that determines which orchestrations to fetch data for. - * @return the result of the query operation, including instance metadata and possibly a continuation token - */ - public abstract OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery query); - - /** - * Initializes the target task hub data store. - * - *

This is an administrative operation that only needs to be done once for the lifetime of the task hub.

- * - * @param recreateIfExists true to delete any existing task hub first; false to make this - * operation a no-op if the task hub data store already exists. Note that deleting a task - * hub will result in permanent data loss. Use this operation with care. - */ - public abstract void createTaskHub(boolean recreateIfExists); - - /** - * Permanently deletes the target task hub data store and any orchestration data it may contain. - * - *

This is an administrative operation that is irreversible. It should be used with great care.

- */ - public abstract void deleteTaskHub(); - /** * Purges orchestration instance metadata from the durable store. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index 881b9e958..e66e6f308 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -32,13 +32,12 @@ import io.opentelemetry.api.trace.Tracer; import javax.annotation.Nullable; + import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -307,49 +306,6 @@ public void terminate(String instanceId, @Nullable Object output) { this.sidecarClient.terminateInstance(builder.build()); } - @Override - public OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery query) { - OrchestratorService.InstanceQuery.Builder instanceQueryBuilder = OrchestratorService.InstanceQuery.newBuilder(); - Optional.ofNullable(query.getCreatedTimeFrom()).ifPresent(createdTimeFrom -> - instanceQueryBuilder.setCreatedTimeFrom(DataConverter.getTimestampFromInstant(createdTimeFrom))); - Optional.ofNullable(query.getCreatedTimeTo()).ifPresent(createdTimeTo -> - instanceQueryBuilder.setCreatedTimeTo(DataConverter.getTimestampFromInstant(createdTimeTo))); - Optional.ofNullable(query.getContinuationToken()).ifPresent(token -> - instanceQueryBuilder.setContinuationToken(StringValue.of(token))); - Optional.ofNullable(query.getInstanceIdPrefix()).ifPresent(prefix -> - instanceQueryBuilder.setInstanceIdPrefix(StringValue.of(prefix))); - instanceQueryBuilder.setFetchInputsAndOutputs(query.isFetchInputsAndOutputs()); - instanceQueryBuilder.setMaxInstanceCount(query.getMaxInstanceCount()); - query.getRuntimeStatusList().forEach(runtimeStatus -> - Optional.ofNullable(runtimeStatus).ifPresent(status -> - instanceQueryBuilder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status)))); - query.getTaskHubNames().forEach(taskHubName -> Optional.ofNullable(taskHubName).ifPresent(name -> - instanceQueryBuilder.addTaskHubNames(StringValue.of(name)))); - OrchestratorService.QueryInstancesResponse queryInstancesResponse = this.sidecarClient - .queryInstances(OrchestratorService.QueryInstancesRequest.newBuilder().setQuery(instanceQueryBuilder).build()); - return toQueryResult(queryInstancesResponse, query.isFetchInputsAndOutputs()); - } - - private OrchestrationStatusQueryResult toQueryResult( - OrchestratorService.QueryInstancesResponse queryInstancesResponse, boolean fetchInputsAndOutputs) { - List metadataList = new ArrayList<>(); - queryInstancesResponse.getOrchestrationStateList().forEach(state -> { - metadataList.add(new OrchestrationMetadata(state, this.dataConverter, fetchInputsAndOutputs)); - }); - return new OrchestrationStatusQueryResult(metadataList, queryInstancesResponse.getContinuationToken().getValue()); - } - - @Override - public void createTaskHub(boolean recreateIfExists) { - this.sidecarClient.createTaskHub(OrchestratorService.CreateTaskHubRequest.newBuilder() - .setRecreateIfExists(recreateIfExists).build()); - } - - @Override - public void deleteTaskHub() { - this.sidecarClient.deleteTaskHub(OrchestratorService.DeleteTaskHubRequest.newBuilder().build()); - } - @Override public PurgeResult purgeInstance(String instanceId) { OrchestratorService.PurgeInstancesRequest request = OrchestratorService.PurgeInstancesRequest.newBuilder() diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 806e1dfc5..1e08d0804 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -197,8 +197,6 @@ public void startAndBlock() { this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); - } else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) { - // No-op } else { logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationStatusQuery.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationStatusQuery.java deleted file mode 100644 index 864fc37c8..000000000 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationStatusQuery.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright 2025 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.durabletask; - -import javax.annotation.Nullable; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; - -/** - * Class used for constructing orchestration metadata queries. - */ -public final class OrchestrationStatusQuery { - private List runtimeStatusList = new ArrayList<>(); - private Instant createdTimeFrom; - private Instant createdTimeTo; - private List taskHubNames = new ArrayList<>(); - private int maxInstanceCount = 100; - private String continuationToken; - private String instanceIdPrefix; - private boolean fetchInputsAndOutputs; - - /** - * Sole constructor. - */ - public OrchestrationStatusQuery() { - } - - /** - * Sets the list of runtime status values to use as a filter. Only orchestration instances that have a matching - * runtime status will be returned. The default {@code null} value will disable runtime status filtering. - * - * @param runtimeStatusList the list of runtime status values to use as a filter - * @return this query object - */ - public OrchestrationStatusQuery setRuntimeStatusList(@Nullable List runtimeStatusList) { - this.runtimeStatusList = runtimeStatusList; - return this; - } - - /** - * Include orchestration instances that were created after the specified instant. - * - * @param createdTimeFrom the minimum orchestration creation time to use as a filter or {@code null} to disable this - * filter - * @return this query object - */ - public OrchestrationStatusQuery setCreatedTimeFrom(@Nullable Instant createdTimeFrom) { - this.createdTimeFrom = createdTimeFrom; - return this; - } - - /** - * Include orchestration instances that were created before the specified instant. - * - * @param createdTimeTo the maximum orchestration creation time to use as a filter or {@code null} to disable this - * filter - * @return this query object - */ - public OrchestrationStatusQuery setCreatedTimeTo(@Nullable Instant createdTimeTo) { - this.createdTimeTo = createdTimeTo; - return this; - } - - /** - * Sets the maximum number of records that can be returned by the query. The default value is 100. - * - *

Requests may return fewer records than the specified page size, even if there are more records. - * Always check the continuation token to determine whether there are more records.

- * - * @param maxInstanceCount the maximum number of orchestration metadata records to return - * @return this query object - */ - public OrchestrationStatusQuery setMaxInstanceCount(int maxInstanceCount) { - this.maxInstanceCount = maxInstanceCount; - return this; - } - - /** - * Include orchestration metadata records that have a matching task hub name. - * - * @param taskHubNames the task hub name to match or {@code null} to disable this filter - * @return this query object - */ - public OrchestrationStatusQuery setTaskHubNames(@Nullable List taskHubNames) { - this.taskHubNames = taskHubNames; - return this; - } - - /** - * Sets the continuation token used to continue paging through orchestration metadata results. - * - *

This should always be the continuation token value from the previous query's - * {@link OrchestrationStatusQueryResult} result.

- * - * @param continuationToken the continuation token from the previous query - * @return this query object - */ - public OrchestrationStatusQuery setContinuationToken(@Nullable String continuationToken) { - this.continuationToken = continuationToken; - return this; - } - - /** - * Include orchestration metadata records with the specified instance ID prefix. - * - *

For example, if there are three orchestration instances in the metadata store with IDs "Foo", "Bar", and "Baz", - * specifying a prefix value of "B" will exclude "Foo" since its ID doesn't start with "B".

- * - * @param instanceIdPrefix the instance ID prefix filter value - * @return this query object - */ - public OrchestrationStatusQuery setInstanceIdPrefix(@Nullable String instanceIdPrefix) { - this.instanceIdPrefix = instanceIdPrefix; - return this; - } - - /** - * Sets whether to fetch orchestration inputs, outputs, and custom status values. The default value is {@code false}. - * - * @param fetchInputsAndOutputs {@code true} to fetch orchestration inputs, outputs, and custom status values, - * otherwise {@code false} - * @return this query object - */ - public OrchestrationStatusQuery setFetchInputsAndOutputs(boolean fetchInputsAndOutputs) { - this.fetchInputsAndOutputs = fetchInputsAndOutputs; - return this; - } - - /** - * Gets the configured runtime status filter or {@code null} if none was configured. - * - * @return the configured runtime status filter as a list of values or {@code null} if none was configured - */ - public List getRuntimeStatusList() { - return runtimeStatusList; - } - - /** - * Gets the configured minimum orchestration creation time or {@code null} if none was configured. - * - * @return the configured minimum orchestration creation time or {@code null} if none was configured - */ - @Nullable - public Instant getCreatedTimeFrom() { - return createdTimeFrom; - } - - /** - * Gets the configured maximum orchestration creation time or {@code null} if none was configured. - * - * @return the configured maximum orchestration creation time or {@code null} if none was configured - */ - @Nullable - public Instant getCreatedTimeTo() { - return createdTimeTo; - } - - /** - * Gets the configured maximum number of records that can be returned by the query. - * - * @return the configured maximum number of records that can be returned by the query - */ - public int getMaxInstanceCount() { - return maxInstanceCount; - } - - /** - * Gets the configured task hub names to match or {@code null} if none were configured. - * - * @return the configured task hub names to match or {@code null} if none were configured - */ - public List getTaskHubNames() { - return taskHubNames; - } - - /** - * Gets the configured continuation token value or {@code null} if none was configured. - * - * @return the configured continuation token value or {@code null} if none was configured - */ - @Nullable - public String getContinuationToken() { - return continuationToken; - } - - /** - * Gets the configured instance ID prefix filter value or {@code null} if none was configured. - * - * @return the configured instance ID prefix filter value or {@code null} if none was configured. - */ - @Nullable - public String getInstanceIdPrefix() { - return instanceIdPrefix; - } - - /** - * Gets the configured value that determines whether to fetch orchestration inputs, outputs, and custom status values. - * - * @return the configured value that determines whether to fetch orchestration inputs, outputs, and custom - * status values - */ - public boolean isFetchInputsAndOutputs() { - return fetchInputsAndOutputs; - } -} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationStatusQueryResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationStatusQueryResult.java deleted file mode 100644 index efb4908c1..000000000 --- a/durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationStatusQueryResult.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2025 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.durabletask; - -import javax.annotation.Nullable; -import java.util.List; - -/** - * Class representing the results of a filtered orchestration metadata query. - * - *

Orchestration metadata can be queried with filters using the {@link DurableTaskClient#queryInstances} method.

- */ -public final class OrchestrationStatusQueryResult { - private final List orchestrationStates; - private final String continuationToken; - - OrchestrationStatusQueryResult(List orchestrationStates, @Nullable String continuationToken) { - this.orchestrationStates = orchestrationStates; - this.continuationToken = continuationToken; - } - - /** - * Gets the list of orchestration metadata records that matched the {@link DurableTaskClient#queryInstances} query. - * - * @return the list of orchestration metadata records that matched the {@link DurableTaskClient#queryInstances} query. - */ - public List getOrchestrationState() { - return this.orchestrationStates; - } - - /** - * Gets the continuation token to use with the next query or {@code null} if no more metadata records are found. - * - *

Note that a non-null value does not always mean that there are more metadata records that can be returned by a - * query.

- * - * @return the continuation token to use with the next query or {@code null} if no more metadata records are found. - */ - public String getContinuationToken() { - return this.continuationToken; - } -} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java index abf146c7c..dcd43dc49 100644 --- a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java +++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java @@ -1100,188 +1100,6 @@ void clearCustomStatus() throws TimeoutException { } } - // due to clock drift, client/worker and sidecar time are not exactly synchronized, this test needs to accommodate for client vs backend timestamps difference - @Test - @Disabled("Test is disabled for investigation, fixing the test retry pattern exposed the failure") - void multiInstanceQuery() throws TimeoutException { - final String plusOne = "plusOne"; - final String waitForEvent = "waitForEvent"; - final DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); - DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(plusOne, ctx -> { - int value = ctx.getInput(int.class); - for (int i = 0; i < 10; i++) { - value = ctx.callActivity(plusOne, value, int.class).await(); - } - ctx.complete(value); - }) - .addActivity(plusOne, ctx -> ctx.getInput(int.class) + 1) - .addOrchestrator(waitForEvent, ctx -> { - String name = ctx.getInput(String.class); - String output = ctx.waitForExternalEvent(name, String.class).await(); - ctx.complete(output); - }).buildAndStart(); - - try (worker; client) { - Instant startTime = Instant.now(); - String prefix = startTime.toString(); - - IntStream.range(0, 5).mapToObj(i -> { - String instanceId = String.format("%s.sequence.%d", prefix, i); - client.scheduleNewOrchestrationInstance(plusOne, 0, instanceId); - return instanceId; - }).collect(Collectors.toUnmodifiableList()).forEach(id -> { - try { - client.waitForInstanceCompletion(id, defaultTimeout, true); - } catch (TimeoutException e) { - e.printStackTrace(); - } - }); - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - } - - Instant sequencesFinishedTime = Instant.now(); - - IntStream.range(0, 5).mapToObj(i -> { - String instanceId = String.format("%s.waiter.%d", prefix, i); - client.scheduleNewOrchestrationInstance(waitForEvent, String.valueOf(i), instanceId); - return instanceId; - }).collect(Collectors.toUnmodifiableList()).forEach(id -> { - try { - client.waitForInstanceStart(id, defaultTimeout); - } catch (TimeoutException e) { - e.printStackTrace(); - } - }); - - // Create one query object and reuse it for multiple queries - OrchestrationStatusQuery query = new OrchestrationStatusQuery(); - OrchestrationStatusQueryResult result = null; - - // Return all instances - result = client.queryInstances(query); - assertEquals(10, result.getOrchestrationState().size()); - - // Test CreatedTimeTo filter - query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1))); - result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty(), - "Result should be empty but found " + result.getOrchestrationState().size() + " instances: " + - "Start time: " + startTime + ", " + - result.getOrchestrationState().stream() - .map(state -> String.format("\nID: %s, Status: %s, Created: %s", - state.getInstanceId(), - state.getRuntimeStatus(), - state.getCreatedAt())) - .collect(Collectors.joining(", "))); - - query.setCreatedTimeTo(sequencesFinishedTime); - result = client.queryInstances(query); - // Verify all returned instances contain "sequence" in their IDs - assertEquals(5, result.getOrchestrationState().stream() - .filter(state -> state.getInstanceId().contains("sequence")) - .count(), - "Expected exactly 5 instances with 'sequence' in their IDs"); - - query.setCreatedTimeTo(Instant.now().plus(Duration.ofSeconds(1))); - result = client.queryInstances(query); - assertEquals(10, result.getOrchestrationState().size()); - - // Test CreatedTimeFrom filter - query.setCreatedTimeFrom(Instant.now().plus(Duration.ofSeconds(1))); - result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty()); - - query.setCreatedTimeFrom(sequencesFinishedTime.minus(Duration.ofSeconds(5))); - result = client.queryInstances(query); - assertEquals(5, result.getOrchestrationState().stream() - .filter(state -> state.getInstanceId().contains("sequence")) - .count(), - "Expected exactly 5 instances with 'sequence' in their IDs"); - - query.setCreatedTimeFrom(startTime.minus(Duration.ofSeconds(1))); - result = client.queryInstances(query); - assertEquals(10, result.getOrchestrationState().size()); - - // Test RuntimeStatus filter - HashSet statusFilters = Stream.of( - OrchestrationRuntimeStatus.PENDING, - OrchestrationRuntimeStatus.FAILED, - OrchestrationRuntimeStatus.TERMINATED - ).collect(Collectors.toCollection(HashSet::new)); - - query.setRuntimeStatusList(new ArrayList<>(statusFilters)); - result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty()); - - statusFilters.add(OrchestrationRuntimeStatus.RUNNING); - query.setRuntimeStatusList(new ArrayList<>(statusFilters)); - result = client.queryInstances(query); - assertEquals(5, result.getOrchestrationState().size()); - - statusFilters.add(OrchestrationRuntimeStatus.COMPLETED); - query.setRuntimeStatusList(new ArrayList<>(statusFilters)); - result = client.queryInstances(query); - assertEquals(10, result.getOrchestrationState().size()); - - statusFilters.remove(OrchestrationRuntimeStatus.RUNNING); - query.setRuntimeStatusList(new ArrayList<>(statusFilters)); - result = client.queryInstances(query); - assertEquals(5, result.getOrchestrationState().size()); - - statusFilters.clear(); - query.setRuntimeStatusList(new ArrayList<>(statusFilters)); - result = client.queryInstances(query); - assertEquals(10, result.getOrchestrationState().size()); - - // Test InstanceIdPrefix - query.setInstanceIdPrefix("Foo"); - result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty()); - - query.setInstanceIdPrefix(prefix); - result = client.queryInstances(query); - assertEquals(10, result.getOrchestrationState().size()); - - // Test PageSize and ContinuationToken - HashSet instanceIds = new HashSet<>(); - query.setMaxInstanceCount(0); - while (query.getMaxInstanceCount() < 10) { - query.setMaxInstanceCount(query.getMaxInstanceCount() + 1); - result = client.queryInstances(query); - int total = result.getOrchestrationState().size(); - assertEquals(query.getMaxInstanceCount(), total); - result.getOrchestrationState().forEach(state -> assertTrue(instanceIds.add(state.getInstanceId()))); - while (total < 10) { - query.setContinuationToken(result.getContinuationToken()); - result = client.queryInstances(query); - int count = result.getOrchestrationState().size(); - assertNotEquals(0, count); - assertTrue(count <= query.getMaxInstanceCount()); - total += count; - assertTrue(total <= 10); - result.getOrchestrationState().forEach(state -> assertTrue(instanceIds.add(state.getInstanceId()))); - } - query.setContinuationToken(null); - instanceIds.clear(); - } - - // Test ShowInput - query.setFetchInputsAndOutputs(true); - query.setCreatedTimeFrom(sequencesFinishedTime); - result = client.queryInstances(query); - result.getOrchestrationState().forEach(state -> assertNotNull(state.readInputAs(String.class))); - - query.setFetchInputsAndOutputs(false); - query.setCreatedTimeFrom(sequencesFinishedTime); - result = client.queryInstances(query); - result.getOrchestrationState().forEach(state -> assertThrows(IllegalStateException.class, () -> state.readInputAs(String.class))); - } - } - @Test void purgeInstanceId() throws TimeoutException { final String orchestratorName = "PurgeInstance";