diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java new file mode 100644 index 00000000..6f44de02 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskIoDataMapper.java @@ -0,0 +1,14 @@ +package site.icebang.domain.workflow.mapper; + +import java.util.Optional; + +import org.apache.ibatis.annotations.Mapper; + +import site.icebang.domain.workflow.model.TaskIoData; + +@Mapper +public interface TaskIoDataMapper { + void insert(TaskIoData taskIoData); + + Optional findOutputByTaskRunId(Long taskRunId); +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index e177dee6..267e931a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -1,5 +1,7 @@ package site.icebang.domain.workflow.mapper; +import java.util.Optional; + import org.apache.ibatis.annotations.Mapper; import site.icebang.domain.workflow.model.TaskRun; @@ -9,4 +11,6 @@ public interface TaskRunMapper { void insert(TaskRun taskRun); void update(TaskRun taskRun); + + Optional findLatestSuccessRunInJob(Long jobRunId, String taskName); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java new file mode 100644 index 00000000..16bbd7c2 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/TaskIoData.java @@ -0,0 +1,35 @@ +package site.icebang.domain.workflow.model; + +import java.time.Instant; + +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@NoArgsConstructor +public class TaskIoData { + private Long id; + private Long taskRunId; + private String ioType; + private String name; + private String dataType; + private String dataValue; // JSON을 문자열로 저장 + private Long dataSize; + private Instant createdAt; + + public TaskIoData( + Long taskRunId, + String ioType, + String name, + String dataType, + String dataValue, + Long dataSize) { + this.taskRunId = taskRunId; + this.ioType = ioType; + this.name = name; + this.dataType = dataType; + this.dataValue = dataValue; + this.dataSize = dataSize; + this.createdAt = Instant.now(); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java index ed148061..39cb9378 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogPublishBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -11,13 +10,16 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class BlogPublishBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; private static final String TASK_NAME = "블로그 발행 태스크"; private static final String RAG_SOURCE_TASK = "블로그 RAG 생성 태스크"; @@ -27,33 +29,40 @@ public boolean supports(String taskName) { } @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // RAG에서 생성된 블로그 콘텐츠 가져오기 - Optional.ofNullable(workflowContext.get(RAG_SOURCE_TASK)) - .ifPresent( - ragResult -> { - JsonNode data = ragResult.path("data"); - - // 제목, 내용, 태그 설정 - Optional.ofNullable(data.path("title")) - .filter(node -> !node.isMissingNode()) - .ifPresent(titleNode -> body.set("post_title", titleNode)); - - Optional.ofNullable(data.path("content")) - .filter(node -> !node.isMissingNode()) - .ifPresent(contentNode -> body.set("post_content", contentNode)); - - Optional.ofNullable(data.path("tags")) - .filter(node -> !node.isMissingNode()) - .ifPresent(tagsNode -> body.set("post_tags", tagsNode)); - }); - String blog_name = task.getSettings().path("blog_name").asText(""); - body.put("tag", task.getSettings().get("tag").asText()); - body.put("blog_name", blog_name); - body.put("blog_id", task.getSettings().get("blog_id").asText()); - body.put("blog_pw", task.getSettings().get("blog_pw").asText()); + Optional ragResultOpt = contextService.getPreviousTaskOutput(jobRun, RAG_SOURCE_TASK); + ragResultOpt.ifPresent( + ragResult -> { + JsonNode data = ragResult.path("data"); + + // 📌 1. .path()로 노드를 가져옵니다. + JsonNode titleNode = data.path("title"); + // 📌 2. .isMissingNode()로 노드가 존재하는지 확인합니다. + if (!titleNode.isMissingNode()) { + body.set("post_title", titleNode); + } + + JsonNode contentNode = data.path("content"); + if (!contentNode.isMissingNode()) { + body.set("post_content", contentNode); + } + + JsonNode tagsNode = data.path("tags"); + if (!tagsNode.isMissingNode()) { + body.set("post_tags", tagsNode); + } + }); + + Optional settingsOpt = Optional.ofNullable(task.getSettings()); + settingsOpt.ifPresent( + settings -> { + body.put("tag", settings.path("tag").asText()); + body.put("blog_name", settings.path("blog_name").asText()); + body.put("blog_id", settings.path("blog_id").asText()); + body.put("blog_pw", settings.path("blog_pw").asText()); + }); return body; } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java index ed528629..33b3ca55 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/BlogRagBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -11,14 +10,19 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class BlogRagBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "블로그 RAG 생성 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String PRODUCT_SELECT_SOURCE_TASK = "상품 선택 태스크"; private static final String OCR_SOURCE_TASK = "이미지 OCR 태스크"; @@ -28,23 +32,35 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 '블로그 RAG 생성'을 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + // 1. '키워드 검색 태스크' 결과에서 키워드 정보 가져오기 + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult .map(node -> node.path("data").path("keyword")) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // OCR 번역 결과 가져오기 (새로 추가) - Optional.ofNullable(workflowContext.get(OCR_SOURCE_TASK)) + // 2. '이미지 OCR 태스크' 결과에서 번역 언어 정보 가져오기 + Optional ocrResult = contextService.getPreviousTaskOutput(jobRun, OCR_SOURCE_TASK); + ocrResult .map(node -> node.path("data").path("translation_language")) .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) .ifPresent(translationNode -> body.set("translation_language", translationNode)); - // 선택된 상품 정보 가져오기 - Optional.ofNullable(workflowContext.get(PRODUCT_SELECT_SOURCE_TASK)) + // 3. '상품 선택 태스크' 결과에서 선택된 상품 정보 가져오기 + Optional productSelectResult = + contextService.getPreviousTaskOutput(jobRun, PRODUCT_SELECT_SOURCE_TASK); + productSelectResult .map(node -> node.path("data").path("selected_product")) .ifPresent(productNode -> body.set("product_info", productNode)); diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java index b045c0fe..5d819dc6 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ImageOcrBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -11,28 +10,41 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class ImageOcrBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "이미지 OCR 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; @Override public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 이전 Task 결과(키워드)를 DB에서 조회하여 OCR Task의 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 (OCR 처리용) - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("data").path("keyword")) + // 📌 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회합니다. + Optional sourceResult = + contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); + + sourceResult + .map(result -> result.path("data").path("keyword")) .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java index 597ab0b7..00c9551e 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/KeywordSearchBodyBuilder.java @@ -1,15 +1,15 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; +import java.util.Optional; import org.springframework.stereotype.Component; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; @Component @@ -24,10 +24,22 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * Task에 주입된 사용자 정의 설정(settings)을 기반으로 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 (settings 포함) + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) + * @return 생성된 JSON Body (예: {"tag": "google"}) + */ @Override - public ObjectNode build(Task task, Map workflowContext) { - // 이 Task는 항상 정적인 Body를 가집니다. - String tag = task.getSettings().get("tag").asText(); + public ObjectNode build(Task task, JobRun jobRun) { + // 📌 Task에 동적으로 주입된 settings에서 'tag' 값을 가져옵니다. + // settings가 없거나 'tag' 필드가 없으면 기본값으로 "naver"를 사용합니다. + String tag = + Optional.ofNullable(task.getSettings()) + .map(settings -> settings.path("tag").asText("naver")) + .orElse("naver"); + return objectMapper.createObjectNode().put("tag", tag); } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java index 4c90e31a..7cc9c005 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductCrawlBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -12,13 +11,16 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class ProductCrawlBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "상품 정보 크롤링 태스크"; private static final String SIMILARITY_SOURCE_TASK = "상품 유사도 분석 태스크"; @@ -27,32 +29,39 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 이전 Task 결과(유사도 분석 결과)를 DB에서 조회하여 크롤링할 상품 URL 목록으로 구성된 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 + * @return 생성된 JSON Body (예: {"product_urls": ["url1", "url2", ...]}) + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - - // ArrayNode 준비 (product_urls 배열로 변경) ArrayNode productUrls = objectMapper.createArrayNode(); - // 유사도 분석에서 선택된 상품들의 URL 가져오기 (복수로 변경) - Optional.ofNullable(workflowContext.get(SIMILARITY_SOURCE_TASK)) - .ifPresent( - node -> { - JsonNode topProducts = node.path("data").path("top_products"); - if (topProducts.isArray()) { - // top_products 배열에서 각 상품의 URL 추출 - topProducts.forEach( - product -> { - JsonNode urlNode = product.path("url"); - if (!urlNode.isMissingNode() && !urlNode.asText().isEmpty()) { - productUrls.add(urlNode.asText()); - } - }); - } - }); + // 📌 컨텍스트 서비스를 통해 DB에서 '상품 유사도 분석 태스크'의 결과를 조회합니다. + Optional sourceResult = + contextService.getPreviousTaskOutput(jobRun, SIMILARITY_SOURCE_TASK); - body.set("product_urls", productUrls); + sourceResult.ifPresent( + node -> { + JsonNode topProducts = node.path("data").path("top_products"); + if (topProducts.isArray()) { + topProducts.forEach( + product -> { + JsonNode urlNode = product.path("url"); + if (!urlNode.isMissingNode() + && urlNode.isTextual() + && !urlNode.asText().isEmpty()) { + productUrls.add(urlNode.asText()); + } + }); + } + }); + body.set("product_urls", productUrls); return body; } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java index 65e693f3..a1b55970 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductMatchBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -11,14 +10,19 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class ProductMatchBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "상품 매칭 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; @@ -27,17 +31,28 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 '상품 매칭'을 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + // 📌 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult .map(node -> node.path("data").path("keyword")) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // 상품 검색 결과 정보 가져오기 - Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) + // 📌 2. 컨텍스트 서비스를 통해 DB에서 '상품 검색 태스크'의 결과를 조회 + Optional searchResult = + contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); + searchResult .map(node -> node.path("data").path("search_results")) .ifPresent(resultsNode -> body.set("search_results", resultsNode)); diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java index d594e4e2..9056b1fd 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSearchBodyBuilder.java @@ -1,6 +1,6 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; +import java.util.Optional; import org.springframework.stereotype.Component; @@ -10,13 +10,17 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class ProductSearchBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; + private static final String TASK_NAME = "상품 검색 태스크"; private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; @@ -26,10 +30,13 @@ public boolean supports(String taskName) { } @Override - public ObjectNode build(Task task, Map workflowContext) { - JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME); + public ObjectNode build(Task task, JobRun jobRun) { + Optional sourceResult = + contextService.getPreviousTaskOutput(jobRun, SOURCE_TASK_NAME); + String keyword = - sourceResult != null ? sourceResult.path("data").path("keyword").asText("") : ""; + sourceResult.map(result -> result.path("data").path("keyword").asText("")).orElse(""); + return objectMapper.createObjectNode().put("keyword", keyword); } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java index a8a885ed..b25cd966 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -1,15 +1,13 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; - import org.springframework.stereotype.Component; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; @Component @@ -24,10 +22,18 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * '상품 선택' Task를 위한 정적인 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 (이 빌더에서는 사용되지 않음) + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이 빌더에서는 사용되지 않음) + * @return 생성된 JSON Body (예: {"selection_criteria": "image_count_priority"}) + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); + // 이 Task는 항상 고정된 선택 기준을 Body에 담아 보냅니다. body.put("selection_criteria", "image_count_priority"); return body; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java index 45f19ad8..d4857602 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSimilarityBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -11,14 +10,19 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class ProductSimilarityBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "상품 유사도 분석 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String MATCH_SOURCE_TASK = "상품 매칭 태스크"; private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; @@ -28,22 +32,35 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 '상품 유사도 분석'을 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + // 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult .map(node -> node.path("data").path("keyword")) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // 매칭된 상품 정보 가져오기 - Optional.ofNullable(workflowContext.get(MATCH_SOURCE_TASK)) + // 2. 컨텍스트 서비스를 통해 DB에서 '상품 매칭 태스크'의 결과를 조회 + Optional matchResult = + contextService.getPreviousTaskOutput(jobRun, MATCH_SOURCE_TASK); + matchResult .map(node -> node.path("data").path("matched_products")) .ifPresent(matchedNode -> body.set("matched_products", matchedNode)); - // 상품 검색 결과 정보 가져오기 - Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) + // 3. 컨텍스트 서비스를 통해 DB에서 '상품 검색 태스크'의 결과를 조회 + Optional searchResult = + contextService.getPreviousTaskOutput(jobRun, SEARCH_SOURCE_TASK); + searchResult .map(node -> node.path("data").path("search_results")) .ifPresent(resultsNode -> body.set("search_results", resultsNode)); diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java index 7548452a..ddd3c296 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/S3UploadBodyBuilder.java @@ -1,6 +1,5 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; import java.util.Optional; import org.springframework.stereotype.Component; @@ -11,14 +10,19 @@ import lombok.RequiredArgsConstructor; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.service.WorkflowContextService; @Component @RequiredArgsConstructor public class S3UploadBodyBuilder implements TaskBodyBuilder { private final ObjectMapper objectMapper; + private final WorkflowContextService contextService; // 📌 컨텍스트 서비스 주입 private static final String TASK_NAME = "S3 업로드 태스크"; + + // 📌 데이터 소스가 되는 이전 Task들의 이름 private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크"; @@ -27,23 +31,34 @@ public boolean supports(String taskName) { return TASK_NAME.equals(taskName); } + /** + * 여러 이전 Task들의 결과를 DB에서 조회하고 조합하여 'S3 업로드'를 위한 Request Body를 생성합니다. + * + * @param task 실행할 Task의 도메인 모델 + * @param jobRun 현재 실행 중인 Job의 기록 객체 (이전 Task 결과를 조회하는 키로 사용) + * @return 생성된 JSON Body + */ @Override - public ObjectNode build(Task task, Map workflowContext) { + public ObjectNode build(Task task, JobRun jobRun) { ObjectNode body = objectMapper.createObjectNode(); - // 키워드 정보 가져오기 (폴더명 생성용 - 스키마 주석 참조) - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + // 1. 컨텍스트 서비스를 통해 DB에서 '키워드 검색 태스크'의 결과를 조회 + Optional keywordResult = + contextService.getPreviousTaskOutput(jobRun, KEYWORD_SOURCE_TASK); + keywordResult .map(node -> node.path("data").path("keyword")) .filter(node -> !node.isMissingNode() && !node.asText().trim().isEmpty()) .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - // 크롤링된 상품 데이터 가져오기 - Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK)) + // 2. 컨텍스트 서비스를 통해 DB에서 '상품 정보 크롤링 태스크'의 결과를 조회 + Optional crawlResult = + contextService.getPreviousTaskOutput(jobRun, CRAWL_SOURCE_TASK); + crawlResult .map(node -> node.path("data").path("crawled_products")) .filter(node -> !node.isMissingNode()) .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode)); - // 기본 폴더 설정 (스키마의 기본값과 일치) + // 3. 정적 데이터 설정 body.put("base_folder", "product"); return body; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java index 04dacef4..ffd76457 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/TaskBodyBuilder.java @@ -1,10 +1,8 @@ package site.icebang.domain.workflow.runner.fastapi.body; -import java.util.Map; - -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; public interface TaskBodyBuilder { @@ -17,12 +15,6 @@ public interface TaskBodyBuilder { */ boolean supports(String taskName); - /** - * 실제 API 요청에 사용될 Body를 생성합니다. - * - * @param task DB에 저장된 Task의 원본 정의 - * @param workflowContext 이전 Task들의 결과가 담긴 컨텍스트 - * @return 생성된 JSON Body - */ - ObjectNode build(Task task, Map workflowContext); + // 📌 workflowContext(Map) 대신 JobRun 객체를 받도록 변경 + ObjectNode build(Task task, JobRun jobRun); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java new file mode 100644 index 00000000..bbdff181 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java @@ -0,0 +1,53 @@ +package site.icebang.domain.workflow.service; + +import java.util.Optional; + +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import site.icebang.domain.workflow.mapper.TaskIoDataMapper; +import site.icebang.domain.workflow.mapper.TaskRunMapper; +import site.icebang.domain.workflow.model.JobRun; + +@Slf4j +@Service +@RequiredArgsConstructor +public class WorkflowContextService { + + private final TaskRunMapper taskRunMapper; + private final TaskIoDataMapper taskIoDataMapper; + private final ObjectMapper objectMapper; + + /** + * 특정 Job 실행 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. + * + * @param jobRun 현재 실행중인 JobRun + * @param sourceTaskName 결과를 조회할 이전 Task의 이름 + * @return 조회된 결과 데이터 (JsonNode) + */ + public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { + try { + return taskRunMapper + .findLatestSuccessRunInJob(jobRun.getId(), sourceTaskName) + .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) + .map( + ioData -> { + try { + return objectMapper.readTree(ioData.getDataValue()); + } catch (Exception e) { + log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); + return null; + } + }); + } catch (Exception e) { + log.error( + "이전 Task 결과 조회 중 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), sourceTaskName, e); + return Optional.empty(); + } + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index d536f4de..2e1ca782 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -2,17 +2,14 @@ import java.math.BigInteger; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -24,10 +21,16 @@ import site.icebang.domain.workflow.dto.TaskDto; import site.icebang.domain.workflow.dto.WorkflowDetailCardDto; import site.icebang.domain.workflow.manager.ExecutionMdcManager; -import site.icebang.domain.workflow.mapper.*; +import site.icebang.domain.workflow.mapper.JobMapper; +import site.icebang.domain.workflow.mapper.JobRunMapper; +import site.icebang.domain.workflow.mapper.TaskIoDataMapper; +import site.icebang.domain.workflow.mapper.TaskRunMapper; +import site.icebang.domain.workflow.mapper.WorkflowMapper; +import site.icebang.domain.workflow.mapper.WorkflowRunMapper; import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.JobRun; import site.icebang.domain.workflow.model.Task; +import site.icebang.domain.workflow.model.TaskIoData; import site.icebang.domain.workflow.model.TaskRun; import site.icebang.domain.workflow.model.WorkflowRun; import site.icebang.domain.workflow.runner.TaskRunner; @@ -41,12 +44,14 @@ public class WorkflowExecutionService { private final WorkflowRunMapper workflowRunMapper; private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; + private final TaskIoDataMapper taskIoDataMapper; private final ObjectMapper objectMapper; private final List bodyBuilders; private final ExecutionMdcManager mdcManager; private final TaskExecutionService taskExecutionService; private final WorkflowMapper workflowMapper; + @Transactional @Async("traceExecutor") public void executeWorkflow(Long workflowId, RequestContext context) { WorkflowRun workflowRun = WorkflowRun.start(workflowId, context.getTraceId()); @@ -57,52 +62,45 @@ public void executeWorkflow(Long workflowId, RequestContext context) { try { workflowLogger.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); - Map workflowContext = new HashMap<>(); - WorkflowDetailCardDto settings = + // 📌 1. selectWorkflowDetailById를 호출하여 워크플로우의 모든 상세 정보를 가져옵니다. + WorkflowDetailCardDto settingsDto = workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId)); - workflowLogger.info("Workflow 정보 로드 성공"); + if (settingsDto == null) { + throw new IllegalStateException("실행할 워크플로우를 찾을 수 없습니다: ID " + workflowId); + } - workflowLogger.info("Default config 로드 시도"); - JsonNode setting = objectMapper.readTree(settings.getDefaultConfig()); - workflowLogger.info("Default config 로드 성공"); + // 📌 2. 가져온 DTO 객체에서 getDefaultConfig() 메소드를 호출하여 값을 얻습니다. + String defaultConfigJson = settingsDto.getDefaultConfig(); + JsonNode setting = + (defaultConfigJson != null && !defaultConfigJson.isEmpty()) + ? objectMapper.readTree(defaultConfigJson) + : objectMapper.createObjectNode(); - workflowLogger.info("Job 목록 로드 시도"); List jobDtos = jobMapper.findJobsByWorkflowId(workflowId); - workflowLogger.info("Job 목록 로드 성공"); - - workflowLogger.info("execution_order 기준으로 정렬 시도"); jobDtos.sort( Comparator.comparing( JobDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(JobDto::getId)); - workflowLogger.info("execution_order 기준으로 성공"); - - workflowLogger.info("총 {}개의 Job을 순차적으로 실행합니다.", jobDtos.size()); boolean hasAnyJobFailed = false; - // 📌 정렬된 JobDto 리스트를 순회합니다. for (JobDto jobDto : jobDtos) { - // 📌 DTO로부터 Job 모델을 생성합니다. Job job = new Job(jobDto); - mdcManager.setJobContext(job.getId()); JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); workflowLogger.info( "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext, setting); + boolean jobSucceeded = executeTasksForJob(jobRun, setting); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); jobRunMapper.update(jobRun); if (!jobSucceeded) { - workflowLogger.error("Job 실행 실패: JobRunId={}", jobRun.getId()); hasAnyJobFailed = true; - } else { - workflowLogger.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); } - mdcManager.setWorkflowContext(workflowId); + mdcManager.setWorkflowContext( + workflowId, context.getTraceId(), context.getClientIp(), context.getUserAgent()); } workflowRun.finish(hasAnyJobFailed ? "FAILED" : "SUCCESS"); workflowRunMapper.update(workflowRun); @@ -110,52 +108,50 @@ public void executeWorkflow(Long workflowId, RequestContext context) { "========== 워크플로우 실행 {} : WorkflowRunId={} ==========", hasAnyJobFailed ? "실패" : "성공", workflowRun.getId()); - } catch (JsonMappingException e) { - throw new RuntimeException(e); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + } catch (Exception e) { + workflowLogger.error("워크플로우 실행 중 심각한 오류 발생: WorkflowId={}", workflowId, e); + if (workflowRun != null) { + workflowRun.finish("FAILED"); + workflowRunMapper.update(workflowRun); + } } finally { mdcManager.clearExecutionContext(); } } - private boolean executeTasksForJob( - JobRun jobRun, Map workflowContext, JsonNode setting) { + private boolean executeTasksForJob(JobRun jobRun, JsonNode setting) { List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - for (TaskDto taskDto : taskDtos) { - String taskId = taskDto.getId().toString(); - JsonNode settingForTask = setting.get(taskId); - if (settingForTask != null) { - taskDto.setSettings(settingForTask); - } - } + taskDtos.forEach( + dto -> { + JsonNode s = setting.get(String.valueOf(dto.getId())); + if (s != null) dto.setSettings(s); + }); taskDtos.sort( Comparator.comparing( TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder())) .thenComparing(TaskDto::getId)); - workflowLogger.info( - "Job (JobRunId={}) 내 총 {}개의 Task를 순차 실행합니다.", jobRun.getId(), taskDtos.size()); boolean hasAnyTaskFailed = false; Long s3UploadTaskRunId = null; // S3 업로드 태스크의 task_run_id 저장용 for (TaskDto taskDto : taskDtos) { + TaskRun taskRun = null; try { - TaskRun taskRun = - TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); + taskRun = TaskRun.start(jobRun.getId(), taskDto.getId(), taskDto.getExecutionOrder()); taskRunMapper.insert(taskRun); mdcManager.setTaskContext(taskRun.getId()); - workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", taskDto.getId(), taskDto.getName()); Task task = new Task(taskDto); + workflowLogger.info("Task 실행 시작: TaskId={}, Name={}", task.getId(), task.getName()); ObjectNode requestBody = bodyBuilders.stream() .filter(builder -> builder.supports(task.getName())) .findFirst() - .map(builder -> builder.build(task, workflowContext)) + .map(builder -> builder.build(task, jobRun)) .orElse(objectMapper.createObjectNode()); + // TODO: 아래 로직 다른 곳으로 분리시키기 if ("S3 업로드 태스크".equals(task.getName())) { requestBody.put("task_run_id", taskRun.getId()); s3UploadTaskRunId = taskRun.getId(); // S3 업로드의 task_run_id 저장 @@ -169,28 +165,48 @@ private boolean executeTasksForJob( } } + saveIoData(taskRun.getId(), "INPUT", "request_body", requestBody); TaskRunner.TaskExecutionResult result = taskExecutionService.executeWithRetry(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); - taskRunMapper.update(taskRun); if (result.isFailure()) { - workflowLogger.error( - "Task 최종 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message()); hasAnyTaskFailed = true; + saveIoData( + taskRun.getId(), + "OUTPUT", + "error_message", + objectMapper.valueToTree(result.message())); } else { JsonNode resultJson = objectMapper.readTree(result.message()); - workflowContext.put(task.getName(), resultJson); - workflowLogger.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); + saveIoData(taskRun.getId(), "OUTPUT", "response_body", resultJson); } } catch (Exception e) { workflowLogger.error( "Task 처리 중 심각한 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), taskDto.getName(), e); hasAnyTaskFailed = true; + if (taskRun != null) { + taskRun.finish("FAILED", e.getMessage()); + saveIoData( + taskRun.getId(), "OUTPUT", "error_message", objectMapper.valueToTree(e.getMessage())); + } } finally { + if (taskRun != null) taskRunMapper.update(taskRun); mdcManager.setJobContext(jobRun.getId()); } } return !hasAnyTaskFailed; } + + private void saveIoData(Long taskRunId, String ioType, String name, JsonNode data) { + try { + String dataValue = data.toString(); + TaskIoData ioData = + new TaskIoData( + taskRunId, ioType, name, "JSON", dataValue, (long) dataValue.getBytes().length); + taskIoDataMapper.insert(ioData); + } catch (Exception e) { + workflowLogger.error("Task IO 데이터 저장 실패: TaskRunId={}, Type={}", taskRunId, ioType, e); + } + } } diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml new file mode 100644 index 00000000..fa0dd73c --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskIoDataMapper.xml @@ -0,0 +1,16 @@ + + + + + + INSERT INTO task_io_data (task_run_id, io_type, name, data_type, data_value, data_size, created_at) + VALUES (#{taskRunId}, #{ioType}, #{name}, #{dataType}, #{dataValue}, #{dataSize}, #{createdAt}) + + + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml index 61ec3cf0..322a9f04 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml @@ -26,4 +26,15 @@ finished_at = #{finishedAt} WHERE id = #{id} + + \ No newline at end of file