diff --git a/pyproject.toml b/pyproject.toml
index c19a1bf..1db67a8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,8 +21,12 @@ dependencies = [
     "cloudpickle>=3.1.1",
     "runpod",
     "python-dotenv>=1.0.0",
+    "rich>=14.0.0",
 ]
 
+[project.optional-dependencies]
+rich = ["rich>=13.0.0"]
+
 [dependency-groups]
 dev = [
     "mypy>=1.16.1",
diff --git a/src/tetra_rp/__init__.py b/src/tetra_rp/__init__.py
index a8bca51..97be4b0 100644
--- a/src/tetra_rp/__init__.py
+++ b/src/tetra_rp/__init__.py
@@ -22,10 +22,12 @@
     runpod,
     NetworkVolume,
 )
+from .core.utils.rich_ui import capture_local_prints  # noqa: E402
 
 __all__ = [
     "remote",
+    "capture_local_prints",
     "CpuServerlessEndpoint",
     "CpuInstanceType",
     "CudaVersion",
diff --git a/src/tetra_rp/core/api/runpod.py b/src/tetra_rp/core/api/runpod.py
index af1754d..9278b5c 100644
--- a/src/tetra_rp/core/api/runpod.py
+++ b/src/tetra_rp/core/api/runpod.py
@@ -9,9 +9,12 @@
 from typing import Any, Dict, Optional
 
 import aiohttp
 
+from ..utils.rich_ui import format_endpoint_created
+
 log = logging.getLogger(__name__)
 
+
 RUNPOD_API_BASE_URL = os.environ.get("RUNPOD_API_BASE_URL", "https://api.runpod.io")
 RUNPOD_REST_API_URL = os.environ.get("RUNPOD_REST_API_URL", "https://rest.runpod.io/v1")
@@ -126,9 +129,16 @@ async def create_endpoint(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
             raise Exception("Unexpected GraphQL response structure")
 
         endpoint_data = result["saveEndpoint"]
-        log.info(
-            f"Created endpoint: {endpoint_data.get('id', 'unknown')} - {endpoint_data.get('name', 'unnamed')}"
-        )
+
+        # Use Rich formatting if available
+        try:
+            format_endpoint_created(
+                endpoint_data.get("id", "unknown"), endpoint_data.get("name", "unnamed")
+            )
+        except ImportError:
+            log.info(
+                f"Created endpoint: {endpoint_data.get('id', 'unknown')} - {endpoint_data.get('name', 'unnamed')}"
+            )
 
         return endpoint_data
diff --git a/src/tetra_rp/core/resources/resource_manager.py b/src/tetra_rp/core/resources/resource_manager.py
index e593f0e..34e526b 100644
--- a/src/tetra_rp/core/resources/resource_manager.py
+++ b/src/tetra_rp/core/resources/resource_manager.py
@@ -4,6 +4,12 @@
 from pathlib import Path
 
 from ..utils.singleton import SingletonMixin
+from ..utils.rich_ui import (
+    create_reused_resource_panel,
+    format_console_url,
+    is_rich_enabled,
+    print_with_rich,
+)
 from .base import DeployableResource
@@ -29,7 +35,8 @@ def _load_resources(self) -> Dict[str, DeployableResource]:
         try:
             with open(RESOURCE_STATE_FILE, "rb") as f:
                 self._resources = cloudpickle.load(f)
-                log.debug(f"Loaded saved resources from {RESOURCE_STATE_FILE}")
+                if not is_rich_enabled():
+                    log.debug(f"Loaded saved resources from {RESOURCE_STATE_FILE}")
         except Exception as e:
             log.error(f"Failed to load resources from {RESOURCE_STATE_FILE}: {e}")
         return self._resources
@@ -38,7 +45,8 @@ def _save_resources(self) -> None:
         """Persist state of resources to disk using cloudpickle."""
         with open(RESOURCE_STATE_FILE, "wb") as f:
             cloudpickle.dump(self._resources, f)
-            log.debug(f"Saved resources in {RESOURCE_STATE_FILE}")
+            if not is_rich_enabled():
+                log.debug(f"Saved resources in {RESOURCE_STATE_FILE}")
 
     def add_resource(self, uid: str, resource: DeployableResource):
         """Add a resource to the manager."""
@@ -68,12 +76,18 @@ async def get_or_deploy_resource(
                 self.remove_resource(uid)
                 return await self.get_or_deploy_resource(config)
 
-            log.debug(f"{existing} exists, reusing.")
-            log.info(f"URL: {existing.url}")
+            if is_rich_enabled():
+                # Show a panel for reused resources similar to fresh deployments
+                panel = create_reused_resource_panel(
+                    existing.name, existing.id, existing.url
+                )
+                print_with_rich(panel)
+            else:
+                log.debug(f"{existing} exists, reusing.")
             return existing
 
         if deployed_resource := await config.deploy():
-            log.info(f"URL: {deployed_resource.url}")
+            format_console_url(deployed_resource.url)
             self.add_resource(uid, deployed_resource)
             return deployed_resource
diff --git a/src/tetra_rp/core/resources/serverless.py b/src/tetra_rp/core/resources/serverless.py
index 236dd1f..9088586 100644
--- a/src/tetra_rp/core/resources/serverless.py
+++ b/src/tetra_rp/core/resources/serverless.py
@@ -10,6 +10,14 @@
     field_validator,
     model_validator,
 )
+
+from tetra_rp.core.utils.rich_ui import (
+    job_progress_tracker,
+    create_deployment_panel,
+    create_metrics_table,
+    print_with_rich,
+    is_rich_enabled,
+)
 from runpod.endpoint.runner import Job
 
 from ..api.runpod import RunpodGraphQLClient
@@ -22,6 +30,7 @@
 from .gpu import GpuGroup
 from .network_volume import NetworkVolume
 from .template import KeyValuePair, PodTemplate
+from ..utils.rich_ui import rich_ui, format_api_info, format_job_status
 
 # Environment variables are loaded from the .env file
@@ -153,7 +162,7 @@ def validate_gpus(cls, value: List[GpuGroup]) -> List[GpuGroup]:
     @model_validator(mode="after")
     def sync_input_fields(self):
         """Sync between temporary inputs and exported fields"""
-        if self.flashboot:
+        if self.flashboot and not self.name.endswith("-fb"):
             self.name += "-fb"
 
         if self.networkVolume and self.networkVolume.is_created:
@@ -243,6 +252,13 @@ async def deploy(self) -> "DeployableResource":
             result = await client.create_endpoint(payload)
 
             if endpoint := self.__class__(**result):
+                if is_rich_enabled():
+                    panel = create_deployment_panel(
+                        endpoint.name, endpoint.id or "", endpoint.url
+                    )
+                    print_with_rich(panel)
+                else:
+                    log.info(f"Deployed: {endpoint}")
                 return endpoint
 
             raise ValueError("Deployment failed, no endpoint was returned.")
@@ -292,44 +308,60 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput":
             # log.debug(f"[{self}] Payload: {payload}")
 
             # Create a job using the endpoint
-            log.info(f"{self} | API /run")
+            format_api_info(str(self), self.id or "", "/run")
             job = await asyncio.to_thread(self.endpoint.run, request_input=payload)
 
             log_subgroup = f"Job:{job.job_id}"
-            log.info(f"{self} | Started {log_subgroup}")
-
-            current_pace = 0
-            attempt = 0
-            job_status = Status.UNKNOWN
-            last_status = job_status
+            # Use Rich progress tracker if available
+            with job_progress_tracker(job.job_id, self.name) as tracker:
+                if not is_rich_enabled():
+                    log.info(f"{self} | Started {log_subgroup}")
+                elif tracker:
+                    # Initialize the progress tracker with starting status
+                    tracker.update_status(
+                        "IN_QUEUE", "Job submitted, waiting for worker..."
+                    )
+
+                current_pace = 0
+                attempt = 0
+                job_status = Status.UNKNOWN
+                last_status = job_status
 
-            # Poll for job status
-            while True:
-                await asyncio.sleep(current_pace)
+                # Poll for job status
+                while True:
+                    await asyncio.sleep(current_pace)
 
                 # Check job status
                 job_status = await asyncio.to_thread(job.status)
 
-                if last_status == job_status:
-                    # nothing changed, increase the gap
-                    attempt += 1
-                    indicator = "." * (attempt // 2) if attempt % 2 == 0 else ""
-                    if indicator:
-                        log.info(f"{log_subgroup} | {indicator}")
-                else:
-                    # status changed, reset the gap
-                    log.info(f"{log_subgroup} | Status: {job_status}")
-                    attempt = 0
-
-                last_status = job_status
-
-                # Adjust polling pace appropriately
-                current_pace = get_backoff_delay(attempt)
-
-                if job_status in ("COMPLETED", "FAILED", "CANCELLED"):
-                    response = await asyncio.to_thread(job._fetch_job)
-                    return JobOutput(**response)
+                    if last_status == job_status:
+                        # nothing changed, increase the gap
+                        attempt += 1
+                        if tracker:
+                            tracker.show_progress_indicator()
+                        else:
+                            if not rich_ui.enabled:
+                                indicator = (
+                                    "." * (attempt // 2) if attempt % 2 == 0 else ""
+                                )
+                                if indicator:
+                                    log.info(f"{log_subgroup} | {indicator}")
+                    else:
+                        # status changed, reset the gap
+                        format_job_status(job.job_id, job_status)
+                        attempt = 0
+
+                    last_status = job_status
+
+                    # Adjust polling pace appropriately
+                    current_pace = get_backoff_delay(attempt)
+
+                    if job_status in ("COMPLETED", "FAILED", "CANCELLED"):
+                        if tracker:
+                            tracker.update_status(job_status)
+                        response = await asyncio.to_thread(job._fetch_job)
+                        return JobOutput(**response)
 
         except Exception as e:
             if job and job.job_id:
@@ -390,9 +422,15 @@ class JobOutput(BaseModel):
     error: Optional[str] = ""
 
     def model_post_init(self, __context):
-        log_group = f"Worker:{self.workerId}"
-        log.info(f"{log_group} | Delay Time: {self.delayTime} ms")
-        log.info(f"{log_group} | Execution Time: {self.executionTime} ms")
+        if is_rich_enabled():
+            metrics_table = create_metrics_table(
+                self.delayTime, self.executionTime, self.workerId
+            )
+            print_with_rich(metrics_table)
+        else:
+            log_group = f"Worker:{self.workerId}"
+            log.info(f"{log_group} | Delay Time: {self.delayTime} ms")
+            log.info(f"{log_group} | Execution Time: {self.executionTime} ms")
 
 
 class Status(str, Enum):
diff --git a/src/tetra_rp/core/utils/rich_ui.py b/src/tetra_rp/core/utils/rich_ui.py
new file mode 100644
index 0000000..d7729cc
--- /dev/null
+++ b/src/tetra_rp/core/utils/rich_ui.py
@@ -0,0 +1,502 @@
+"""
+Rich UI components for enhanced logging and progress display in Tetra.
+This module provides Rich-based alternatives to standard logging output.
+""" + +import builtins +import logging +import os +import time +from contextlib import contextmanager +from typing import Optional, Dict, Any, Generator, List, Union +from enum import Enum + +try: + from rich.console import Console + from rich.logging import RichHandler + from rich.panel import Panel + from rich.table import Table + from rich.status import Status + + RICH_AVAILABLE = True +except ImportError: + RICH_AVAILABLE = False + + # Create dummy classes for type hints when Rich is not available + class Console: + pass + + class Status: + pass + + +class TetraStatus(str, Enum): + """Status types for visual styling""" + + READY = "READY" + INITIALIZING = "INITIALIZING" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + CANCELLED = "CANCELLED" + THROTTLED = "THROTTLED" + UNHEALTHY = "UNHEALTHY" + UNKNOWN = "UNKNOWN" + + +def is_rich_enabled() -> bool: + """Check if Rich UI should be enabled based on environment and availability""" + return ( + RICH_AVAILABLE and os.environ.get("LOG_LEVEL", "INFO").upper() == "INFO" + # os.environ.get("TETRA_RICH_UI", "false").lower() in ("true", "1", "yes") + ) + + +class RichUIManager: + """Central manager for Rich UI components""" + + def __init__(self): + self.console = Console() if RICH_AVAILABLE else None + self._enabled = is_rich_enabled() + self._captured_prints: List[str] = [] + self._original_print = None + self._print_capturing = False + + @property + def enabled(self) -> bool: + return self._enabled and self.console is not None + + def get_console(self) -> Optional["Console"]: + return self.console if self.enabled else None + + def start_print_capture(self) -> None: + """Start capturing print() calls""" + if not self.enabled or self._print_capturing: + return + + self._original_print = builtins.print + self._captured_prints.clear() + self._print_capturing = True + + def captured_print(*args, **kwargs): + # Convert print arguments to string + output = " ".join(str(arg) for arg in args) + + # Handle common print kwargs + end = kwargs.get("end", "\n") + sep = kwargs.get("sep", " ") + if len(args) > 1: + output = sep.join(str(arg) for arg in args) + + self._captured_prints.append(output + end.rstrip()) + + # Also send to original print for fallback scenarios + if not self.enabled: + self._original_print(*args, **kwargs) + + builtins.print = captured_print + + def stop_print_capture(self) -> List[str]: + """Stop capturing and return captured prints""" + if not self._print_capturing: + return [] + + if self._original_print: + builtins.print = self._original_print + + self._print_capturing = False + captured = self._captured_prints.copy() + self._captured_prints.clear() + return captured + + +# Global Rich UI manager instance +rich_ui = RichUIManager() + + +class RichLoggingFilter(logging.Filter): + """Filter to suppress verbose third-party logs when Rich UI is active""" + + def filter(self, record): + # Suppress asyncio selector debug messages + if record.name == "asyncio" and ( + "selector" in record.getMessage().lower() + or "using selector" in record.getMessage().lower() + ): + return False + # Suppress other verbose third-party logs + if record.levelno <= logging.INFO and record.name.startswith( + ("urllib3", "requests", "boto3", "botocore") + ): + return False + # Suppress all DEBUG level logs when Rich UI is active (except errors/warnings) + if record.levelno <= logging.DEBUG: + return False + return True + + +def get_rich_handler() -> logging.Handler: + """Get Rich logging handler if available, otherwise return standard 
handler""" + if rich_ui.enabled: + handler = RichHandler( + console=rich_ui.console, + show_time=True, + show_path=False, + markup=True, + rich_tracebacks=True, + ) + # Add filter to suppress verbose logs when Rich UI is active + handler.addFilter(RichLoggingFilter()) + return handler + else: + # Fallback to standard handler + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter("%(asctime)s | %(levelname)-5s | %(message)s") + ) + return handler + + +def get_status_color(status: str) -> str: + """Get Rich color for status""" + if not rich_ui.enabled: + return "" + + color_map = { + "READY": "green", + "COMPLETED": "green", + "RUNNING": "blue", + "IN_QUEUE": "cyan", + "INITIALIZING": "yellow", + "FAILED": "red", + "CANCELLED": "red", + "THROTTLED": "orange3", + "UNHEALTHY": "red", + "UNKNOWN": "dim", + } + return color_map.get(status.upper(), "white") + + +def format_status_text(status: str, message: str = "") -> str: + """Format status text with color if Rich is enabled""" + if not rich_ui.enabled: + return f"{status}: {message}" if message else status + + color = get_status_color(status) + formatted_status = f"[{color}]{status}[/{color}]" + return f"{formatted_status}: {message}" if message else formatted_status + + +def create_deployment_panel( + endpoint_name: str, endpoint_id: str, console_url: str +) -> Union[str, "Panel"]: + """Create a deployment summary panel""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Deployed: {endpoint_name} ({endpoint_id})" + + table = Table.grid(padding=1) + table.add_column(style="bold cyan", no_wrap=True) + table.add_column() + + table.add_row("Endpoint:", f"[green]{endpoint_name}[/green]") + table.add_row("ID:", f"[dim]{endpoint_id}[/dim]") + table.add_row("Console:", f"[link={console_url}]{console_url}[/link]") + + panel = Panel( + table, + title="[bold green]🚀 Deployment Successful[/bold green]", + border_style="green", + ) + return panel + + +def create_reused_resource_panel( + endpoint_name: str, endpoint_id: str, console_url: str +) -> Union[str, "Panel"]: + """Create a panel for reused existing resources""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Reusing: {endpoint_name} ({endpoint_id})" + + table = Table.grid(padding=1) + table.add_column(style="bold cyan", no_wrap=True) + table.add_column() + + table.add_row("Endpoint:", f"[blue]{endpoint_name}[/blue]") + table.add_row("ID:", f"[dim]{endpoint_id}[/dim]") + table.add_row("Console:", f"[link={console_url}]{console_url}[/link]") + + panel = Panel( + table, + title="[bold blue]♻️ Using Existing Resource[/bold blue]", + border_style="blue", + ) + return panel + + +def create_metrics_table( + delay_time: int, execution_time: int, worker_id: str +) -> Union[str, "Panel"]: + """Create a metrics display table""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Worker:{worker_id} | Delay: {delay_time}ms | Execution: {execution_time}ms" + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("Metric", style="cyan", no_wrap=True) + table.add_column("Value", style="green") + + table.add_row("Worker ID", f"[dim]{worker_id}[/dim]") + table.add_row("Delay Time", f"{delay_time:,} ms") + table.add_row("Execution Time", f"{execution_time:,} ms") + + return Panel( + table, title="[bold blue]📊 Job Metrics[/bold blue]", border_style="blue" + ) + + +@contextmanager +def job_progress_tracker( + job_id: str, endpoint_name: str +) -> Generator[Optional["JobProgressTracker"], None, None]: + """Context manager for tracking job 
progress with Rich UI""" + if rich_ui.enabled and rich_ui.console is not None: + tracker = JobProgressTracker(job_id, endpoint_name, rich_ui.console) + try: + yield tracker + finally: + tracker.stop() + else: + yield None + + +class JobProgressTracker: + """Tracks job progress with Rich live status display""" + + def __init__(self, job_id: str, endpoint_name: str, console: "Console"): + self.job_id = job_id + self.endpoint_name = endpoint_name + self.console = console + self.start_time = time.time() + self.last_status = "UNKNOWN" + self.attempt_count = 0 + self.current_status_display = None + self.status_printed = False + + def update_status(self, status: str, message: str = "") -> None: + """Update the job status display""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return + + elapsed = int(time.time() - self.start_time) + + if status != self.last_status: + # Clean up previous status display + if self.current_status_display: + self.current_status_display.stop() + self.current_status_display = None + + self.attempt_count = 0 + self.last_status = status + self.status_printed = False + + # Handle different status types + if status in ["IN_QUEUE", "INITIALIZING", "RUNNING"]: + # Choose appropriate emoji and spinner based on status + if status == "IN_QUEUE": + emoji = "⏳" + spinner = "simpleDotsScrolling" + elif status == "INITIALIZING": + emoji = "⚡" + spinner = "dots12" + else: # RUNNING + emoji = "🏁" + spinner = "dots" + + status_text = ( + f"[{get_status_color(status)}]{status}[/{get_status_color(status)}]" + ) + full_message = f"{emoji} {status_text}" + if message: + full_message += f" {message}" + + # Create live status display + self.current_status_display = Status( + full_message, spinner=spinner, console=self.console + ) + self.current_status_display.start() + else: + # Final status - print and finish + color = get_status_color(status) + self.console.print( + f"[{color}]●[/{color}] Job {self.job_id}: {status} ({elapsed}s)" + ) + else: + self.attempt_count += 1 + + def show_progress_indicator(self) -> None: + """Update the live status with progress indication""" + if not rich_ui.enabled or not self.current_status_display: + return + + # For live status, the spinner handles the animation automatically + # We can optionally update the message to show progress + if self.attempt_count > 0 and self.attempt_count % 5 == 0: # Every 5th attempt + elapsed = int(time.time() - self.start_time) + status_text = f"[{get_status_color(self.last_status)}]{self.last_status}[/{get_status_color(self.last_status)}]" + + if self.last_status == "IN_QUEUE": + emoji = "🕑" + message = f"Waiting for worker... ({elapsed}s)" + elif self.last_status == "INITIALIZING": + emoji = "⚡" + message = f"Starting up worker... ({elapsed}s)" + else: # RUNNING + emoji = "⚙️" + message = f"Executing function... 
({elapsed}s)" + + full_message = f"{emoji} {status_text} {message}" + self.current_status_display.update(full_message) + + def stop(self) -> None: + """Stop the status display""" + if self.current_status_display: + self.current_status_display.stop() + self.current_status_display = None + + +def print_with_rich(message: Any, style: str = "") -> None: + """Print message with Rich styling if available""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print(message, style=style) + else: + # Strip Rich markup for plain output if it's a string + if isinstance(message, str): + import re + + clean_message = re.sub(r"\[/?[^\]]*\]", "", message) + print(clean_message) + else: + print(message) + + +def create_health_display(health_data: Dict[str, Any]) -> Union[str, "Panel"]: + """Create health status display""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Health: {health_data}" + + table = Table(show_header=True, header_style="bold cyan") + table.add_column("Status", style="bold") + table.add_column("Count", justify="right", style="green") + + workers = health_data.get("workers", {}) + for status, count in workers.items(): + if count > 0: + color = get_status_color(status) + table.add_row(f"[{color}]{status.title()}[/{color}]", str(count)) + + return Panel( + table, + title="[bold yellow]🏥 Endpoint Health[/bold yellow]", + border_style="yellow", + ) + + +def create_user_output_panel( + output_lines: List[str], source: str = "Local" +) -> Union[str, "Panel"]: + """Create a panel for user print() output""" + if not rich_ui.enabled or not RICH_AVAILABLE or not output_lines: + return "" + + # Filter out empty lines and format content + content_lines = [line for line in output_lines if line.strip()] + + if not content_lines: + return "" + + # Create the content text + content = "\n".join(content_lines) + + # Choose icon and color based on source + if source.lower() == "remote": + icon = "🔧" + border_color = "blue" + title_color = "bold blue" + else: + icon = "💬" + border_color = "green" + title_color = "bold green" + + return Panel( + content, + title=f"[{title_color}]{icon} {source} Output[/{title_color}]", + border_style=border_color, + padding=(0, 1), + ) + + +def display_remote_output(stdout_lines: List[str]) -> None: + """Display remote function output in a Rich panel""" + if not rich_ui.enabled or not stdout_lines: + return + + panel = create_user_output_panel(stdout_lines, "Remote") + if panel: + print_with_rich(panel) + + +@contextmanager +def capture_local_prints() -> Generator[None, None, None]: + """Context manager to capture and display local print() calls""" + if not rich_ui.enabled: + yield + return + + rich_ui.start_print_capture() + try: + yield + finally: + captured = rich_ui.stop_print_capture() + if captured: + panel = create_user_output_panel(captured, "Local") + if panel: + print_with_rich(panel) + + +def format_api_info(endpoint_name: str, endpoint_id: str, api_path: str) -> None: + """Display API endpoint info in Rich format""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print( + f"🔗 [cyan]{endpoint_name}[/cyan]:[dim]{endpoint_id}[/dim] [green]→[/green] [bold yellow]{api_path}[/bold yellow]" + ) + else: + print(f"{endpoint_name}:{endpoint_id} | API {api_path}") + + +def format_job_status(job_id: str, status: str) -> None: + """Display job status in Rich format""" + if rich_ui.enabled and rich_ui.console: + color = get_status_color(status) + short_job_id = job_id.split("-")[0] if "-" in job_id else job_id[:8] + rich_ui.console.print( + f"⚡ 
Job [dim]{short_job_id}[/dim]: [{color}]{status}[/{color}]" + ) + else: + print(f"Job:{job_id} | Status: {status}") + + +def format_console_url(url: str) -> None: + """Display console URL in Rich format""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print(f"🌐 [link={url}]Console Dashboard[/link]") + else: + print(f"URL: {url}") + + +def format_endpoint_created(endpoint_id: str, endpoint_name: str) -> None: + """Display endpoint creation in Rich format""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print( + f"✨ Created endpoint [cyan]{endpoint_name}[/cyan] [dim]({endpoint_id})[/dim]" + ) + else: + print(f"Created endpoint: {endpoint_id} - {endpoint_name}") diff --git a/src/tetra_rp/logger.py b/src/tetra_rp/logger.py index c27c21c..98b935a 100644 --- a/src/tetra_rp/logger.py +++ b/src/tetra_rp/logger.py @@ -3,28 +3,46 @@ import sys from typing import Union, Optional +# Import Rich UI components with fallback +try: + from .core.utils.rich_ui import get_rich_handler, is_rich_enabled + + RICH_UI_AVAILABLE = True +except ImportError: + RICH_UI_AVAILABLE = False + def setup_logging( level: Union[int, str] = logging.INFO, stream=sys.stdout, fmt: Optional[str] = None ): """ Sets up the root logger with a stream handler and basic formatting. + Uses Rich handler if available and enabled, otherwise falls back to standard logging. Does nothing if handlers are already configured. """ if isinstance(level, str): level = getattr(logging, level.upper(), logging.INFO) - if fmt is None: - if level == logging.DEBUG: - fmt = "%(asctime)s | %(levelname)-5s | %(name)s | %(filename)s:%(lineno)d | %(message)s" - else: - # Default format for INFO level and above - fmt = "%(asctime)s | %(levelname)-5s | %(message)s" - root_logger = logging.getLogger() if not root_logger.hasHandlers(): - handler = logging.StreamHandler(stream) - handler.setFormatter(logging.Formatter(fmt)) + # Use Rich handler if available and enabled + if RICH_UI_AVAILABLE and is_rich_enabled(): + handler = get_rich_handler() + # When Rich UI is enabled, reduce log verbosity to focus on Rich output + if level <= logging.INFO: + level = logging.WARNING # Only show warnings and errors + else: + # Fallback to standard handler + if fmt is None: + if level == logging.DEBUG: + fmt = "%(asctime)s | %(levelname)-5s | %(name)s | %(filename)s:%(lineno)d | %(message)s" + else: + # Default format for INFO level and above + fmt = "%(asctime)s | %(levelname)-5s | %(message)s" + + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter(fmt)) + root_logger.setLevel(level) root_logger.addHandler(handler) @@ -32,3 +50,23 @@ def setup_logging( env_level = os.environ.get("LOG_LEVEL") if env_level: root_logger.setLevel(env_level.upper()) + elif RICH_UI_AVAILABLE and is_rich_enabled(): + # Suppress routine logs when Rich UI is active, unless explicitly overridden + if not os.environ.get("LOG_LEVEL"): + root_logger.setLevel(logging.WARNING) + # Also suppress specific noisy loggers + for logger_name in [ + "tetra_rp", + "serverless", + "resource_manager", + "LiveServerlessStub", + "asyncio", + ]: + specific_logger = logging.getLogger(logger_name) + specific_logger.setLevel(logging.WARNING) + + # Add global filter to catch any remaining debug messages + from .core.utils.rich_ui import RichLoggingFilter + + global_filter = RichLoggingFilter() + root_logger.addFilter(global_filter) diff --git a/src/tetra_rp/stubs/live_serverless.py b/src/tetra_rp/stubs/live_serverless.py index c8bbf67..78e031d 100644 --- 
a/src/tetra_rp/stubs/live_serverless.py +++ b/src/tetra_rp/stubs/live_serverless.py @@ -6,6 +6,7 @@ import traceback import cloudpickle import logging +from tetra_rp.core.utils.rich_ui import display_remote_output, is_rich_enabled from ..core.resources import LiveServerless from ..protos.remote_execution import ( FunctionRequest, @@ -94,8 +95,14 @@ def handle_response(self, response: FunctionResponse): raise ValueError("Invalid response from server") if response.stdout: - for line in response.stdout.splitlines(): - log.info(f"Remote | {line}") + stdout_lines = response.stdout.splitlines() + if is_rich_enabled(): + # Display remote output in Rich panel + display_remote_output(stdout_lines) + else: + # Fallback to standard logging + for line in stdout_lines: + log.info(f"Remote | {line}") if response.success: if response.result is None: diff --git a/tetra-examples b/tetra-examples index 98fb64b..e804315 160000 --- a/tetra-examples +++ b/tetra-examples @@ -1 +1 @@ -Subproject commit 98fb64bf195cbf56a77e03335de623b98b01c4f9 +Subproject commit e804315ee8712c73559086ff394b63a71be144d4 diff --git a/uv.lock b/uv.lock index b7c8a23..607b378 100644 --- a/uv.lock +++ b/uv.lock @@ -2350,9 +2350,15 @@ source = { editable = "." } dependencies = [ { name = "cloudpickle" }, { name = "python-dotenv" }, + { name = "rich" }, { name = "runpod" }, ] +[package.optional-dependencies] +rich = [ + { name = "rich" }, +] + [package.dev-dependencies] dev = [ { name = "mypy" }, @@ -2370,8 +2376,11 @@ test = [ requires-dist = [ { name = "cloudpickle", specifier = ">=3.1.1" }, { name = "python-dotenv", specifier = ">=1.0.0" }, + { name = "rich", specifier = ">=14.0.0" }, + { name = "rich", marker = "extra == 'rich'", specifier = ">=13.0.0" }, { name = "runpod" }, ] +provides-extras = ["rich"] [package.metadata.requires-dev] dev = [