Skip to content

Commit

Permalink
[DSIP-67] Use command to trigger workflow instance rather generate wo…
Browse files Browse the repository at this point in the history
…rkflow instance
  • Loading branch information
ruanwenjun committed Aug 27, 2024
1 parent 9448806 commit 8fec4a4
Show file tree
Hide file tree
Showing 99 changed files with 1,826 additions and 1,312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class ExecutorAPITest {

private static long processDefinitionCode;

private static long triggerCode;
private static List<Integer> workflowInstanceIds;

@BeforeAll
public static void setup() {
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testStartProcessInstance() {
processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
Assertions.assertTrue(startProcessInstanceResponse.getBody().getSuccess());

triggerCode = (long) startProcessInstanceResponse.getBody().getData();
workflowInstanceIds = (List<Integer>) startProcessInstanceResponse.getBody().getData();
} catch (Exception e) {
log.error("failed", e);
Assertions.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.dolphinscheduler.api.test.cases;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.dolphinscheduler.api.test.core.DolphinScheduler;
Expand All @@ -44,14 +45,14 @@
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;
Expand Down Expand Up @@ -105,7 +106,7 @@ public static void cleanup() {

@Test
@Order(1)
public void testQueryProcessInstancesByTriggerCode() {
public void testQueryProcessInstancesByWorkflowInstanceId() {
try {
// create test project
HttpResponse createProjectResponse = projectPage.createProject(loginUser, "project-test");
Expand Down Expand Up @@ -145,17 +146,22 @@ public void testQueryProcessInstancesByTriggerCode() {
HttpResponse startProcessInstanceResponse = executorPage.startProcessInstance(loginUser, projectCode,
processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
assertTrue(startProcessInstanceResponse.getBody().getSuccess());
final List<Integer> workflowInstanceIds = (List<Integer>) startProcessInstanceResponse.getBody().getData();

assertEquals(1, workflowInstanceIds.size());
processInstanceId = workflowInstanceIds.get(0);

// make sure process instance has completed and successfully persisted into db
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
// query workflow instance by trigger code
HttpResponse queryProcessInstanceListResponse =
processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10);
processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId);
assertTrue(queryProcessInstanceListResponse.getBody().getSuccess());
assertTrue(queryProcessInstanceListResponse.getBody().getData().toString()
.contains("test_import"));
final Map<String, Object> workflowInstance =
(Map<String, Object>) queryProcessInstanceListResponse.getBody().getData();
assertEquals("SUCCESS", workflowInstance.get("state"));
});
} catch (Exception e) {
log.error("failed", e);
Expand All @@ -174,7 +180,6 @@ public void testQueryProcessInstanceList() {

@Test
@Order(3)
@Disabled
public void testQueryTaskListByProcessId() {
HttpResponse queryTaskListByProcessIdResponse =
processInstancePage.queryTaskListByProcessId(loginUser, projectCode, processInstanceId);
Expand All @@ -184,7 +189,6 @@ public void testQueryTaskListByProcessId() {

@Test
@Order(4)
@Disabled
public void testQueryProcessInstanceById() {
HttpResponse queryProcessInstanceByIdResponse =
processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId);
Expand All @@ -194,7 +198,6 @@ public void testQueryProcessInstanceById() {

@Test
@Order(5)
@Disabled
public void testDeleteProcessInstanceById() {
HttpResponse deleteProcessInstanceByIdResponse =
processInstancePage.deleteProcessInstanceById(loginUser, projectCode, processInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.google.common.collect.Lists;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
Expand Down Expand Up @@ -129,27 +131,27 @@ public class ExecutorController extends BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(START_PROCESS_INSTANCE_ERROR)
@OperatorLog(auditType = AuditType.PROCESS_START)
public Result<Long> triggerWorkflowDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false, defaultValue = "TASK_POST") TaskDependType taskDependType,
@RequestParam(value = "execType", required = false, defaultValue = "START_PROCESS") CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {
public Result<List<Integer>> triggerWorkflowDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false, defaultValue = "TASK_POST") TaskDependType taskDependType,
@RequestParam(value = "execType", required = false, defaultValue = "START_PROCESS") CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {

switch (execType) {
case START_PROCESS:
Expand All @@ -170,7 +172,8 @@ public Result<Long> triggerWorkflowDefinition(@Parameter(hidden = true) @Request
.dryRun(Flag.of(dryRun))
.testFlag(Flag.of(testFlag))
.build();
return Result.success(execService.triggerWorkflowDefinition(workflowTriggerRequest));
return Result
.success(Lists.newArrayList(execService.triggerWorkflowDefinition(workflowTriggerRequest)));
case COMPLEMENT_DATA:
final WorkflowBackFillRequest workflowBackFillRequest = WorkflowBackFillRequest.builder()
.loginUser(loginUser)
Expand Down Expand Up @@ -250,35 +253,35 @@ public Result<Long> triggerWorkflowDefinition(@Parameter(hidden = true) @Request
@ResponseStatus(HttpStatus.OK)
@ApiException(BATCH_START_PROCESS_INSTANCE_ERROR)
@OperatorLog(auditType = AuditType.PROCESS_BATCH_START)
public Result<List<Long>> batchTriggerWorkflowDefinitions(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCodes") String processDefinitionCodes,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {
public Result<List<Integer>> batchTriggerWorkflowDefinitions(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCodes") String processDefinitionCodes,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {

List<Long> workflowDefinitionCodes = Arrays.stream(processDefinitionCodes.split(Constants.COMMA))
.map(Long::parseLong)
.collect(Collectors.toList());

List<Long> result = new ArrayList<>();
List<Integer> result = new ArrayList<>();
for (Long workflowDefinitionCode : workflowDefinitionCodes) {
Result<Long> triggerCodeResult = triggerWorkflowDefinition(loginUser,
Result<List<Integer>> workflowInstanceIds = triggerWorkflowDefinition(loginUser,
workflowDefinitionCode,
scheduleTime,
failureStrategy,
Expand All @@ -299,7 +302,7 @@ public Result<List<Long>> batchTriggerWorkflowDefinitions(@Parameter(hidden = tr
complementDependentMode,
allLevelDependent,
executionOrder);
result.add(triggerCodeResult.getData());
result.addAll(workflowInstanceIds.getData());
}
return Result.success(result);
}
Expand Down
Loading

0 comments on commit 8fec4a4

Please sign in to comment.