diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 433a4ea395e3..109fc1596d02 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -174,6 +174,179 @@ private WorkflowDefinition updateWorkflowLocation(User user, WorkflowDefinition workflowUpdateRequest); } + /** + * Create resource task definition + * + * @param loginUser login user + * @param taskCreateRequest task definition json + * @return new TaskDefinition have created + */ + @Override + @Transactional + public TaskDefinition createTaskDefinitionV2(User loginUser, + TaskCreateRequest taskCreateRequest) { + TaskDefinition taskDefinition = taskCreateRequest.convert2TaskDefinition(); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(taskCreateRequest.getWorkflowCode()); + if (processDefinition == null) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, taskCreateRequest.getWorkflowCode()); + } + // Add project code from process definition if not exists + if (taskDefinition.getProjectCode() == 0L) { + taskDefinition.setProjectCode(processDefinition.getProjectCode()); + } + this.checkTaskDefinitionValid(loginUser, taskDefinition, TASK_DEFINITION_CREATE); + + long taskDefinitionCode; + try { + taskDefinitionCode = CodeGenerateUtils.genCode(); + } catch (CodeGenerateException e) { + throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); + } + taskDefinition.setCode(taskDefinitionCode); + + int create = taskDefinitionMapper.insert(taskDefinition); + if (create <= 0) { + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); + } + this.persist2TaskDefinitionLog(loginUser, taskDefinition); + + // update related objects: task relationship, workflow's location(need to set to null and front-end will auto + // format it) + this.updateTaskUpstreams(loginUser, taskCreateRequest.getWorkflowCode(), taskDefinition.getCode(), + taskCreateRequest.getUpstreamTasksCodes()); + this.updateWorkflowLocation(loginUser, processDefinition); + return taskDefinition; + } + + /** + * create single task definition that binds the workflow + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return create result code + */ + @Transactional + @Override + public Map createTaskBindsWorkFlow(User loginUser, + long projectCode, + long processDefinitionCode, + String taskDefinitionJsonObj, + String upstreamCodes) { + Project project = projectMapper.queryByCode(projectCode); + // check if user have write perm for project + Map result = new HashMap<>(); + boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); + if (!hasProjectAndWritePerm) { + return result; + } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { + log.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode)); + return result; + } + if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { + log.warn("Task definition can not be created due to process definition is {}, processDefinitionCode:{}.", + ReleaseState.ONLINE.getDescp(), processDefinition.getCode()); + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(processDefinitionCode)); + return result; + } + TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + if (taskDefinition == null) { + log.warn("Parameter taskDefinitionJsonObj is invalid json."); + putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); + return result; + } + if (!checkTaskParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams())) { + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName()); + return result; + } + long taskCode = taskDefinition.getCode(); + if (taskCode == 0) { + taskDefinition.setCode(CodeGenerateUtils.genCode()); + } + List processTaskRelationLogList = + processTaskRelationMapper.queryByProcessCode(processDefinitionCode) + .stream() + .map(ProcessTaskRelationLog::new) + .collect(Collectors.toList()); + + if (StringUtils.isNotBlank(upstreamCodes)) { + Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong) + .collect(Collectors.toSet()); + List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); + Set queryUpStreamTaskCodes = + upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); + // upstreamTaskCodes - queryUpStreamTaskCodes + Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)) + .collect(Collectors.toSet()); + if (CollectionUtils.isNotEmpty(diffCode)) { + String taskCodes = StringUtils.join(diffCode, Constants.COMMA); + log.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", + taskCodes); + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes); + return result; + } + for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setPreTaskCode(upstreamTask.getCode()); + processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion()); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLog.setConditionType(ConditionType.NONE); + processTaskRelationLog.setConditionParams("{}"); + processTaskRelationLogList.add(processTaskRelationLog); + } + } else { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setPreTaskCode(0); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLog.setConditionType(ConditionType.NONE); + processTaskRelationLog.setConditionParams("{}"); + processTaskRelationLogList.add(processTaskRelationLog); + } + int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), + processDefinition.getVersion()+1, + processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); + if (insertResult != Constants.EXIT_CODE_SUCCESS) { + log.error( + "Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.", + processDefinition.getCode(), processDefinition.getVersion()); + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } else + log.info( + "Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.", + processDefinition.getCode(), processDefinition.getVersion()); + + int saveTaskResult = + processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); + if (saveTaskResult == Constants.DEFINITION_FAILURE) { + log.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskDefinition.getCode()); + putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); + } else + log.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskDefinition.getCode()); + + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + if (insertVersion == 0) { + throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); + } else + log.info("Save process definition complete, processCode:{}, processVersion:{}.", + processDefinition.getCode(), insertVersion); + + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, taskDefinition); + return result; + } + /** * query task definition *