Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DSIP-67] Use command to trigger workflow instance rather generate workflow instance #16523

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class ExecutorAPITest {

private static long processDefinitionCode;

private static long triggerCode;
private static List<Integer> workflowInstanceIds;

@BeforeAll
public static void setup() {
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testStartProcessInstance() {
processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
Assertions.assertTrue(startProcessInstanceResponse.getBody().getSuccess());

triggerCode = (long) startProcessInstanceResponse.getBody().getData();
workflowInstanceIds = (List<Integer>) startProcessInstanceResponse.getBody().getData();
} catch (Exception e) {
log.error("failed", e);
Assertions.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.dolphinscheduler.api.test.cases;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.dolphinscheduler.api.test.core.DolphinScheduler;
Expand All @@ -44,14 +45,14 @@
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;
Expand Down Expand Up @@ -105,7 +106,7 @@ public static void cleanup() {

@Test
@Order(1)
public void testQueryProcessInstancesByTriggerCode() {
public void testQueryProcessInstancesByWorkflowInstanceId() {
try {
// create test project
HttpResponse createProjectResponse = projectPage.createProject(loginUser, "project-test");
Expand Down Expand Up @@ -145,17 +146,22 @@ public void testQueryProcessInstancesByTriggerCode() {
HttpResponse startProcessInstanceResponse = executorPage.startProcessInstance(loginUser, projectCode,
processDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
assertTrue(startProcessInstanceResponse.getBody().getSuccess());
final List<Integer> workflowInstanceIds = (List<Integer>) startProcessInstanceResponse.getBody().getData();

assertEquals(1, workflowInstanceIds.size());
processInstanceId = workflowInstanceIds.get(0);

// make sure process instance has completed and successfully persisted into db
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
// query workflow instance by trigger code
HttpResponse queryProcessInstanceListResponse =
processInstancePage.queryProcessInstanceList(loginUser, projectCode, 1, 10);
processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId);
assertTrue(queryProcessInstanceListResponse.getBody().getSuccess());
assertTrue(queryProcessInstanceListResponse.getBody().getData().toString()
.contains("test_import"));
final Map<String, Object> workflowInstance =
(Map<String, Object>) queryProcessInstanceListResponse.getBody().getData();
assertEquals("SUCCESS", workflowInstance.get("state"));
});
} catch (Exception e) {
log.error("failed", e);
Expand All @@ -174,7 +180,6 @@ public void testQueryProcessInstanceList() {

@Test
@Order(3)
@Disabled
public void testQueryTaskListByProcessId() {
HttpResponse queryTaskListByProcessIdResponse =
processInstancePage.queryTaskListByProcessId(loginUser, projectCode, processInstanceId);
Expand All @@ -184,7 +189,6 @@ public void testQueryTaskListByProcessId() {

@Test
@Order(4)
@Disabled
public void testQueryProcessInstanceById() {
HttpResponse queryProcessInstanceByIdResponse =
processInstancePage.queryProcessInstanceById(loginUser, projectCode, processInstanceId);
Expand All @@ -194,7 +198,6 @@ public void testQueryProcessInstanceById() {

@Test
@Order(5)
@Disabled
public void testDeleteProcessInstanceById() {
HttpResponse deleteProcessInstanceByIdResponse =
processInstancePage.deleteProcessInstanceById(loginUser, projectCode, processInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.google.common.collect.Lists;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
Expand Down Expand Up @@ -129,27 +131,27 @@ public class ExecutorController extends BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(START_PROCESS_INSTANCE_ERROR)
@OperatorLog(auditType = AuditType.PROCESS_START)
public Result<Long> triggerWorkflowDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false, defaultValue = "TASK_POST") TaskDependType taskDependType,
@RequestParam(value = "execType", required = false, defaultValue = "START_PROCESS") CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {
public Result<List<Integer>> triggerWorkflowDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false, defaultValue = "TASK_POST") TaskDependType taskDependType,
@RequestParam(value = "execType", required = false, defaultValue = "START_PROCESS") CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {

switch (execType) {
case START_PROCESS:
Expand All @@ -170,7 +172,8 @@ public Result<Long> triggerWorkflowDefinition(@Parameter(hidden = true) @Request
.dryRun(Flag.of(dryRun))
.testFlag(Flag.of(testFlag))
.build();
return Result.success(execService.triggerWorkflowDefinition(workflowTriggerRequest));
return Result
.success(Lists.newArrayList(execService.triggerWorkflowDefinition(workflowTriggerRequest)));
case COMPLEMENT_DATA:
final WorkflowBackFillRequest workflowBackFillRequest = WorkflowBackFillRequest.builder()
.loginUser(loginUser)
Expand Down Expand Up @@ -250,35 +253,35 @@ public Result<Long> triggerWorkflowDefinition(@Parameter(hidden = true) @Request
@ResponseStatus(HttpStatus.OK)
@ApiException(BATCH_START_PROCESS_INSTANCE_ERROR)
@OperatorLog(auditType = AuditType.PROCESS_BATCH_START)
public Result<List<Long>> batchTriggerWorkflowDefinitions(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCodes") String processDefinitionCodes,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {
public Result<List<Integer>> batchTriggerWorkflowDefinitions(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionCodes") String processDefinitionCodes,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "tenantCode", required = false, defaultValue = "default") String tenantCode,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent,
@RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) {

List<Long> workflowDefinitionCodes = Arrays.stream(processDefinitionCodes.split(Constants.COMMA))
.map(Long::parseLong)
.collect(Collectors.toList());

List<Long> result = new ArrayList<>();
List<Integer> result = new ArrayList<>();
for (Long workflowDefinitionCode : workflowDefinitionCodes) {
Result<Long> triggerCodeResult = triggerWorkflowDefinition(loginUser,
Result<List<Integer>> workflowInstanceIds = triggerWorkflowDefinition(loginUser,
workflowDefinitionCode,
scheduleTime,
failureStrategy,
Expand All @@ -299,7 +302,7 @@ public Result<List<Long>> batchTriggerWorkflowDefinitions(@Parameter(hidden = tr
complementDependentMode,
allLevelDependent,
executionOrder);
result.add(triggerCodeResult.getData());
result.addAll(workflowInstanceIds.getData());
}
return Result.success(result);
}
Expand Down
Loading
Loading