diff --git a/apps/pre-processing-service/app/api/__init__.py b/apps/pre-processing-service/app/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pre-processing-service/app/api/endpoints/keywords.py b/apps/pre-processing-service/app/api/endpoints/keywords.py index 9b3fd61d..ed8357c7 100644 --- a/apps/pre-processing-service/app/api/endpoints/keywords.py +++ b/apps/pre-processing-service/app/api/endpoints/keywords.py @@ -39,7 +39,8 @@ async def search(request: RequestNaverSearch): schedule_id=schedule_id, category=category, keyword=keywords, - total_keyword = {1: "바밥밥", 2: "밥밥밥", 3: "바밤바"} + total_keyword = {1: "바밥밥", 2: "밥밥밥", 3: "바밤바"}, + status="SUCCESS" ) @router.post("/search/test",response_model=ResponsetSadaguValidate) diff --git a/apps/pre-processing-service/app/core/__init__.py b/apps/pre-processing-service/app/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pre-processing-service/app/db/__init__.py b/apps/pre-processing-service/app/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pre-processing-service/app/errors/CrawlingException.py b/apps/pre-processing-service/app/errors/CrawlingException.py new file mode 100644 index 00000000..d641abff --- /dev/null +++ b/apps/pre-processing-service/app/errors/CrawlingException.py @@ -0,0 +1,63 @@ +from CustomException import CustomException +from typing import List + +class PageLoadTimeoutException(CustomException): + """ + 페이지 로드 타임아웃 예외 + @:param url: 로드하려는 페이지의 URL + """ + def __init__(self, url : str): + super().__init__( + status_code=408, + detail=f"페이지 로드가 시간 초과되었습니다. URL: {url}", + code="PAGE_LOAD_TIMEOUT" + ) + +class WebDriverConnectionException(CustomException): + """ + 웹 드라이버 연결 실패 예외 + """ + def __init__(self): + super().__init__( + status_code=500, + detail="웹 드라이버 연결에 실패했습니다.", + code="WEBDRIVER_ERROR" + ) + + +class ElementNotFoundException(CustomException): + """ + 특정 HTML 요소를 찾을 수 없는 예외 + @:param selector: 찾으려는 요소의 CSS 선택자 + """ + def __init__(self, selector: str): + super().__init__( + status_code=404, + detail=f"요소를 찾을 수 없습니다. 선택자: {selector}", + code="ELEMENT_NOT_FOUND" + ) + +class HtmlParsingException(CustomException): + """ + HTML 파싱 실패 예외 + @:param reason: 파싱 실패 이유 + """ + def __init__(self, reason: str): + super().__init__( + status_code=422, + detail=f"HTML 파싱에 실패했습니다. 이유: {reason}", + code="HTML_PARSING_ERROR" + ) + +class DataExtractionException(CustomException): + """ + 데이터 추출 실패 예외 + @:param field: 추출하려는 데이터 필드 목록 + """ + def __init__(self, field: List[str]): + super().__init__( + status_code=422, + detail=f"데이터 추출에 실패했습니다. 필드: {', '.join(field)}", + code="DATA_EXTRACTION_ERROR" + ) + diff --git a/apps/pre-processing-service/app/errors/CustomException.py b/apps/pre-processing-service/app/errors/CustomException.py index c228748e..4c3f84a3 100644 --- a/apps/pre-processing-service/app/errors/CustomException.py +++ b/apps/pre-processing-service/app/errors/CustomException.py @@ -10,6 +10,10 @@ def __init__(self, status_code: int, detail: str, code: str): # 구체적인 커스텀 예외 정의 class ItemNotFoundException(CustomException): + """ + 아이템을 찾을수 없는 예외 + @:param item_id: 찾을수 없는 아이템의 ID + """ def __init__(self, item_id: int): super().__init__( status_code=404, @@ -18,9 +22,23 @@ def __init__(self, item_id: int): ) class InvalidItemDataException(CustomException): + """ + 데이터 유효성 검사 실패 예외 + """ def __init__(self): super().__init__( status_code=422, detail="데이터가 유효하지않습니다..", code="INVALID_ITEM_DATA" + ) + +class DatabaseConnectionException(CustomException): + """ + 데이터베이스 연결 실패 예외 + """ + def __init__(self): + super().__init__( + status_code=500, + detail="데이터베이스 연결에 실패했습니다.", + code="DATABASE_CONNECTION_ERROR" ) \ No newline at end of file diff --git a/apps/pre-processing-service/app/main.py b/apps/pre-processing-service/app/main.py index 2ca44875..d13c523d 100644 --- a/apps/pre-processing-service/app/main.py +++ b/apps/pre-processing-service/app/main.py @@ -3,6 +3,7 @@ from fastapi import FastAPI from starlette.exceptions import HTTPException as StarletteHTTPException from fastapi.exceptions import RequestValidationError +from app.middleware.ServiceLoggerMiddleware import ServiceLoggerMiddleware # --- 애플리케이션 구성 요소 임포트 --- from app.api.router import api_router @@ -25,6 +26,7 @@ app.add_exception_handler(Exception, unhandled_exception_handler) # --- 미들웨어 등록 --- +app.add_middleware(ServiceLoggerMiddleware) app.add_middleware(LoggingMiddleware) # --- 라우터 등록 --- diff --git a/apps/pre-processing-service/app/middleware/BackServiceLoggerDependency.py b/apps/pre-processing-service/app/middleware/BackServiceLoggerDependency.py new file mode 100644 index 00000000..bbaa2cfd --- /dev/null +++ b/apps/pre-processing-service/app/middleware/BackServiceLoggerDependency.py @@ -0,0 +1,124 @@ +# import time +# from typing import Dict, Any, List, Optional +# from fastapi import Request +# from loguru import logger +# from contextvars import ContextVar +# +# trace_id_context: ContextVar[str] = ContextVar('trace_id', default="NO_TRACE_ID") +# +# +# class ServiceLoggingDependency: +# """ +# 서비스 로깅을 위한 의존성 클래스 +# :param service_type: 서비스 유형 (예: "CHUNKING", "PARSING", "EMBEDDING") +# :param track_params: 추적할 매개변수 이름 목록 +# :param response_trackers: 응답에서 추적할 필드 이름 목록 (딕셔너리) +# """ +# +# def __init__(self, service_type: str, +# track_params: List[str] = None, +# response_trackers: List[str] = None): +# self.service_type = service_type +# self.track_params = track_params or [] +# self.response_trackers = response_trackers or [] +# +# async def __call__(self, request: Request): +# """ +# 의존성 주입 시 호출되는 메서드 +# :param request: FastAPI Request 객체 +# :return: 서비스 유형과 추출된 매개변수 딕셔너리 +# """ +# trace_id = trace_id_context.get("NO_TRACE_ID") +# start_time = time.time() +# +# # 파라미터 추출 +# params = await self._extract_params(request) +# param_str = "" +# if params: +# param_strs = [f"{k}={v}" for k, v in params.items()] +# param_str = " " + " ".join(param_strs) +# +# logger.info(f"[{self.service_type}_START] trace_id={trace_id}{param_str}") +# +# # 응답 시 사용할 정보를 request.state에 저장 +# request.state.service_type = self.service_type +# request.state.start_time = start_time +# request.state.param_str = param_str +# request.state.response_trackers = self.response_trackers +# +# return {"service_type": self.service_type, "params": params} +# +# async def _extract_params(self, request: Request) -> Dict[str, Any]: +# """ +# 요청에서 추적 파라미터 추출 +# :param request: FastAPI Request 객체 +# :return: 추출된 매개변수 딕셔너리 +# """ +# params = {} +# +# try: +# # Query Parameters 추출 +# for key, value in request.query_params.items(): +# if key in self.track_params: +# params[key] = value +# +# # JSON Body 추출 +# try: +# json_body = await request.json() +# if json_body: +# for key, value in json_body.items(): +# if key in self.track_params: +# if isinstance(value, str) and len(value) > 50: +# params[f"{key}_length"] = len(value) +# elif isinstance(value, list): +# params[f"{key}_count"] = len(value) +# else: +# params[key] = value +# except: +# pass +# except: +# pass +# +# return params +# +# # 서비스 응답 시 성공 로그 함수 +# async def log_service_response_with_data(request: Request, response_data: Optional[Dict] = None): +# """ +# 서비스 응답 시 성공 로그 기록 +# :param request: FastAPI Request 객체 +# :param response_data: 응답 데이터 +# """ +# if hasattr(request.state, 'service_type'): +# trace_id = trace_id_context.get("NO_TRACE_ID") +# duration = time.time() - request.state.start_time +# +# # 기본 로그 문자열 +# log_parts = [f"[{request.state.service_type}_SUCCESS]", +# f"trace_id={trace_id}", +# f"execution_time={duration:.4f}s{request.state.param_str}"] +# +# # 응답 데이터에서 추적할 필드 추출 +# if response_data and hasattr(request.state, 'response_trackers'): +# response_params = [] +# for tracker in request.state.response_trackers: +# if tracker in response_data: +# value = response_data[tracker] +# if isinstance(value, dict): +# response_params.append(f"{tracker}_keys={list(value.keys())}") +# response_params.append(f"{tracker}_count={len(value)}") +# elif isinstance(value, list): +# response_params.append(f"{tracker}_count={len(value)}") +# else: +# response_params.append(f"{tracker}={value}") +# +# if response_params: +# log_parts.append(" ".join(response_params)) +# +# logger.info(" ".join(log_parts)) +# return None +# +# naver_search_dependency = ServiceLoggingDependency( +# "NAVER_CRAWLING", +# track_params=["job_id", "schedule_id", "tag", "category", "startDate", "endDate"], +# response_trackers=["keyword", "total_keyword"] +# ) \ No newline at end of file diff --git a/apps/pre-processing-service/app/middleware/RepositoryLoggerMiddleware.py b/apps/pre-processing-service/app/middleware/RepositoryLoggerMiddleware.py deleted file mode 100644 index 703834a6..00000000 --- a/apps/pre-processing-service/app/middleware/RepositoryLoggerMiddleware.py +++ /dev/null @@ -1,117 +0,0 @@ -import time -from typing import Dict, Any, List -from fastapi import Request -from loguru import logger -from contextvars import ContextVar - -trace_id_context: ContextVar[str] = ContextVar('trace_id', default="NO_TRACE_ID") - -class RepositoryLoggingDependency: - """ - 레포지토리 로깅을 위한 의존성 클래스 - :param repository_type: 레포지토리 유형 (예: "VECTOR_DB", "RDB", "REDIS") - :param track_params: 추적할 매개변수 이름 목록 - """ - - def __init__(self, repository_type: str, track_params: List[str] = None): - self.repository_type = repository_type - self.track_params = track_params or [] - - async def __call__(self, request: Request): - """ - 의존성 주입 시 호출되는 메서드 - :param request: FastAPI Request 객체 - :return: 레포지토리 유형과 추출된 매개변수 딕셔너리 - """ - trace_id = trace_id_context.get("NO_TRACE_ID") - start_time = time.time() - - # 파라미터 추출 - params = await self._extract_params(request) - param_str = "" - if params: - param_strs = [f"{k}={v}" for k, v in params.items()] - param_str = " " + " ".join(param_strs) - - logger.info(f"[{self.repository_type}_START] trace_id={trace_id}{param_str}") - - # 응답 시 사용할 정보를 request.state에 저장 - request.state.repository_type = self.repository_type - request.state.start_time = start_time - request.state.param_str = param_str - - return {"repository_type": self.repository_type, "params": params} - - async def _extract_params(self, request: Request) -> Dict[str, Any]: - """ - 요청에서 추적 파라미터 추출 - :param request: FastAPI Request 객체 - :return: 추출된 매개변수 딕셔너리 - """ - params = {} - - try: - # Query Parameters 추출 - for key, value in request.query_params.items(): - if key in self.track_params: - params[key] = value - - # JSON Body 추출 - try: - json_body = await request.json() - if json_body: - for key, value in json_body.items(): - if key in self.track_params: - if isinstance(value, str) and len(value) > 50: - params[f"{key}_length"] = len(value) - elif isinstance(value, list): - params[f"{key}_count"] = len(value) - else: - params[key] = value - except: - pass - except: - pass - - return params - - -# 레포지토리별 의존성 인스턴스 생성 -vector_db_dependency = RepositoryLoggingDependency("VECTOR_DB", ["query", "embeddings", "top_k", "collection", "filters"]) -rdb_dependency = RepositoryLoggingDependency("RDB", ["table", "where_clause", "limit", "data"]) -redis_dependency = RepositoryLoggingDependency("REDIS", ["key", "value", "ttl", "pattern"]) -elasticsearch_dependency = RepositoryLoggingDependency("ELASTICSEARCH", ["index", "query", "size", "document"]) - - -# 응답 로깅을 위한 의존성 -async def log_repository_response(request: Request): - """ - 레포지토리 응답 시 성공 로그 기록 - :param request: FastAPI Request 객체 - """ - if hasattr(request.state, 'repository_type'): - trace_id = trace_id_context.get("NO_TRACE_ID") - duration = time.time() - request.state.start_time - logger.info( - f"[{request.state.repository_type}_SUCCESS] trace_id={trace_id} execution_time={duration:.4f}s{request.state.param_str}") - return None - - -""" -라우터 예시 -@router.post("/search") -async def vector_search( - query: str, - top_k: int = 10, - request: Request = None, - _: None = Depends(vector_db_dependency), # 직접 의존성 주입 - __: None = Depends(log_repository_response) -): - -또는 라우터 레벨에서: -vector_router = APIRouter( - prefix="/vector", - tags=["vector"], - dependencies=[Depends(vector_db_dependency)] -) -""" \ No newline at end of file diff --git a/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py b/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py index 5e4816a1..e9e9afcf 100644 --- a/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py +++ b/apps/pre-processing-service/app/middleware/ServiceLoggerMiddleware.py @@ -1,109 +1,198 @@ -import time -from typing import Dict, Any, List -from fastapi import Request +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response from loguru import logger from contextvars import ContextVar +from fastapi.responses import JSONResponse +from typing import Dict, Optional, List, Any + +import json +import time trace_id_context: ContextVar[str] = ContextVar('trace_id', default="NO_TRACE_ID") -class ServiceLoggingDependency: +class ServiceLoggerMiddleware(BaseHTTPMiddleware): """ - 서비스 로깅을 위한 의존성 클래스 - :param service_type: 서비스 유형 (예: "CHUNKING", "PARSING", "EMBEDDING") - :param track_params: 추적할 매개변수 이름 목록 + 완전 자동 서비스 로깅 미들웨어 - 의존성 주입 불필요 + URL 패턴을 기반으로 자동으로 서비스 타입 식별 및 로깅 """ - def __init__(self, service_type: str, track_params: List[str] = None): - self.service_type = service_type - self.track_params = track_params or [] - - async def __call__(self, request: Request): + def __init__(self, app, service_mappings: Dict[str, Dict] = None): """ - 의존성 주입 시 호출되는 메서드 - :param request: FastAPI Request 객체 - :return: 서비스 유형과 추출된 매개변수 딕셔너리 + :param service_mappings: URL 패턴별 서비스 설정 + 예: { + "/keywords/search": { + "service_type": "NAVER_CRAWLING", + "track_params": ["keyword", "category"], + "response_trackers": ["total_keywords", "results_count"] + } + } """ + super().__init__(app) + self.service_mappings = service_mappings or self._default_mappings() + + def _default_mappings(self) -> Dict[str, Dict]: + """기본 서비스 매핑 설정""" + return { + "/keywords/search": { + "service_type": "NAVER_CRAWLING", + "track_params": ["keyword", "category", "startDate", "endDate", "job_id", "schedule_id"], + "response_trackers": ["keyword", "total_keywords", "results_count"] + } + } + + async def dispatch(self, request: Request, call_next): + """요청-응답 사이클을 가로채서 자동 로깅 처리""" + + # 1. 서비스 설정 확인 + service_config = self._get_service_config(request.url.path) + if not service_config: + # 로깅 대상이 아닌 경우 그냥 통과 + return await call_next(request) + + # 2. 시작 로깅 trace_id = trace_id_context.get("NO_TRACE_ID") start_time = time.time() - # 파라미터 추출 - params = await self._extract_params(request) + # 파라미터 추출 및 시작 로그 + params = await self._extract_params(request, service_config["track_params"]) param_str = "" if params: param_strs = [f"{k}={v}" for k, v in params.items()] param_str = " " + " ".join(param_strs) - logger.info(f"[{self.service_type}_START] trace_id={trace_id}{param_str}") - - # 응답 시 사용할 정보를 request.state에 저장 - request.state.service_type = self.service_type - request.state.start_time = start_time - request.state.param_str = param_str + service_type = service_config["service_type"] + logger.info(f"[{service_type}_START] trace_id={trace_id}{param_str}") - return {"service_type": self.service_type, "params": params} - - async def _extract_params(self, request: Request) -> Dict[str, Any]: - """ - 요청에서 추적 파라미터 추출 - :param request: FastAPI Request 객체 - :return: 추출된 매개변수 딕셔너리 - """ + # 3. 요청 처리 + try: + response = await call_next(request) + + # 4. 성공 로깅 + if 200 <= response.status_code < 300: + await self._log_success_response( + service_type, trace_id, start_time, param_str, + response, service_config["response_trackers"] + ) + else: + await self._log_error_response( + service_type, trace_id, start_time, param_str, response + ) + + return response + + except Exception as e: + # 5. 예외 로깅 + await self._log_exception(service_type, trace_id, start_time, param_str, e) + raise + + def _get_service_config(self, url_path: str) -> Optional[Dict]: + """URL 경로를 기반으로 서비스 설정 반환""" + for pattern, config in self.service_mappings.items(): + if self._match_pattern(url_path, pattern): + return config + return None + + def _match_pattern(self, url_path: str, pattern: str) -> bool: + """URL 패턴 매칭 (간단한 구현, 필요시 정규식으로 확장 가능)""" + # 정확히 일치하거나 패턴이 접두사인 경우 + return url_path == pattern or url_path.startswith(pattern.rstrip('*')) + + async def _extract_params(self, request: Request, track_params: List[str]) -> Dict[str, Any]: + """요청에서 추적 파라미터 추출""" params = {} try: # Query Parameters 추출 for key, value in request.query_params.items(): - if key in self.track_params: + if key in track_params: params[key] = value # JSON Body 추출 try: - json_body = await request.json() - if json_body: - for key, value in json_body.items(): - if key in self.track_params: - if isinstance(value, str) and len(value) > 50: - params[f"{key}_length"] = len(value) - elif isinstance(value, list): - params[f"{key}_count"] = len(value) - else: - params[key] = value + # request body를 읽기 위한 안전한 방법 + body = await request.body() + if body: + json_body = json.loads(body.decode()) + if isinstance(json_body, dict): + for key, value in json_body.items(): + if key in track_params: + if isinstance(value, str) and len(value) > 50: + params[f"{key}_length"] = len(value) + elif isinstance(value, list): + params[f"{key}_count"] = len(value) + else: + params[key] = value except: pass - except: - pass - - return params + except Exception as e: + logger.debug(f"파라미터 추출 실패: {e}") -# 서비스별 의존성 인스턴스 생성 -chunking_dependency = ServiceLoggingDependency("CHUNKING", ["text", "chunk_size", "overlap"]) -parsing_dependency = ServiceLoggingDependency("PARSING", ["file_path", "file_type", "document"]) -embedding_dependency = ServiceLoggingDependency("EMBEDDING", ["chunks", "model_name", "batch_size"]) + return params -# 응답 로깅을 위한 의존성 -async def log_service_response(request: Request): - """ - 서비스 응답 시 성공 로그 기록 - :param request: FastAPI Request 객체 - """ - if hasattr(request.state, 'service_type'): - trace_id = trace_id_context.get("NO_TRACE_ID") - duration = time.time() - request.state.start_time - logger.info( - f"[{request.state.service_type}_SUCCESS] trace_id={trace_id} execution_time={duration:.4f}s{request.state.param_str}") - return None - -""" -라우터 예시 -@router.post("/chunk") -async def chunk_text( - text: str, - chunk_size: int = 100, - overlap: int = 20, - request: Request = None, - _: None = Depends(chunking_dependency), # 직접 의존성 주입 - __: None = Depends(log_service_response) -): -""" \ No newline at end of file + async def _log_success_response(self, service_type: str, trace_id: str, + start_time: float, param_str: str, + response: Response, response_trackers: List[str]): + """성공 응답 로깅""" + duration = time.time() - start_time + + log_parts = [ + f"[{service_type}_SUCCESS]", + f"trace_id={trace_id}", + f"execution_time={duration:.4f}s{param_str}", + f"status_code={response.status_code}" + ] + + # 응답 데이터에서 추적 정보 추출 + if isinstance(response, JSONResponse) and response_trackers: + try: + # JSONResponse body 읽기 + if hasattr(response, 'body'): + response_data = json.loads(response.body.decode()) + elif hasattr(response, 'content'): + response_data = response.content + else: + response_data = None + + if response_data and isinstance(response_data, dict): + response_params = [] + for tracker in response_trackers: + if tracker in response_data: + value = response_data[tracker] + if isinstance(value, dict): + response_params.append(f"{tracker}_keys={list(value.keys())}") + response_params.append(f"{tracker}_count={len(value)}") + elif isinstance(value, list): + response_params.append(f"{tracker}_count={len(value)}") + else: + response_params.append(f"{tracker}={value}") + + if response_params: + log_parts.append(" ".join(response_params)) + + except Exception as e: + logger.debug(f"응답 추적 정보 추출 실패: {e}") + + logger.info(" ".join(log_parts)) + + async def _log_error_response(self, service_type: str, trace_id: str, + start_time: float, param_str: str, response: Response): + """에러 응답 로깅""" + duration = time.time() - start_time + logger.error( + f"[{service_type}_ERROR] trace_id={trace_id} " + f"execution_time={duration:.4f}s{param_str} " + f"status_code={response.status_code}" + ) + + async def _log_exception(self, service_type: str, trace_id: str, + start_time: float, param_str: str, exception: Exception): + """예외 로깅""" + duration = time.time() - start_time + logger.error( + f"[{service_type}_EXCEPTION] trace_id={trace_id} " + f"execution_time={duration:.4f}s{param_str} " + f"exception={str(exception)}" + ) \ No newline at end of file diff --git a/apps/pre-processing-service/app/test/__init__.py b/apps/pre-processing-service/app/test/__init__.py new file mode 100644 index 00000000..e69de29b