Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.durabletask;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -235,32 +236,6 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
*/
public abstract void terminate(String instanceId, @Nullable Object output);

/**
* Fetches orchestration instance metadata from the configured durable store using a status query filter.
*
* @param query filter criteria that determines which orchestrations to fetch data for.
* @return the result of the query operation, including instance metadata and possibly a continuation token
*/
public abstract OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery query);

/**
* Initializes the target task hub data store.
*
* <p>This is an administrative operation that only needs to be done once for the lifetime of the task hub.</p>
*
* @param recreateIfExists <code>true</code> to delete any existing task hub first; <code>false</code> to make this
* operation a no-op if the task hub data store already exists. Note that deleting a task
* hub will result in permanent data loss. Use this operation with care.
*/
public abstract void createTaskHub(boolean recreateIfExists);

/**
* Permanently deletes the target task hub data store and any orchestration data it may contain.
*
* <p>This is an administrative operation that is irreversible. It should be used with great care.</p>
*/
public abstract void deleteTaskHub();

/**
* Purges orchestration instance metadata from the durable store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@
import io.opentelemetry.api.trace.Tracer;

import javax.annotation.Nullable;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -307,49 +306,6 @@ public void terminate(String instanceId, @Nullable Object output) {
this.sidecarClient.terminateInstance(builder.build());
}

@Override
public OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery query) {
OrchestratorService.InstanceQuery.Builder instanceQueryBuilder = OrchestratorService.InstanceQuery.newBuilder();
Optional.ofNullable(query.getCreatedTimeFrom()).ifPresent(createdTimeFrom ->
instanceQueryBuilder.setCreatedTimeFrom(DataConverter.getTimestampFromInstant(createdTimeFrom)));
Optional.ofNullable(query.getCreatedTimeTo()).ifPresent(createdTimeTo ->
instanceQueryBuilder.setCreatedTimeTo(DataConverter.getTimestampFromInstant(createdTimeTo)));
Optional.ofNullable(query.getContinuationToken()).ifPresent(token ->
instanceQueryBuilder.setContinuationToken(StringValue.of(token)));
Optional.ofNullable(query.getInstanceIdPrefix()).ifPresent(prefix ->
instanceQueryBuilder.setInstanceIdPrefix(StringValue.of(prefix)));
instanceQueryBuilder.setFetchInputsAndOutputs(query.isFetchInputsAndOutputs());
instanceQueryBuilder.setMaxInstanceCount(query.getMaxInstanceCount());
query.getRuntimeStatusList().forEach(runtimeStatus ->
Optional.ofNullable(runtimeStatus).ifPresent(status ->
instanceQueryBuilder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status))));
query.getTaskHubNames().forEach(taskHubName -> Optional.ofNullable(taskHubName).ifPresent(name ->
instanceQueryBuilder.addTaskHubNames(StringValue.of(name))));
OrchestratorService.QueryInstancesResponse queryInstancesResponse = this.sidecarClient
.queryInstances(OrchestratorService.QueryInstancesRequest.newBuilder().setQuery(instanceQueryBuilder).build());
return toQueryResult(queryInstancesResponse, query.isFetchInputsAndOutputs());
}

private OrchestrationStatusQueryResult toQueryResult(
OrchestratorService.QueryInstancesResponse queryInstancesResponse, boolean fetchInputsAndOutputs) {
List<OrchestrationMetadata> metadataList = new ArrayList<>();
queryInstancesResponse.getOrchestrationStateList().forEach(state -> {
metadataList.add(new OrchestrationMetadata(state, this.dataConverter, fetchInputsAndOutputs));
});
return new OrchestrationStatusQueryResult(metadataList, queryInstancesResponse.getContinuationToken().getValue());
}

@Override
public void createTaskHub(boolean recreateIfExists) {
this.sidecarClient.createTaskHub(OrchestratorService.CreateTaskHubRequest.newBuilder()
.setRecreateIfExists(recreateIfExists).build());
}

@Override
public void deleteTaskHub() {
this.sidecarClient.deleteTaskHub(OrchestratorService.DeleteTaskHubRequest.newBuilder().build());
}

@Override
public PurgeResult purgeInstance(String instanceId) {
OrchestratorService.PurgeInstancesRequest request = OrchestratorService.PurgeInstancesRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ public void startAndBlock() {

this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));

} else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) {
// No-op
} else {
logger.log(Level.WARNING,
"Received and dropped an unknown '{0}' work-item from the sidecar.",
Expand Down

This file was deleted.

This file was deleted.

Loading