diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index 4001b705..4a49ca0e 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -186,6 +186,13 @@ class S3ImageInfo(BaseModel): ..., title="원본 URL", description="크롤링된 원본 이미지 URL" ) s3_url: str = Field(..., title="S3 URL", description="S3에서 접근 가능한 URL") + # 새로 추가: 파일 크기 정보 (이미지 선별용) + file_size_kb: Optional[float] = Field( + None, title="파일 크기(KB)", description="이미지 파일 크기" + ) + file_name: Optional[str] = Field( + None, title="파일명", description="S3에 저장된 파일명" + ) # 상품별 S3 업로드 결과 @@ -274,14 +281,18 @@ class RequestBlogCreate(RequestBase): keyword: Optional[str] = Field( None, title="키워드", description="콘텐츠 생성용 키워드" ) + translation_language: Optional[str] = Field( + None, + title="번역한 언어", + description="이미지에서 중국어를 한국어로 번역한 언어", + ) product_info: Optional[Dict] = Field( None, title="상품 정보", description="블로그 콘텐츠에 포함할 상품 정보" ) - content_type: Optional[str] = Field( - None, title="콘텐츠 타입", description="생성할 콘텐츠 유형" - ) - target_length: Optional[int] = Field( - None, title="목표 글자 수", description="생성할 콘텐츠의 목표 길이" + uploaded_images: Optional[List[Dict]] = Field( + None, + title="업로드된 이미지", + description="S3에 업로드된 이미지 목록 (크기 정보 포함)", ) diff --git a/apps/pre-processing-service/app/service/blog/blog_create_service.py b/apps/pre-processing-service/app/service/blog/blog_create_service.py index a66fa609..fdc8b6a0 100644 --- a/apps/pre-processing-service/app/service/blog/blog_create_service.py +++ b/apps/pre-processing-service/app/service/blog/blog_create_service.py @@ -1,5 +1,6 @@ -import json import logging +import os +import boto3 from loguru import logger from datetime import datetime from typing import Dict, List, Optional, Any @@ -19,19 +20,98 @@ def __init__(self): if not self.openai_api_key: raise ValueError("OPENAI_API_KEY가 .env.dev 파일에 설정되지 않았습니다.") - # 인스턴스 레벨에서 클라이언트 생성 self.client = OpenAI(api_key=self.openai_api_key) + + # S3 클라이언트 추가 + self.s3_client = boto3.client( + "s3", + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + region_name=os.getenv("AWS_REGION", "ap-northeast-2"), + ) + self.bucket_name = os.getenv("S3_BUCKET_NAME", "icebang4-dev-bucket") + logging.basicConfig(level=logging.INFO) - def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: - """ - 요청 데이터를 기반으로 블로그 콘텐츠 생성 + def _fetch_images_from_s3(self, keyword: str, product_index: int = 1) -> List[Dict]: + """S3에서 해당 상품의 이미지 정보를 조회""" + try: + # 폴더 패턴: 20250922_키워드_1/ 형식으로 검색 + from datetime import datetime + + date_str = datetime.now().strftime("%Y%m%d") + + # 키워드 정리 (S3UploadUtil과 동일한 방식) + safe_keyword = ( + keyword.replace("/", "-") + .replace("\\", "-") + .replace(" ", "_") + .replace(":", "-") + .replace("*", "-") + .replace("?", "-") + .replace('"', "-") + .replace("<", "-") + .replace(">", "-") + .replace("|", "-")[:20] + ) - Args: - request: RequestBlogCreate 객체 + folder_prefix = f"product/{date_str}_{safe_keyword}_{product_index}/" + + logger.debug(f"S3에서 이미지 조회: {folder_prefix}") + + # S3에서 해당 폴더의 파일 목록 조회 + response = self.s3_client.list_objects_v2( + Bucket=self.bucket_name, Prefix=folder_prefix + ) + + if "Contents" not in response: + logger.warning(f"S3에서 이미지를 찾을 수 없음: {folder_prefix}") + return [] + + images = [] + base_url = f"https://{self.bucket_name}.s3.ap-northeast-2.amazonaws.com" + + # 이미지 파일만 필터링 (image_*.jpg 패턴) + for obj in response["Contents"]: + key = obj["Key"] + file_name = key.split("/")[-1] # 마지막 부분이 파일명 + + # 이미지 파일인지 확인 + if file_name.startswith("image_") and file_name.endswith( + (".jpg", ".jpeg", ".png") + ): + # 파일 크기 정보 (bytes -> KB) + file_size_kb = obj["Size"] / 1024 + + # 인덱스 추출 (image_001.jpg -> 1) + try: + index = int(file_name.split("_")[1].split(".")[0]) + except: + index = len(images) + 1 + + images.append( + { + "index": index, + "s3_url": f"{base_url}/{key}", + "file_name": file_name, + "file_size_kb": round(file_size_kb, 2), + "original_url": "", # 원본 URL은 S3에서 조회 불가 + } + ) + + # 인덱스 순으로 정렬 + images.sort(key=lambda x: x["index"]) + + logger.success(f"S3에서 이미지 {len(images)}개 조회 완료") + return images + + except Exception as e: + logger.error(f"S3 이미지 조회 실패: {e}") + return [] - Returns: - Dict: {"title": str, "content": str, "tags": List[str]} 형태의 결과 + def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: + """ + 요청 데이터를 기반으로 블로그 콘텐츠 생성 (이미지 자동 배치 포함) """ try: logger.debug("[STEP1] 콘텐츠 컨텍스트 준비 시작") @@ -50,6 +130,21 @@ def generate_blog_content(self, request: RequestBlogCreate) -> Dict[str, Any]: result = self._parse_generated_content(generated_content, request) logger.debug("[STEP4 완료]") + # STEP5: S3에서 이미지 정보 조회 (새로 추가) + uploaded_images = request.uploaded_images + if not uploaded_images and request.keyword: + logger.debug("[STEP5-1] S3에서 이미지 정보 조회 시작") + uploaded_images = self._fetch_images_from_s3(request.keyword) + logger.debug(f"[STEP5-1 완료] 조회된 이미지: {len(uploaded_images)}개") + + # STEP6: 이미지 자동 배치 + if uploaded_images and len(uploaded_images) > 0: + logger.debug("[STEP6] 이미지 자동 배치 시작") + result["content"] = self._insert_images_to_content( + result["content"], uploaded_images + ) + logger.debug("[STEP6 완료] 이미지 배치 완료") + return result except Exception as e: @@ -60,29 +155,31 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: """요청 데이터를 콘텐츠 생성용 컨텍스트로 변환""" context_parts = [] - # 키워드 정보 추가 + # 키워드 정보 if request.keyword: context_parts.append(f"주요 키워드: {request.keyword}") - # 상품 정보 추가 + # 상품 정보 if request.product_info: context_parts.append("\n상품 정보:") - # 상품 기본 정보 if request.product_info.get("title"): context_parts.append(f"- 상품명: {request.product_info['title']}") if request.product_info.get("price"): - context_parts.append(f"- 가격: {request.product_info['price']:,}원") + try: + context_parts.append( + f"- 가격: {int(request.product_info['price']):,}원" + ) + except Exception: + context_parts.append(f"- 가격: {request.product_info.get('price')}") if request.product_info.get("rating"): context_parts.append(f"- 평점: {request.product_info['rating']}/5.0") - # 상품 상세 정보 if request.product_info.get("description"): context_parts.append(f"- 설명: {request.product_info['description']}") - # 상품 사양 (material_info 등) if request.product_info.get("material_info"): context_parts.append("- 주요 사양:") specs = request.product_info["material_info"] @@ -90,18 +187,16 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: for key, value in specs.items(): context_parts.append(f" * {key}: {value}") - # 상품 옵션 if request.product_info.get("options"): options = request.product_info["options"] context_parts.append(f"- 구매 옵션 ({len(options)}개):") - for i, option in enumerate(options[:5], 1): # 최대 5개만 + for i, option in enumerate(options[:5], 1): if isinstance(option, dict): option_name = option.get("name", f"옵션 {i}") context_parts.append(f" {i}. {option_name}") else: context_parts.append(f" {i}. {option}") - # 구매 링크 if request.product_info.get("url") or request.product_info.get( "product_url" ): @@ -110,8 +205,123 @@ def _prepare_content_context(self, request: RequestBlogCreate) -> str: ) context_parts.append(f"- 구매 링크: {url}") + # 번역 텍스트 (translation_language) 추가 + if request.translation_language: + context_parts.append("\n이미지(OCR)에서 추출·번역된 텍스트:") + context_parts.append(request.translation_language.strip()) + return "\n".join(context_parts) if context_parts else "키워드 기반 콘텐츠 생성" + def _select_best_images( + self, uploaded_images: List[Dict], target_count: int = 4 + ) -> List[Dict]: + """크기 기반으로 최적의 이미지 4개 선별""" + if not uploaded_images: + return [] + + logger.debug( + f"이미지 선별 시작: 전체 {len(uploaded_images)}개 -> 목표 {target_count}개" + ) + + # 1단계: 너무 작은 이미지 제외 (20KB 이하는 아이콘, 로고 가능성) + filtered = [img for img in uploaded_images if img.get("file_size_kb", 0) > 20] + logger.debug(f"크기 필터링 후: {len(filtered)}개 이미지 남음") + + if len(filtered) == 0: + # 모든 이미지가 너무 작다면 원본에서 선택 + filtered = uploaded_images + + # 2단계: 크기순 정렬 (큰 이미지 = 메인 상품 사진일 가능성) + sorted_images = sorted( + filtered, key=lambda x: x.get("file_size_kb", 0), reverse=True + ) + + # 3단계: 상위 이미지 선택하되, 너무 많으면 균등 분산 + if len(sorted_images) <= target_count: + selected = sorted_images + else: + # 상위 2개 (메인 이미지) + 나머지에서 균등분산으로 2개 + selected = sorted_images[:2] # 큰 이미지 2개 + + remaining = sorted_images[2:] + if len(remaining) >= 2: + step = len(remaining) // 2 + selected.extend([remaining[i * step] for i in range(2)]) + + result = selected[:target_count] + + logger.debug(f"최종 선택된 이미지: {len(result)}개") + for i, img in enumerate(result): + logger.debug( + f" {i + 1}. {img.get('file_name', 'unknown')} ({img.get('file_size_kb', 0):.1f}KB)" + ) + + return result + + def _insert_images_to_content( + self, content: str, uploaded_images: List[Dict] + ) -> str: + """AI가 적절한 위치에 이미지 4개를 자동 배치""" + + # 1단계: 최적의 이미지 4개 선별 + selected_images = self._select_best_images(uploaded_images, target_count=4) + + if not selected_images: + logger.warning("선별된 이미지가 없어서 이미지 배치를 건너뜀") + return content + + logger.debug(f"이미지 배치 시작: {len(selected_images)}개 이미지") + + # 2단계: AI에게 이미지 배치 위치 물어보기 + image_placement_prompt = f""" +다음 HTML 콘텐츠에서 이미지 {len(selected_images)}개를 적절한 위치에 배치해주세요. + +콘텐츠: +{content} + +이미지 개수: {len(selected_images)}개 + +요구사항: +- 각 섹션(h2, h3 태그)마다 골고루 분산 배치 +- 너무 몰려있지 않게 적절한 간격 유지 +- 글의 흐름을 방해하지 않는 자연스러운 위치 +- [IMAGE_1], [IMAGE_2], [IMAGE_3], [IMAGE_4] 형식의 플레이스홀더로 표시 + +⚠️ 주의사항: +- 기존 HTML 구조와 내용은 그대로 유지 +- 오직 이미지 플레이스홀더만 적절한 위치에 삽입 +- 코드 블록(```)은 사용하지 말고 수정된 HTML만 반환 + +수정된 HTML을 반환해주세요. +""" + + try: + # 3단계: AI로 배치 위치 결정 + modified_content = self._generate_with_openai(image_placement_prompt) + + # 4단계: 플레이스홀더를 실제 img 태그로 교체 + for i, img in enumerate(selected_images): + img_tag = f""" +
,