diff --git a/pyproject.toml b/pyproject.toml index 06b04c13fb..c7428e2056 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ pyfuse3 = ["pyfuse3 >= 3.1.1"] nofuse = [] s3 = ["borgstore[s3] ~= 0.3.0"] sftp = ["borgstore[sftp] ~= 0.3.0"] +cockpit = ["textual>=6.8.0"] # might also work with older versions, untested [project.urls] "Homepage" = "https://borgbackup.org/" diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 999ba8e1c4..cce8686348 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -337,6 +337,7 @@ def build_parser(self): parser.add_argument( "-V", "--version", action="version", version="%(prog)s " + __version__, help="show version number and exit" ) + parser.add_argument("--cockpit", dest="cockpit", action="store_true", help="Start the Borg TUI") parser.common_options.add_common_group(parser, "_maincommand", provide_defaults=True) common_parser = argparse.ArgumentParser(add_help=False, prog=self.prog) @@ -646,6 +647,23 @@ def main(): # pragma: no cover print(msg, file=sys.stderr) print(tb, file=sys.stderr) sys.exit(EXIT_ERROR) + + if args.cockpit: + # Cockpit TUI operation + try: + from ..cockpit.app import BorgCockpitApp + except ImportError as err: + print(f"ImportError: {err}", file=sys.stderr) + print("The Borg Cockpit feature has some additional requirements.", file=sys.stderr) + print("Please install them using: pip install 'borgbackup[cockpit]'", file=sys.stderr) + sys.exit(EXIT_ERROR) + + app = BorgCockpitApp() + app.borg_args = [arg for arg in sys.argv[1:] if arg != "--cockpit"] + app.run() + sys.exit(EXIT_SUCCESS) # borg subprocess RC was already shown on the TUI + + # normal borg CLI operation try: with sig_int: exit_code = archiver.run(args) diff --git a/src/borg/cockpit/__init__.py b/src/borg/cockpit/__init__.py new file mode 100644 index 0000000000..56d6b13a07 --- /dev/null +++ b/src/borg/cockpit/__init__.py @@ -0,0 +1,5 @@ +""" +Borg Cockpit - Terminal User Interface for BorgBackup. + +This module contains the TUI implementation using Textual. +""" diff --git a/src/borg/cockpit/app.py b/src/borg/cockpit/app.py new file mode 100644 index 0000000000..7d68961825 --- /dev/null +++ b/src/borg/cockpit/app.py @@ -0,0 +1,124 @@ +""" +Borg Cockpit - Application Entry Point. +""" + +import asyncio +import time + +from textual.app import App, ComposeResult +from textual.widgets import Header, Footer +from textual.containers import Horizontal, Container + +from .theme import theme + + +class BorgCockpitApp(App): + """The main TUI Application class for Borg Cockpit.""" + + from .. import __version__ as BORG_VERSION + + TITLE = f"Cockpit for BorgBackup {BORG_VERSION}" + CSS_PATH = "cockpit.tcss" + BINDINGS = [("q", "quit", "Quit"), ("ctrl+c", "quit", "Quit"), ("t", "toggle_translator", "Toggle Translator")] + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + from .widgets import LogoPanel, StatusPanel, StandardLog + + yield Header(show_clock=True) + + with Container(id="main-grid"): + with Horizontal(id="top-row"): + yield LogoPanel(id="logopanel") + yield StatusPanel(id="status") + + yield StandardLog(id="standard-log") + + yield Footer() + + def get_theme_variable_defaults(self): + # make these variables available to ALL themes + return { + "pulsar-color": "#ffffff", + "pulsar-dim-color": "#000000", + "star-color": "#888888", + "star-bright-color": "#ffffff", + "logo-color": "#00dd00", + } + + def on_load(self) -> None: + """Initialize theme before UI.""" + self.register_theme(theme) + self.theme = theme.name + + def on_mount(self) -> None: + """Initialize components.""" + from .runner import BorgRunner + + self.query_one("#logo").styles.animate("opacity", 1, duration=1) + self.query_one("#slogan").styles.animate("opacity", 1, duration=1) + + self.start_time = time.monotonic() + self.process_running = True + args = getattr(self, "borg_args", ["--version"]) # Default to safe command if none passed + self.runner = BorgRunner(args, self.handle_log_event) + self.runner_task = asyncio.create_task(self.runner.start()) + + # Speed tracking + self.total_lines_processed = 0 + self.last_lines_processed = 0 + self.speed_timer = self.set_interval(1.0, self.compute_speed) + + def compute_speed(self) -> None: + """Calculate and update speed (lines per second).""" + current_lines = self.total_lines_processed + lines_per_second = float(current_lines - self.last_lines_processed) + self.last_lines_processed = current_lines + + status_panel = self.query_one("#status") + status_panel.update_speed(lines_per_second / 1000) + if self.process_running: + status_panel.elapsed_time = time.monotonic() - self.start_time + + async def on_unmount(self) -> None: + """Cleanup resources on app shutdown.""" + if hasattr(self, "runner"): + await self.runner.stop() + + async def action_quit(self) -> None: + """Handle quit action.""" + if hasattr(self, "speed_timer"): + self.speed_timer.stop() + if hasattr(self, "runner"): + await self.runner.stop() + if hasattr(self, "runner_task"): + await self.runner_task + self.query_one("#logo").styles.animate("opacity", 0, duration=2) + self.query_one("#slogan").styles.animate("opacity", 0, duration=2) + await asyncio.sleep(2) # give the user a chance the see the borg RC + self.exit() + + def action_toggle_translator(self) -> None: + """Toggle the universal translator.""" + from .translator import TRANSLATOR + + TRANSLATOR.toggle() + # Refresh dynamic UI elements + self.query_one("#status").refresh_ui_labels() + self.query_one("#standard-log").update_title() + self.query_one("#slogan").update_slogan() + + def handle_log_event(self, data: dict): + """Process a event from BorgRunner.""" + msg_type = data.get("type", "log") + + if msg_type == "stream_line": + self.total_lines_processed += 1 + line = data.get("line", "") + widget = self.query_one("#standard-log") + widget.add_line(line) + + elif msg_type == "process_finished": + self.process_running = False + rc = data.get("rc", 0) + self.query_one("#status").rc = rc diff --git a/src/borg/cockpit/cockpit.tcss b/src/borg/cockpit/cockpit.tcss new file mode 100644 index 0000000000..8b8f401649 --- /dev/null +++ b/src/borg/cockpit/cockpit.tcss @@ -0,0 +1,201 @@ +/* Borg Cockpit Stylesheet */ + +Screen { + background: $surface; +} + +Header { + dock: top; + background: $primary; + color: $secondary; + text-style: bold; +} + +Header * { + background: $primary; + color: $secondary; + text-style: bold; +} + +.header--clock, .header--title, .header--icon { + background: $primary; + color: $secondary; + text-style: bold; +} + +.header--clock { + dock: right; +} + +Footer { + background: $background; + color: $primary; + dock: bottom; +} + +.footer--key { + background: $background; + color: $primary; + text-style: bold; +} + +.footer--description { + background: $background; + color: $primary; + text-style: bold; +} + +.footer--highlight { + background: $primary; + color: $secondary; +} + +#standard-log-content { + scrollbar-background: $background; + scrollbar-color: $primary; + /* Hide horizontal scrollbar and clip long lines at the right */ + overflow-x: hidden; + text-wrap: nowrap; +} + +#standard-log { + border: double $primary; +} + +#main-grid { + /* Simple vertical stack: top row content-sized, log fills remaining space */ + layout: vertical; + /* Fill available area between header and footer */ + height: 1fr; + /* Allow shrinking when space is tight */ + min-height: 0; + margin: 0 1; +} + +#top-row { + border: double $primary; + /* If content grows too large, scroll rather than pushing the log off-screen */ + overflow-y: auto; + /* Adjust this if status or logo panel shall get more/less height. */ + height: 16; +} + +#logopanel { + width: 50%; + /* Stretch to the full height of the top row so the separator spans fully */ + height: 100%; + border-right: double $primary; + text-align: center; + layers: base overlay; + /* Make logo panel not influence row height beyond status; clip overflow */ + overflow: hidden; +} + +Starfield { + layer: base; + width: 100%; + /* Size to content and get clipped by the panel */ + height: 100%; + min-height: 0; +} + +Pulsar { + layer: overlay; + width: 3; + height: 3; + content-align: center middle; + color: $pulsar-color; + transition: color 4s linear; +} + +Slogan { + layer: overlay; + width: auto; + height: 1; + content-align: center middle; + color: #00ff00; + transition: color 1s linear; + opacity: 0; + max-height: 100%; + overflow: hidden; +} + +Logo { + layer: overlay; + width: auto; + /* Size to its intrinsic content, clipped by the panel */ + height: auto; + opacity: 0; + max-height: 100%; + overflow: hidden; +} + +Slogan.dim { + color: #005500; +} + +Pulsar.dim { + color: $pulsar-dim-color; +} + +#status { + width: 50%; + /* Let height be determined by content so the row can size to content */ + height: auto; + /* Prevent internal content from forcing excessive height; allow scrolling */ + overflow-y: auto; +} + +/* Ensure the log always keeps at least 5 rows visible */ +#standard-log { + min-height: 5; + /* Explicitly claim the remaining space in the grid */ + height: 1fr; +} + +/* Within the log panel (a Vertical container), keep the title to 1 line and let content fill the rest */ +#standard-log-title { + height: 1; +} + +#standard-log-content { + /* Allow the RichLog to expand within the log panel */ + height: 1fr; +} + +.panel-title { + background: $primary; + color: $secondary; + padding: 0 1; + text-style: bold; +} + +#speed-sparkline { + width: 100%; + height: 4; + margin-bottom: 1; +} + +.status { + color: $primary; +} + +.errors-ok { + color: $success; +} + +.errors-warning { + color: $warning; +} + +.rc-ok { + color: $success; +} + +.rc-warning { + color: $warning; +} + +.rc-error { + color: $error; +} diff --git a/src/borg/cockpit/runner.py b/src/borg/cockpit/runner.py new file mode 100644 index 0000000000..4fee387141 --- /dev/null +++ b/src/borg/cockpit/runner.py @@ -0,0 +1,74 @@ +""" +Borg Runner - Manages Borg subprocess execution and output parsing. +""" + +import asyncio +import logging +import os +import sys +from typing import Optional, Callable, List + + +class BorgRunner: + """ + Manages the execution of the borg subprocess and parses its JSON output. + """ + + def __init__(self, command: List[str], log_callback: Callable[[dict], None]): + self.command = command + self.log_callback = log_callback + self.process: Optional[asyncio.subprocess.Process] = None + self.logger = logging.getLogger(__name__) + + async def start(self): + """ + Starts the Borg subprocess and processes its output. + """ + if self.process is not None: + self.logger.warning("Borg process already running.") + return + + cmd = [sys.executable, "-m", "borg"] + self.command + + self.logger.info(f"Starting Borg process: {cmd}") + + env = os.environ.copy() + env["PYTHONUNBUFFERED"] = "1" + + try: + self.process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env + ) + + async def read_stream(stream, stream_name): + while True: + line = await stream.readline() + if not line: + break + decoded_line = line.decode("utf-8", errors="replace").rstrip() + if decoded_line: + self.log_callback({"type": "stream_line", "stream": stream_name, "line": decoded_line}) + + # Read both streams concurrently + await asyncio.gather(read_stream(self.process.stdout, "stdout"), read_stream(self.process.stderr, "stderr")) + + rc = await self.process.wait() + self.log_callback({"type": "process_finished", "rc": rc}) + + except Exception as e: + self.logger.error(f"Failed to run Borg process: {e}") + self.log_callback({"type": "process_finished", "rc": -1, "error": str(e)}) + finally: + self.process = None + + async def stop(self): + """ + Stops the Borg subprocess if it is running. + """ + if self.process and self.process.returncode is None: + self.logger.info("Terminating Borg process...") + try: + self.process.terminate() + await self.process.wait() + except ProcessLookupError: + pass # Process already dead diff --git a/src/borg/cockpit/theme.py b/src/borg/cockpit/theme.py new file mode 100644 index 0000000000..463558520f --- /dev/null +++ b/src/borg/cockpit/theme.py @@ -0,0 +1,29 @@ +""" +Borg Theme Definition. +""" + +from textual.theme import Theme + +theme = Theme( + name="borg", + primary="#00FF00", + secondary="#000000", # text on top of $primary background + error="#FF0000", + warning="#FFA500", + success="#00FF00", + accent="#00FF00", # highlighted interactive elements + foreground="#00FF00", # default text color + background="#000000", + surface="#000000", # bg col of lowest layer + panel="#444444", # bg col of panels, containers, cards, sidebars, modal dialogs, etc. + dark=True, + variables={ + "block-cursor-text-style": "none", + "input-selection-background": "#00FF00 35%", + "pulsar-color": "#ffffff", + "pulsar-dim-color": "#000000", + "star-color": "#888888", + "star-bright-color": "#ffffff", + "logo-color": "#00dd00", + }, +) diff --git a/src/borg/cockpit/translator.py b/src/borg/cockpit/translator.py new file mode 100644 index 0000000000..2302d451c4 --- /dev/null +++ b/src/borg/cockpit/translator.py @@ -0,0 +1,55 @@ +""" +Universal Translator - Converts standard English into Borg Speak. +""" + +BORG_DICTIONARY = { # English -> Borg + # UI Strings + "**** You're welcome! ****": "You will be assimilated! ", + "Files: ": "Drones: ", + "Unchanged: ": "Unchanged: ", + "Modified: ": "Modified: ", + "Added: ": "Assimilated: ", + "Other: ": "Other: ", + "Errors: ": "Escaped: ", + "RC: ": "Termination Code: ", + "Log": "Subspace Transmissions", +} + + +class UniversalTranslator: + """ + Handles translation of log messages. + """ + + def __init__(self, enabled: bool = True): + # self.enabled is the opposite of "Translator active" on the TUI, + # because in the source, we translate English to Borg. + self.enabled = enabled # True: English -> Borg + + def toggle(self): + """Toggle translation state.""" + self.enabled = not self.enabled + return self.enabled + + def translate(self, message: str) -> str: + """Translate a message if enabled.""" + if not self.enabled: + return message + + # Full matching first + if message in BORG_DICTIONARY: + return BORG_DICTIONARY[message] + + # Substring matching next + for key, value in BORG_DICTIONARY.items(): + if key in message: + return message.replace(key, value) + + return message + + +# Global Instance +TRANSLATOR = UniversalTranslator(enabled=False) + +# Global translation function +T = TRANSLATOR.translate diff --git a/src/borg/cockpit/widgets.py b/src/borg/cockpit/widgets.py new file mode 100644 index 0000000000..efae7b42b9 --- /dev/null +++ b/src/borg/cockpit/widgets.py @@ -0,0 +1,430 @@ +""" +Borg Cockpit - UI Widgets. +""" + +import random +import time + +from rich.markup import escape +from textual.app import ComposeResult +from textual.reactive import reactive +from textual.widgets import Static, RichLog +from textual.containers import Vertical, Container +from ..helpers import classify_ec +from .translator import T, TRANSLATOR + + +class StatusPanel(Static): + elapsed_time = reactive(0.0, init=False) + files_count = reactive(0, init=False) # unchanged + modified + added + other + error + unchanged_count = reactive(0, init=False) + modified_count = reactive(0, init=False) + added_count = reactive(0, init=False) + other_count = reactive(0, init=False) + error_count = reactive(0, init=False) + rc = reactive(None, init=False) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.speed_history = [0.0] * SpeedSparkline.HISTORY_SIZE + + def compose(self) -> ComposeResult: + with Vertical(): + yield SpeedSparkline(self.speed_history, id="speed-sparkline") + yield Static(T("Speed: 0/s"), id="status-speed") + + with Vertical(id="statuses"): + yield Static(T("Elapsed: 00d 00:00:00"), classes="status", id="status-elapsed") + yield Static(T("Files: 0"), classes="status", id="status-files") + yield Static(T("Unchanged: 0"), classes="status", id="status-unchanged") + yield Static(T("Modified: 0"), classes="status", id="status-modified") + yield Static(T("Added: 0"), classes="status", id="status-added") + yield Static(T("Other: 0"), classes="status", id="status-other") + yield Static(T("Errors: 0"), classes="status error-ok", id="status-errors") + yield Static(T("RC: RUNNING"), classes="status", id="status-rc") + + def update_speed(self, kfiles_per_second: float): + self.speed_history.append(kfiles_per_second) + self.speed_history = self.speed_history[-SpeedSparkline.HISTORY_SIZE :] + # Use our custom update method + self.query_one("#speed-sparkline").update_data(self.speed_history) + self.query_one("#status-speed").update(T(f"Speed: {int(kfiles_per_second * 1000)}/s")) + + def watch_error_count(self, count: int) -> None: + sw = self.query_one("#status-errors") + if count == 0: + sw.remove_class("errors-warning") + sw.add_class("errors-ok") + else: + sw.remove_class("errors-ok") + sw.add_class("errors-warning") + sw.update(T(f"Errors: {count}")) + + def watch_files_count(self, count: int) -> None: + self.query_one("#status-files").update(T(f"Files: {count}")) + + def watch_unchanged_count(self, count: int) -> None: + self.query_one("#status-unchanged").update(T(f"Unchanged: {count}")) + + def watch_modified_count(self, count: int) -> None: + self.query_one("#status-modified").update(T(f"Modified: {count}")) + + def watch_added_count(self, count: int) -> None: + self.query_one("#status-added").update(T(f"Added: {count}")) + + def watch_other_count(self, count: int) -> None: + self.query_one("#status-other").update(T(f"Other: {count}")) + + def watch_rc(self, rc: int): + label = self.query_one("#status-rc") + if rc is None: + label.update(T("RC: RUNNING")) + return + + label.remove_class("rc-ok") + label.remove_class("rc-warning") + label.remove_class("rc-error") + + status = classify_ec(rc) + if status == "success": + label.add_class("rc-ok") + elif status == "warning": + label.add_class("rc-warning") + else: # error, signal + label.add_class("rc-error") + + label.update(T(f"RC: {rc}")) + + def watch_elapsed_time(self, elapsed: float) -> None: + if TRANSLATOR.enabled: + # There seems to be no official formula for stardates, so we make something up. + # When showing the stardate, it is an absolute time, not relative "elapsed time". + ut = time.time() + sd = (ut - 1735689600) / 60.0 # Minutes since 2025-01-01 00:00.00 UTC + msg = f"Stardate {sd:.1f}" + else: + seconds = int(elapsed) + days, seconds = divmod(seconds, 86400) + h, m, s = seconds // 3600, (seconds % 3600) // 60, seconds % 60 + msg = f"Elapsed: {days:02d}d {h:02d}:{m:02d}:{s:02d}" + self.query_one("#status-elapsed").update(msg) + + def refresh_ui_labels(self): + """Update static UI labels with current translation.""" + self.watch_elapsed_time(self.elapsed_time) + self.query_one("#status-files").update(T(f"Files: {self.files_count}")) + self.query_one("#status-unchanged").update(T(f"Unchanged: {self.unchanged_count}")) + self.query_one("#status-modified").update(T(f"Modified: {self.modified_count}")) + self.query_one("#status-added").update(T(f"Added: {self.added_count}")) + self.query_one("#status-other").update(T(f"Other: {self.other_count}")) + self.query_one("#status-errors").update(T(f"Errors: {self.error_count}")) + + if self.rc is not None: + self.watch_rc(self.rc) + else: + self.query_one("#status-rc").update(T("RC: RUNNING")) + + +class StandardLog(Vertical): + def compose(self) -> ComposeResult: + yield Static(T("Log"), classes="panel-title", id="standard-log-title") + yield RichLog(id="standard-log-content", highlight=False, markup=True, auto_scroll=True, max_lines=None) + + def update_title(self): + self.query_one("#standard-log-title").update(T("Log")) + + def add_line(self, line: str): + # TODO: make this more generic, use json output from borg. + # currently, this is only really useful for borg create/extract --list + line = line.rstrip() + if len(line) == 0: + return + + markup_tag = None + if len(line) >= 2: + if line[1] == " " and line[0] in "EAMUdcbs+-": + # looks like from borg create/extract --list + status_panel = self.app.query_one("#status") + status_panel.files_count += 1 + status = line[0] + if status == "E": + status_panel.error_count += 1 + elif status in "U-": + status_panel.unchanged_count += 1 + elif status in "M": + status_panel.modified_count += 1 + elif status in "A+": + status_panel.added_count += 1 + elif status in "dcbs": + status_panel.other_count += 1 + + markup_tag = { + "E": "red", # Error + "A": "white", # Added regular file (cache miss, slow!) + "M": "white", # Modified regular file (cache hit, but different, slow!) + "U": "green", # Updated regular file (cache hit) + "d": "green", # directory + "c": "green", # char device + "b": "green", # block device + "s": "green", # socket + "-": "white", # excluded + "+": "green", # included + }.get(status) + + log_widget = self.query_one("#standard-log-content") + + safe_line = escape(line) + if markup_tag: + safe_line = f"[{markup_tag}]{safe_line}[/]" + + log_widget.write(safe_line) + + +class Starfield(Static): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Generate a unique seed for this instance to ensure random + # distribution per session but stable appearance during resize. + self._seed = random.randint(0, 1000000) # nosec B311 - UI-only randomness, not for crypto + + def on_mount(self) -> None: + self.call_after_refresh(self._update_art) + + def on_resize(self, event) -> None: + self._update_art() + + def _update_art(self) -> None: + """Render starfield.""" + w, h = self.size + # Don't try to render if too small + if w < 10 or h < 5: + return + + # Use our instance seed to keep stars "static" (same pattern) during resize + random.seed(self._seed) + + star_density = 0.1 + big_star_chance = 0.1 + + from .theme import theme + + star_color = f"[{theme.variables['star-color']}]" + star_bright_color = f"[{theme.variables['star-bright-color']}]" + + # 1. Create canvas (Starfield) + canvas = [[(" ", "")] * w for _ in range(h)] + for y in range(h): + for x in range(w): + if random.random() < star_density: # nosec B311 - visual effect randomness + if random.random() < big_star_chance: # nosec B311 - visual effect randomness + char = "*" + color = star_bright_color + else: + char = random.choice([".", "·"]) # nosec B311 - visual effect randomness + color = star_color + canvas[y][x] = (char, color) + + # 2. Render to string + c_reset = "[/]" + final_lines = [] + for row in canvas: + line_str = "" + for char, color in row: + if char == " ": + line_str += " " + else: + line_str += f"{color}{escape(char)}{c_reset}" + final_lines.append(line_str) + + art_str = "\n".join(final_lines) + self.update(art_str) + + +class Pulsar(Static): + PULSAR_ART = "\n".join([" │ ", "─*─", " │ "]) + H = 3 + W = 3 + + def on_mount(self) -> None: + self.set_interval(4.0, self.pulse) + self.update_art() + + def pulse(self) -> None: + self.toggle_class("dim") + + def update_art(self) -> None: + self.update(self.PULSAR_ART) + + +class Slogan(Static): + SLOGAN = "**** You're welcome! ****" + H = 1 + W = len(SLOGAN) + + def on_mount(self) -> None: + self.update(self.SLOGAN) + self.set_interval(1.0, self.pulse) + + def pulse(self) -> None: + self.toggle_class("dim") + + def update_slogan(self): + self.update(T(self.SLOGAN)) + + +class Logo(Static): + BORG_ART = [ + "██████╗ ██████╗ ██████╗ ██████╗ ", + "██╔══██╗██╔═══██╗██╔══██╗██╔════╝ ", + "██████╔╝██║ ██║██████╔╝██║ ███╗", + "██╔══██╗██║ ██║██╔══██╗██║ ██║", + "██████╔╝╚██████╔╝██║ ██║╚██████╔╝", + "╚═════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ", + ] + H = len(BORG_ART) + W = max(len(line) for line in BORG_ART) + + def on_mount(self) -> None: + from .theme import theme + + logo_color = theme.variables["logo-color"] + + lines = [] + for line in self.BORG_ART: + lines.append(f"[bold {logo_color}]{escape(line)}[/]") + self.update("\n".join(lines)) + + +class LogoPanel(Container): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._seed = random.randint(0, 1000000) # nosec B311 - UI-only randomness, not for crypto + + def compose(self) -> ComposeResult: + yield Starfield() + yield Logo(id="logo") + yield Slogan(id="slogan") + yield Pulsar() + + def on_resize(self, event) -> None: + w, h = self.size + # Needs enough space to position reasonably + if w > 4 and h > 4: + random.seed(self._seed) + + # Exclusion Zone Calculation + # -------------------------- + + # Logo top-left + logo_y = (h - Logo.H) // 2 - 1 + logo_x = (w - Logo.W) // 2 + + # Slogan top-left + slogan_y = logo_y + Logo.H + 2 + slogan_x = (w - Slogan.W) // 2 + + # Forbidden area + # -------------- + # Combined rect over Logo and Slogan + f_y1 = logo_y + f_y2 = slogan_y + Slogan.H + f_x1 = min(logo_x, slogan_x) + f_x2 = max(logo_x + Logo.W, slogan_x + Slogan.W) + + # Update Logo and Slogan position + # Note: In the overlay layer, widgets stack vertically. + # Logo is at y=0 (height Logo.H). + # Slogan is at y=Logo.H (height Slogan.H). + # Pulsar is at y=Logo.H+Slogan.H (height Pulsar.H) + # We must subtract these flow positions from the desired absolute positions. + self.query_one(Logo).styles.offset = (logo_x, logo_y) + self.query_one(Slogan).styles.offset = (slogan_x, slogan_y - Logo.H) + + # Pulsar: styles.offset moves the top-left corner. + # So if offset is (px, py), it occupies x=[px, px+Pulsar.W), y=[py, py+Pulsar.H). + + # Find a valid Pulsar position + for _ in range(20): + # Random position + max_x = max(0, w - Pulsar.W) + max_y = max(0, h - Pulsar.H) + + px = random.randint(0, max_x) # nosec B311 - visual placement randomness + py = random.randint(0, max_y) # nosec B311 - visual placement randomness + + # Pulsar Rect: + p_x1, p_y1 = px, py + p_x2, p_y2 = px + Pulsar.W, py + Pulsar.H + + # Check intersection with forbidden rect + overlap_x = (p_x1 < f_x2) and (p_x2 > f_x1) + overlap_y = (p_y1 < f_y2) and (p_y2 > f_y1) + + if overlap_x and overlap_y: + continue # Try again + + # No overlap! + offset_x, offset_y = px, py - (Logo.H + Slogan.H) + break + else: + # Fallback if no safe spot found (e.g. screen too small): + # Place top-left or keep last valid. random 0,0 is safe-ish. + offset_x, offset_y = 0, 0 - (Logo.H + Slogan.H) + self.query_one(Pulsar).styles.offset = (offset_x, offset_y) + + +class SpeedSparkline(Static): + """ + Custom 4-line height sparkline. + """ + + HISTORY_SIZE = 99 + BLOCKS = [".", " ", "▂", "▃", "▄", "▅", "▆", "▇", "█"] + + def __init__(self, data: list[float] = None, **kwargs): + super().__init__(**kwargs) + self._data = data or [] + + def update_data(self, data: list[float]): + self._data = data + self.refresh_chart() + + def refresh_chart(self): + if not self._data: + self.update("") + return + + width = self.size.width or self.HISTORY_SIZE + # Slice data to width + dataset = self._data[-width:] + if not dataset: + self.update("") + return + + max_val = max(dataset) if dataset else 1.0 + max_val = max(max_val, 1.0) # Avoid div by zero + + # We have 4 lines, each can take 8 levels. Total 32 levels. + # Normalize each data point to 0..32 + + lines = [[], [], [], []] + + for val in dataset: + # Scale to 0-32 + scaled = (val / max_val) * 32 + + # Generate 4 stacked chars + for i in range(4): + # i=0 is top line, i=3 is bottom line + # Thresholds: Top(24), Mid-High(16), Mid-Low(8), Low(0) + threshold = (3 - i) * 8 + level = int(scaled - threshold) + level = max(0, min(8, level)) + lines[i].append(self.BLOCKS[level]) + + # Join lines + rows = ["".join(line) for line in lines] + self.update("\n".join(rows)) + + def on_resize(self, event): + self.refresh_chart()