From 7eab4d943d856d875bca262496f1ca310e7485af Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Fri, 23 May 2025 05:42:22 +0000 Subject: [PATCH 01/21] poc multiresolution --- runner/app/cfg/uvicorn_logging_config.json | 6 ++-- runner/app/live/pipelines/comfyui.py | 3 +- .../pipelines/comfyui_default_workflow.json | 6 ++-- runner/app/live/streamer/streamer.py | 6 ++-- runner/app/live/trickle/decoder.py | 30 +++++-------------- runner/app/live/trickle/encoder.py | 2 +- 6 files changed, 20 insertions(+), 33 deletions(-) diff --git a/runner/app/cfg/uvicorn_logging_config.json b/runner/app/cfg/uvicorn_logging_config.json index 9055e4049..0363ae38d 100644 --- a/runner/app/cfg/uvicorn_logging_config.json +++ b/runner/app/cfg/uvicorn_logging_config.json @@ -29,17 +29,17 @@ "handlers": [ "default" ], - "level": "INFO", + "level": "DEBUG", "propagate": false }, "uvicorn.error": { - "level": "INFO" + "level": "DEBUG" }, "uvicorn.access": { "handlers": [ "access" ], - "level": "INFO", + "level": "DEBUG", "propagate": false } } diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 2fe9655c3..1ec0c45d8 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,5 +1,6 @@ import os import json +import PIL import torch import asyncio from typing import Union @@ -63,7 +64,7 @@ async def initialize(self, **params): # Warm up the pipeline dummy_frame = VideoFrame(None, 0, 0) - dummy_frame.side_data.input = torch.randn(1, 512, 512, 3) + dummy_frame.side_data.input = torch.randn(1, 704, 384, 3) for _ in range(WARMUP_RUNS): self.client.put_video_input(dummy_frame) diff --git a/runner/app/live/pipelines/comfyui_default_workflow.json b/runner/app/live/pipelines/comfyui_default_workflow.json index 36ecd9f77..fe997eeb6 100644 --- a/runner/app/live/pipelines/comfyui_default_workflow.json +++ b/runner/app/live/pipelines/comfyui_default_workflow.json @@ -20,7 +20,7 @@ }, "3": { "inputs": { - "unet_name": "static-dreamshaper8_SD15_$stat-b-1-h-512-w-512_00001_.engine", + "unet_name": "static-dreamshaper8_SD15_$stat-b-1-h-704-w-384_00001_.engine", "model_type": "SD15" }, "class_type": "TensorRTLoader", @@ -146,8 +146,8 @@ }, "16": { "inputs": { - "width": 512, - "height": 512, + "width": 384, + "height": 704, "batch_size": 1 }, "class_type": "EmptyLatentImage", diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 2fa7b1b38..e4377c537 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -55,11 +55,11 @@ async def start(self, params: dict): run_in_background("ingress_loop", self.run_ingress_loop()), run_in_background("egress_loop", self.run_egress_loop()), run_in_background("report_status_loop", self.report_status_loop()), - run_in_background("control_loop", self.run_control_loop()), - ] + ] # auxiliary tasks that are not critical to the supervisor, but which we want to run # TODO: maybe remove this since we had to move the control loop to main tasks - self.auxiliary_tasks: list[asyncio.Task] = [] + self.auxiliary_tasks: list[asyncio.Task] = [run_in_background("control_loop", self.run_control_loop()), + ] self.tasks_supervisor_task = run_in_background( "tasks_supervisor", self.tasks_supervisor() ) diff --git a/runner/app/live/trickle/decoder.py b/runner/app/live/trickle/decoder.py index 1f94f30a0..71c20a8ee 100644 --- a/runner/app/live/trickle/decoder.py +++ b/runner/app/live/trickle/decoder.py @@ -107,28 +107,14 @@ def decode_av(pipe_input, frame_callback, put_metadata): # not delayed, so use prev pts to allow more jitter next_pts_time = next_pts_time + frame_interval - h = 512 - w = int((512 * frame.width / frame.height) / 2) * 2 # force divisible by 2 - if frame.height > frame.width: - w = 512 - h = int((512 * frame.height / frame.width) / 2) * 2 - frame = reformatter.reformat(frame, format='rgba', width=w, height=h) - - image = frame.to_image() - if image.mode != "RGB": - image = image.convert("RGB") - width, height = image.size - if (width, height) != (512, 512): - # Crop to the center square if image not already square - square_size = 512 - start_x = width // 2 - square_size // 2 - start_y = height // 2 - square_size // 2 - image = image.crop((start_x, start_y, start_x + square_size, start_y + square_size)) - - image_np = np.array(image).astype(np.float32) / 255.0 - tensor = torch.tensor(image_np).unsqueeze(0) - - avframe = InputFrame.from_av_video(tensor, frame.pts, frame.time_base) + # h = 512 + # w = int((512 * frame.width / frame.height) / 2) * 2 # force divisible by 2 + # if frame.height > frame.width: + # w = 512 + # h = int((512 * frame.height / frame.width) / 2) * 2 + + frame = reformatter.reformat(frame, format='rgba', width=384, height=704) + avframe = InputFrame.from_av_video(frame) avframe.log_timestamps["frame_init"] = time.time() frame_callback(avframe) continue diff --git a/runner/app/live/trickle/encoder.py b/runner/app/live/trickle/encoder.py index 648c78952..d21c75424 100644 --- a/runner/app/live/trickle/encoder.py +++ b/runner/app/live/trickle/encoder.py @@ -56,7 +56,7 @@ def custom_io_open(url: str, flags: int, options: dict): if video_meta and video_codec: # Add a new stream to the output using the desired video codec - video_opts = { 'video_size':'512x512', 'bf':'0' } + video_opts = { 'video_size':'384x704', 'bf':'0' } if video_codec == 'libx264': video_opts = video_opts | { 'preset':'superfast', 'tune':'zerolatency', 'forced-idr':'1' } output_video_stream = output_container.add_stream(video_codec, options=video_opts) From 3d15ccb75c498f4032c42f8de166f5ab3d1196fb Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Fri, 23 May 2025 06:01:40 +0000 Subject: [PATCH 02/21] warmup pipeline based on workflow prompt --- runner/app/live/pipelines/comfyui.py | 45 ++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 1ec0c45d8..695099333 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -62,9 +62,19 @@ async def initialize(self, **params): await self.client.set_prompts([new_params.prompt]) self.params = new_params - # Warm up the pipeline + # Get dimensions from the workflow + if isinstance(new_params.prompt, dict): + width, height = ComfyUtils.get_latent_image_dimensions(new_params.prompt) + if width is None or height is None: + width, height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT # Default dimensions if not found in workflow + logging.warning(f"Could not find dimensions in workflow, using default {width}x{height}") + + width, height = width or ComfyUtils.DEFAULT_WIDTH, height or ComfyUtils.DEFAULT_HEIGHT + + # Warm up the pipeline with the workflow dimensions + logging.info(f"Warming up pipeline with dimensions: {width}x{height}") dummy_frame = VideoFrame(None, 0, 0) - dummy_frame.side_data.input = torch.randn(1, 704, 384, 3) + dummy_frame.side_data.input = torch.randn(1, height, width, 3) for _ in range(WARMUP_RUNS): self.client.put_video_input(dummy_frame) @@ -103,3 +113,34 @@ async def stop(self): logging.info("Stopping ComfyUI pipeline") await self.client.cleanup() logging.info("ComfyUI pipeline stopped") + +class ComfyUtils: + DEFAULT_WIDTH = 512 + DEFAULT_HEIGHT = 512 + + @staticmethod + def get_latent_image_dimensions(workflow: dict) -> tuple[int, int]: + """Get dimensions from the EmptyLatentImage node in the workflow. + + Args: + workflow: The workflow JSON dictionary + + Returns: + Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. + """ + try: + for node_id, node in workflow.items(): + if node.get("class_type") == "EmptyLatentImage": + inputs = node.get("inputs", {}) + width = inputs.get("width") + height = inputs.get("height") + if width is not None and height is not None: + return width, height + logging.warning("Incomplete dimensions in latent image node") + break + except Exception as e: + logging.warning(f"Failed to extract dimensions from workflow: {e}") + + # Return defaults if dimensions not found or on any error + logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT From 6cd09d36b04f4434c12c22ca6f70230442499989 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 27 May 2025 15:35:12 +0000 Subject: [PATCH 03/21] wip params update --- runner/app/live/api/api.py | 12 +++++++++- runner/app/live/pipelines/comfyui.py | 2 ++ runner/app/live/streamer/protocol/trickle.py | 24 +++++++++++++++++++- runner/app/live/streamer/streamer.py | 7 +++++- runner/app/live/trickle/encoder.py | 6 +++-- runner/app/live/trickle/media.py | 8 +++---- 6 files changed, 50 insertions(+), 9 deletions(-) diff --git a/runner/app/live/api/api.py b/runner/app/live/api/api.py index c656a9403..a35efb318 100644 --- a/runner/app/live/api/api.py +++ b/runner/app/live/api/api.py @@ -136,7 +136,17 @@ async def handle_start_stream(request: web.Request): params.stream_id, ) - await streamer.start(params.params) + # Get resolution from params dictionary with defaults + stream_params = { + 'width': params.params.get('width', 384), + 'height': params.params.get('height', 704) + } + + # Merge with any other params + stream_params.update(params.params) + logging.info(f"Starting stream with params: {stream_params}") + + await streamer.start(params.params, stream_params) request.app["streamer"] = streamer await protocol.emit_monitoring_event({ "type": "runner_receive_stream_request", diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 695099333..2b153feed 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -26,6 +26,8 @@ class Config: extra = "forbid" prompt: Union[str, dict] = DEFAULT_WORKFLOW_JSON + width: int = 384 + height: int = 704 @field_validator('prompt') @classmethod diff --git a/runner/app/live/streamer/protocol/trickle.py b/runner/app/live/streamer/protocol/trickle.py index e2178e182..850bb2aa0 100644 --- a/runner/app/live/streamer/protocol/trickle.py +++ b/runner/app/live/streamer/protocol/trickle.py @@ -23,6 +23,8 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s self.events_publisher = None self.subscribe_task = None self.publish_task = None + self.width = 384 # Default values + self.height = 704 async def start(self): self.subscribe_queue = queue.Queue[InputFrame]() @@ -32,7 +34,7 @@ async def start(self): media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_cache.put, self.emit_monitoring_event) ) self.publish_task = asyncio.create_task( - media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event) + media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event, width=self.width, height=self.height) ) if self.control_url and self.control_url.strip() != "": self.control_subscriber = TrickleSubscriber(self.control_url) @@ -124,6 +126,26 @@ async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]: # Ignore periodic keepalive messages continue + # Handle resolution changes + if 'width' in data or 'height' in data: + new_width = data.get('width', self.width) + new_height = data.get('height', self.height) + if new_width != self.width or new_height != self.height: + logging.info(f"Updating resolution from {self.width}x{self.height} to {new_width}x{new_height}") + self.width = new_width + self.height = new_height + # Restart publish task with new resolution + if self.publish_task: + self.publish_task.cancel() + try: + await self.publish_task + except asyncio.CancelledError: + pass + metadata_cache = LastValueCache[dict]() + self.publish_task = asyncio.create_task( + media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event, width=self.width, height=self.height) + ) + logging.info("Received control message with params: %s", data) yield data diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index e4377c537..4f2bfc996 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -37,8 +37,13 @@ def __init__( self.request_id = request_id self.manifest_id = manifest_id self.stream_id = stream_id + self.width = 384 # Default values + self.height = 704 + + async def start(self, params: dict, stream_params: dict): + self.width = stream_params.get('width', self.width) + self.height = stream_params.get('height', self.height) - async def start(self, params: dict): if self.tasks_supervisor_task: raise RuntimeError("Streamer already started") diff --git a/runner/app/live/trickle/encoder.py b/runner/app/live/trickle/encoder.py index d21c75424..157ab0500 100644 --- a/runner/app/live/trickle/encoder.py +++ b/runner/app/live/trickle/encoder.py @@ -26,7 +26,9 @@ def encode_av( output_callback, get_metadata, video_codec: Optional[str] ='libx264', - audio_codec: Optional[str] ='libfdk_aac' + audio_codec: Optional[str] ='libfdk_aac', + width: Optional[int] = 384, + height: Optional[int] = 704 ): logging.info("Starting encoder") @@ -56,7 +58,7 @@ def custom_io_open(url: str, flags: int, options: dict): if video_meta and video_codec: # Add a new stream to the output using the desired video codec - video_opts = { 'video_size':'384x704', 'bf':'0' } + video_opts = { 'video_size':f'{width}x{height}', 'bf':'0' } if video_codec == 'libx264': video_opts = video_opts | { 'preset':'superfast', 'tune':'zerolatency', 'forced-idr':'1' } output_video_stream = output_container.add_stream(video_codec, options=video_opts) diff --git a/runner/app/live/trickle/media.py b/runner/app/live/trickle/media.py index 72ae59004..234d03bbe 100644 --- a/runner/app/live/trickle/media.py +++ b/runner/app/live/trickle/media.py @@ -112,13 +112,13 @@ def decode_runner(): loop = asyncio.get_running_loop() await loop.run_in_executor(None, decode_runner) -def encode_in(task_pipes, task_lock, image_generator, sync_callback, get_metadata, **kwargs): +def encode_in(task_pipes, task_lock, image_generator, sync_callback, get_metadata, width, height, **kwargs): # encode_av has a tendency to crash, so restart as necessary retryCount = 0 last_retry_time = time.time() while retryCount < MAX_ENCODER_RETRIES: try: - encode_av(image_generator, sync_callback, get_metadata, **kwargs) + encode_av(image_generator, sync_callback, get_metadata, width=width, height=height, **kwargs) break # clean exit except Exception as exc: current_time = time.time() @@ -146,7 +146,7 @@ def encode_in(task_pipes, task_lock, image_generator, sync_callback, get_metadat logging.exception("Error closing pipe on task list", stack_info=True) logging.info(f"Closed pipes - {pipe_count}/{total_pipes}") -async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback): +async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback, width, height): first_segment = True publisher = None @@ -205,7 +205,7 @@ def task_done(t2:asyncio.Task): return task_done2(task, pipe_writer) task.add_done_callback(task_done) - encode_thread = threading.Thread(target=encode_in, args=(live_pipes, live_tasks_lock, image_generator, sync_callback, get_metadata), kwargs={"audio_codec":"libopus"}) + encode_thread = threading.Thread(target=encode_in, args=(live_pipes, live_tasks_lock, image_generator, sync_callback, get_metadata), kwargs={"audio_codec":"libopus", "width": width, "height": height}) encode_thread.start() logging.debug("run_publish: encoder thread started") From 1389508e7f914b96a0c105b8df05edc086aaf347 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 27 May 2025 18:53:10 +0000 Subject: [PATCH 04/21] Pass output resolution from decoder to encoder, remove defaults --- runner/app/live/streamer/protocol/trickle.py | 41 ++++++++++---------- runner/app/live/trickle/decoder.py | 4 +- runner/app/live/trickle/encoder.py | 6 +-- runner/app/live/trickle/media.py | 16 ++++---- 4 files changed, 35 insertions(+), 32 deletions(-) diff --git a/runner/app/live/streamer/protocol/trickle.py b/runner/app/live/streamer/protocol/trickle.py index 850bb2aa0..47391fd4e 100644 --- a/runner/app/live/streamer/protocol/trickle.py +++ b/runner/app/live/streamer/protocol/trickle.py @@ -31,10 +31,10 @@ async def start(self): self.publish_queue = queue.Queue[OutputFrame]() metadata_cache = LastValueCache[dict]() # to pass video metadata from decoder to encoder self.subscribe_task = asyncio.create_task( - media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_cache.put, self.emit_monitoring_event) + media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_cache.put, self.emit_monitoring_event, self.width, self.height) ) self.publish_task = asyncio.create_task( - media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event, width=self.width, height=self.height) + media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event) ) if self.control_url and self.control_url.strip() != "": self.control_subscriber = TrickleSubscriber(self.control_url) @@ -127,24 +127,25 @@ async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]: continue # Handle resolution changes - if 'width' in data or 'height' in data: - new_width = data.get('width', self.width) - new_height = data.get('height', self.height) - if new_width != self.width or new_height != self.height: - logging.info(f"Updating resolution from {self.width}x{self.height} to {new_width}x{new_height}") - self.width = new_width - self.height = new_height - # Restart publish task with new resolution - if self.publish_task: - self.publish_task.cancel() - try: - await self.publish_task - except asyncio.CancelledError: - pass - metadata_cache = LastValueCache[dict]() - self.publish_task = asyncio.create_task( - media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event, width=self.width, height=self.height) - ) + # TODO: This should be on the input (encode, subscribe), not output + # if 'width' in data or 'height' in data: + # new_width = data.get('width', self.width) + # new_height = data.get('height', self.height) + # if new_width != self.width or new_height != self.height: + # logging.info(f"Updating resolution from {self.width}x{self.height} to {new_width}x{new_height}") + # self.width = new_width + # self.height = new_height + # # Restart publish task with new resolution + # if self.publish_task: + # self.publish_task.cancel() + # try: + # await self.publish_task + # except asyncio.CancelledError: + # pass + # metadata_cache = LastValueCache[dict]() + # self.publish_task = asyncio.create_task( + # media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event) + # ) logging.info("Received control message with params: %s", data) yield data diff --git a/runner/app/live/trickle/decoder.py b/runner/app/live/trickle/decoder.py index 71c20a8ee..70897bf26 100644 --- a/runner/app/live/trickle/decoder.py +++ b/runner/app/live/trickle/decoder.py @@ -11,7 +11,7 @@ MAX_FRAMERATE=24 -def decode_av(pipe_input, frame_callback, put_metadata): +def decode_av(pipe_input, frame_callback, put_metadata, output_width, output_height): """ Reads from a pipe (or file-like object). @@ -56,6 +56,8 @@ def decode_av(pipe_input, frame_callback, put_metadata): "sar": video_stream.codec_context.sample_aspect_ratio, "dar": video_stream.codec_context.display_aspect_ratio, "format": str(video_stream.codec_context.format), + "output_width": output_width, + "output_height": output_height, } if video_metadata is None and audio_metadata is None: diff --git a/runner/app/live/trickle/encoder.py b/runner/app/live/trickle/encoder.py index 157ab0500..38a924cec 100644 --- a/runner/app/live/trickle/encoder.py +++ b/runner/app/live/trickle/encoder.py @@ -27,8 +27,6 @@ def encode_av( get_metadata, video_codec: Optional[str] ='libx264', audio_codec: Optional[str] ='libfdk_aac', - width: Optional[int] = 384, - height: Optional[int] = 704 ): logging.info("Starting encoder") @@ -58,7 +56,9 @@ def custom_io_open(url: str, flags: int, options: dict): if video_meta and video_codec: # Add a new stream to the output using the desired video codec - video_opts = { 'video_size':f'{width}x{height}', 'bf':'0' } + output_width = video_meta['output_width'] + output_height = video_meta['output_height'] + video_opts = { 'video_size':f'{output_width}x{output_height}', 'bf':'0' } if video_codec == 'libx264': video_opts = video_opts | { 'preset':'superfast', 'tune':'zerolatency', 'forced-idr':'1' } output_video_stream = output_container.add_stream(video_codec, options=video_opts) diff --git a/runner/app/live/trickle/media.py b/runner/app/live/trickle/media.py index 234d03bbe..2c401cc65 100644 --- a/runner/app/live/trickle/media.py +++ b/runner/app/live/trickle/media.py @@ -16,12 +16,12 @@ MAX_ENCODER_RETRIES = 3 ENCODER_RETRY_RESET_SECONDS = 120 # reset retry counter after 2 minutes -async def run_subscribe(subscribe_url: str, image_callback, put_metadata, monitoring_callback): +async def run_subscribe(subscribe_url: str, image_callback, put_metadata, monitoring_callback, output_width, output_height): # TODO add some pre-processing parameters, eg image size try: in_pipe, out_pipe = os.pipe() write_fd = await AsyncifyFdWriter(out_pipe) - parse_task = asyncio.create_task(decode_in(in_pipe, image_callback, put_metadata, write_fd)) + parse_task = asyncio.create_task(decode_in(in_pipe, image_callback, put_metadata, write_fd, output_width, output_height)) subscribe_task = asyncio.create_task(subscribe(subscribe_url, write_fd, monitoring_callback)) await asyncio.gather(subscribe_task, parse_task) logging.info("run_subscribe complete") @@ -74,13 +74,13 @@ async def AsyncifyFdWriter(write_fd): writer = asyncio.StreamWriter(write_transport, write_protocol, None, loop) return writer -async def decode_in(in_pipe, frame_callback, put_metadata, write_fd): +async def decode_in(in_pipe, frame_callback, put_metadata, write_fd, output_width, output_height): def decode_runner(): retry_count = 0 last_retry_time = time.time() while retry_count < MAX_DECODER_RETRIES: try: - decode_av(f"pipe:{in_pipe}", frame_callback, put_metadata) + decode_av(f"pipe:{in_pipe}", frame_callback, put_metadata, output_width, output_height) break # clean exit except Exception as e: msg = str(e) @@ -112,13 +112,13 @@ def decode_runner(): loop = asyncio.get_running_loop() await loop.run_in_executor(None, decode_runner) -def encode_in(task_pipes, task_lock, image_generator, sync_callback, get_metadata, width, height, **kwargs): +def encode_in(task_pipes, task_lock, image_generator, sync_callback, get_metadata, **kwargs): # encode_av has a tendency to crash, so restart as necessary retryCount = 0 last_retry_time = time.time() while retryCount < MAX_ENCODER_RETRIES: try: - encode_av(image_generator, sync_callback, get_metadata, width=width, height=height, **kwargs) + encode_av(image_generator, sync_callback, get_metadata, **kwargs) break # clean exit except Exception as exc: current_time = time.time() @@ -146,7 +146,7 @@ def encode_in(task_pipes, task_lock, image_generator, sync_callback, get_metadat logging.exception("Error closing pipe on task list", stack_info=True) logging.info(f"Closed pipes - {pipe_count}/{total_pipes}") -async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback, width, height): +async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback): first_segment = True publisher = None @@ -205,7 +205,7 @@ def task_done(t2:asyncio.Task): return task_done2(task, pipe_writer) task.add_done_callback(task_done) - encode_thread = threading.Thread(target=encode_in, args=(live_pipes, live_tasks_lock, image_generator, sync_callback, get_metadata), kwargs={"audio_codec":"libopus", "width": width, "height": height}) + encode_thread = threading.Thread(target=encode_in, args=(live_pipes, live_tasks_lock, image_generator, sync_callback, get_metadata), kwargs={"audio_codec":"libopus"}) encode_thread.start() logging.debug("run_publish: encoder thread started") From 9ac60ae86a2569c70dbb2fa10f90bb0599be3a0d Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 27 May 2025 21:19:59 +0000 Subject: [PATCH 05/21] (wip) dynamic resolution --- runner/app/cfg/uvicorn_logging_config.json | 6 +- runner/app/live/api/api.py | 12 +-- runner/app/live/pipelines/comfyui.py | 99 ++++++++++++++-------- runner/app/live/streamer/process.py | 49 +++++++++++ runner/app/live/streamer/streamer.py | 11 ++- runner/app/live/trickle/decoder.py | 2 +- 6 files changed, 126 insertions(+), 53 deletions(-) diff --git a/runner/app/cfg/uvicorn_logging_config.json b/runner/app/cfg/uvicorn_logging_config.json index 0363ae38d..9055e4049 100644 --- a/runner/app/cfg/uvicorn_logging_config.json +++ b/runner/app/cfg/uvicorn_logging_config.json @@ -29,17 +29,17 @@ "handlers": [ "default" ], - "level": "DEBUG", + "level": "INFO", "propagate": false }, "uvicorn.error": { - "level": "DEBUG" + "level": "INFO" }, "uvicorn.access": { "handlers": [ "access" ], - "level": "DEBUG", + "level": "INFO", "propagate": false } } diff --git a/runner/app/live/api/api.py b/runner/app/live/api/api.py index a35efb318..c656a9403 100644 --- a/runner/app/live/api/api.py +++ b/runner/app/live/api/api.py @@ -136,17 +136,7 @@ async def handle_start_stream(request: web.Request): params.stream_id, ) - # Get resolution from params dictionary with defaults - stream_params = { - 'width': params.params.get('width', 384), - 'height': params.params.get('height', 704) - } - - # Merge with any other params - stream_params.update(params.params) - logging.info(f"Starting stream with params: {stream_params}") - - await streamer.start(params.params, stream_params) + await streamer.start(params.params) request.app["streamer"] = streamer await protocol.emit_monitoring_event({ "type": "runner_receive_stream_request", diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 2b153feed..a52bbad00 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -1,6 +1,5 @@ import os import json -import PIL import torch import asyncio from typing import Union @@ -15,19 +14,54 @@ COMFY_UI_WORKSPACE_ENV = "COMFY_UI_WORKSPACE" WARMUP_RUNS = 1 +class ComfyUtils: + DEFAULT_WIDTH = 384 + DEFAULT_HEIGHT = 704 + @staticmethod + def get_default_workflow_json(): + _default_workflow_path = pathlib.Path(__file__).parent.absolute() / "comfyui_default_workflow.json" + with open(_default_workflow_path, 'r') as f: + return json.load(f) + + @staticmethod + def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: + """Get dimensions from the EmptyLatentImage node in the workflow. + + Args: + workflow: The workflow JSON dictionary + + Returns: + Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. + """ -_default_workflow_path = pathlib.Path(__file__).parent.absolute() / "comfyui_default_workflow.json" -with open(_default_workflow_path, 'r') as f: - DEFAULT_WORKFLOW_JSON = json.load(f) + if workflow is None: + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + + try: + for node_id, node in workflow.items(): + if node.get("class_type") == "EmptyLatentImage": + inputs = node.get("inputs", {}) + width = inputs.get("width") + height = inputs.get("height") + if width is not None and height is not None: + return width, height + logging.warning("Incomplete dimensions in latent image node") + break + except Exception as e: + logging.warning(f"Failed to extract dimensions from workflow: {e}") + + # Return defaults if dimensions not found or on any error + logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT +# Get the default workflow json during startup +DEFAULT_WORKFLOW_JSON = ComfyUtils.get_default_workflow_json() class ComfyUIParams(BaseModel): class Config: extra = "forbid" prompt: Union[str, dict] = DEFAULT_WORKFLOW_JSON - width: int = 384 - height: int = 704 @field_validator('prompt') @classmethod @@ -56,6 +90,8 @@ def __init__(self): self.client = ComfyStreamClient(cwd=comfy_ui_workspace) self.params: ComfyUIParams self.video_incoming_frames: asyncio.Queue[VideoOutput] = asyncio.Queue() + self.width = ComfyUtils.DEFAULT_WIDTH + self.height = ComfyUtils.DEFAULT_HEIGHT async def initialize(self, **params): new_params = ComfyUIParams(**params) @@ -63,7 +99,7 @@ async def initialize(self, **params): # TODO: currently its a single prompt, but need to support multiple prompts await self.client.set_prompts([new_params.prompt]) self.params = new_params - + # Get dimensions from the workflow if isinstance(new_params.prompt, dict): width, height = ComfyUtils.get_latent_image_dimensions(new_params.prompt) @@ -116,33 +152,28 @@ async def stop(self): await self.client.cleanup() logging.info("ComfyUI pipeline stopped") -class ComfyUtils: - DEFAULT_WIDTH = 512 - DEFAULT_HEIGHT = 512 - - @staticmethod - def get_latent_image_dimensions(workflow: dict) -> tuple[int, int]: - """Get dimensions from the EmptyLatentImage node in the workflow. + # async def get_latent_image_dimensions(self, workflow: dict) -> tuple[int, int] | None: + # """Get dimensions from the EmptyLatentImage node in the workflow. - Args: - workflow: The workflow JSON dictionary + # Args: + # workflow: The workflow JSON dictionary - Returns: - Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. - """ - try: - for node_id, node in workflow.items(): - if node.get("class_type") == "EmptyLatentImage": - inputs = node.get("inputs", {}) - width = inputs.get("width") - height = inputs.get("height") - if width is not None and height is not None: - return width, height - logging.warning("Incomplete dimensions in latent image node") - break - except Exception as e: - logging.warning(f"Failed to extract dimensions from workflow: {e}") + # Returns: + # Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. + # """ + # try: + # for node_id, node in workflow.items(): + # if node.get("class_type") == "EmptyLatentImage": + # inputs = node.get("inputs", {}) + # width = inputs.get("width") + # height = inputs.get("height") + # if width is not None and height is not None: + # return width, height + # logging.warning("Incomplete dimensions in latent image node") + # break + # except Exception as e: + # logging.warning(f"Failed to extract dimensions from workflow: {e}") - # Return defaults if dimensions not found or on any error - logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") - return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + # # Return defaults if dimensions not found or on any error + # logging.info(f"Using default dimensions {DEFAULT_WIDTH}x{DEFAULT_HEIGHT}") + # return DEFAULT_WIDTH, DEFAULT_HEIGHT diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index a11c0958b..d5977baba 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -166,6 +166,20 @@ async def _initialize_pipeline(self): with log_timing(f"PipelineProcess: Pipeline loading with {params}"): pipeline = load_pipeline(self.pipeline_name) + if params.get('prompt') is None: + # Parse resolution from the default workflow + width, height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + else: + # Parse resolution from the provided workflow + prompt = params.get('prompt') + if prompt is type(dict): + width, height = ComfyUtils.get_latent_image_dimensions(prompt) + + # TODO pass through to streamer + # params.update({ + # 'width': width, + # 'height': height + # }) await pipeline.initialize(**params) return pipeline except Exception as e: @@ -332,3 +346,38 @@ def clear_queue(queue): queue.get_nowait() # Remove items without blocking except Exception as e: logging.error(f"Error while clearing queue: {e}") + +class ComfyUtils: + DEFAULT_WIDTH = 384 + DEFAULT_HEIGHT = 704 + + @staticmethod + def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: + """Get dimensions from the EmptyLatentImage node in the workflow. + + Args: + workflow: The workflow JSON dictionary + + Returns: + Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. + """ + + if workflow is None: + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + + try: + for node_id, node in workflow.items(): + if node.get("class_type") == "EmptyLatentImage": + inputs = node.get("inputs", {}) + width = inputs.get("width") + height = inputs.get("height") + if width is not None and height is not None: + return width, height + logging.warning("Incomplete dimensions in latent image node") + break + except Exception as e: + logging.warning(f"Failed to extract dimensions from workflow: {e}") + + # Return defaults if dimensions not found or on any error + logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 4f2bfc996..bd2bea0ef 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -37,12 +37,15 @@ def __init__( self.request_id = request_id self.manifest_id = manifest_id self.stream_id = stream_id - self.width = 384 # Default values + self.width = 384 self.height = 704 - async def start(self, params: dict, stream_params: dict): - self.width = stream_params.get('width', self.width) - self.height = stream_params.get('height', self.height) + async def start(self, params: dict): + + #TODO: parse from request params + #if params.get('prompt'): + # prompt = params.get('prompt') + # self.width, self.height = ComfyUtils.get_latent_image_dimensions(prompt) if self.tasks_supervisor_task: raise RuntimeError("Streamer already started") diff --git a/runner/app/live/trickle/decoder.py b/runner/app/live/trickle/decoder.py index 70897bf26..ec050192d 100644 --- a/runner/app/live/trickle/decoder.py +++ b/runner/app/live/trickle/decoder.py @@ -115,7 +115,7 @@ def decode_av(pipe_input, frame_callback, put_metadata, output_width, output_hei # w = 512 # h = int((512 * frame.height / frame.width) / 2) * 2 - frame = reformatter.reformat(frame, format='rgba', width=384, height=704) + frame = reformatter.reformat(frame, format='rgba', width=output_width, height=output_height) avframe = InputFrame.from_av_video(frame) avframe.log_timestamps["frame_init"] = time.time() frame_callback(avframe) From 99387abda258a9070a3cbe2ebe9d1d1e1300328c Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Tue, 27 May 2025 23:34:50 +0000 Subject: [PATCH 06/21] use resolution from prompt --- runner/app/live/pipelines/comfyui.py | 75 +++----------------- runner/app/live/streamer/process.py | 53 ++------------ runner/app/live/streamer/protocol/trickle.py | 27 ++----- runner/app/live/streamer/streamer.py | 18 +++-- runner/app/live/utils/__init__.py | 3 + runner/app/live/utils/comfy_utils.py | 36 ++++++++++ 6 files changed, 66 insertions(+), 146 deletions(-) create mode 100644 runner/app/live/utils/__init__.py create mode 100644 runner/app/live/utils/comfy_utils.py diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index a52bbad00..86112ba36 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -9,53 +9,20 @@ from .interface import Pipeline from comfystream.client import ComfyStreamClient from trickle import VideoFrame, VideoOutput +from utils import ComfyUtils import logging COMFY_UI_WORKSPACE_ENV = "COMFY_UI_WORKSPACE" WARMUP_RUNS = 1 -class ComfyUtils: - DEFAULT_WIDTH = 384 - DEFAULT_HEIGHT = 704 - @staticmethod - def get_default_workflow_json(): - _default_workflow_path = pathlib.Path(__file__).parent.absolute() / "comfyui_default_workflow.json" - with open(_default_workflow_path, 'r') as f: - return json.load(f) - - @staticmethod - def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: - """Get dimensions from the EmptyLatentImage node in the workflow. - - Args: - workflow: The workflow JSON dictionary - - Returns: - Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. - """ - - if workflow is None: - return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT - - try: - for node_id, node in workflow.items(): - if node.get("class_type") == "EmptyLatentImage": - inputs = node.get("inputs", {}) - width = inputs.get("width") - height = inputs.get("height") - if width is not None and height is not None: - return width, height - logging.warning("Incomplete dimensions in latent image node") - break - except Exception as e: - logging.warning(f"Failed to extract dimensions from workflow: {e}") - - # Return defaults if dimensions not found or on any error - logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") - return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + +def get_default_workflow_json(): + _default_workflow_path = pathlib.Path(__file__).parent.absolute() / "comfyui_default_workflow.json" + with open(_default_workflow_path, 'r') as f: + return json.load(f) # Get the default workflow json during startup -DEFAULT_WORKFLOW_JSON = ComfyUtils.get_default_workflow_json() +DEFAULT_WORKFLOW_JSON = get_default_workflow_json() class ComfyUIParams(BaseModel): class Config: @@ -150,30 +117,4 @@ async def update_params(self, **params): async def stop(self): logging.info("Stopping ComfyUI pipeline") await self.client.cleanup() - logging.info("ComfyUI pipeline stopped") - - # async def get_latent_image_dimensions(self, workflow: dict) -> tuple[int, int] | None: - # """Get dimensions from the EmptyLatentImage node in the workflow. - - # Args: - # workflow: The workflow JSON dictionary - - # Returns: - # Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. - # """ - # try: - # for node_id, node in workflow.items(): - # if node.get("class_type") == "EmptyLatentImage": - # inputs = node.get("inputs", {}) - # width = inputs.get("width") - # height = inputs.get("height") - # if width is not None and height is not None: - # return width, height - # logging.warning("Incomplete dimensions in latent image node") - # break - # except Exception as e: - # logging.warning(f"Failed to extract dimensions from workflow: {e}") - - # # Return defaults if dimensions not found or on any error - # logging.info(f"Using default dimensions {DEFAULT_WIDTH}x{DEFAULT_HEIGHT}") - # return DEFAULT_WIDTH, DEFAULT_HEIGHT + logging.info("ComfyUI pipeline stopped") \ No newline at end of file diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index d5977baba..0f691d0f2 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -11,6 +11,7 @@ from pipelines import load_pipeline, Pipeline from log import config_logging, config_logging_fields, log_timing from trickle import InputFrame, AudioFrame, VideoFrame, OutputFrame, VideoOutput, AudioOutput +from utils import ComfyUtils class PipelineProcess: @staticmethod @@ -166,20 +167,9 @@ async def _initialize_pipeline(self): with log_timing(f"PipelineProcess: Pipeline loading with {params}"): pipeline = load_pipeline(self.pipeline_name) - if params.get('prompt') is None: - # Parse resolution from the default workflow - width, height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT - else: - # Parse resolution from the provided workflow - prompt = params.get('prompt') - if prompt is type(dict): - width, height = ComfyUtils.get_latent_image_dimensions(prompt) - - # TODO pass through to streamer - # params.update({ - # 'width': width, - # 'height': height - # }) + + # TODO: We may need to call reset_stream when resolution is changed and start the pipeline again + # Changing the engine causes issues, maybe cleanup related await pipeline.initialize(**params) return pipeline except Exception as e: @@ -346,38 +336,3 @@ def clear_queue(queue): queue.get_nowait() # Remove items without blocking except Exception as e: logging.error(f"Error while clearing queue: {e}") - -class ComfyUtils: - DEFAULT_WIDTH = 384 - DEFAULT_HEIGHT = 704 - - @staticmethod - def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: - """Get dimensions from the EmptyLatentImage node in the workflow. - - Args: - workflow: The workflow JSON dictionary - - Returns: - Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. - """ - - if workflow is None: - return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT - - try: - for node_id, node in workflow.items(): - if node.get("class_type") == "EmptyLatentImage": - inputs = node.get("inputs", {}) - width = inputs.get("width") - height = inputs.get("height") - if width is not None and height is not None: - return width, height - logging.warning("Incomplete dimensions in latent image node") - break - except Exception as e: - logging.warning(f"Failed to extract dimensions from workflow: {e}") - - # Return defaults if dimensions not found or on any error - logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") - return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT diff --git a/runner/app/live/streamer/protocol/trickle.py b/runner/app/live/streamer/protocol/trickle.py index 47391fd4e..ddd0cb04e 100644 --- a/runner/app/live/streamer/protocol/trickle.py +++ b/runner/app/live/streamer/protocol/trickle.py @@ -7,7 +7,7 @@ from PIL import Image from trickle import media, TricklePublisher, TrickleSubscriber, InputFrame, OutputFrame, AudioFrame, AudioOutput - +from utils import ComfyUtils from .protocol import StreamProtocol from .last_value_cache import LastValueCache @@ -23,8 +23,8 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s self.events_publisher = None self.subscribe_task = None self.publish_task = None - self.width = 384 # Default values - self.height = 704 + self.width = ComfyUtils.DEFAULT_WIDTH + self.height = ComfyUtils.DEFAULT_HEIGHT async def start(self): self.subscribe_queue = queue.Queue[InputFrame]() @@ -126,26 +126,7 @@ async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]: # Ignore periodic keepalive messages continue - # Handle resolution changes - # TODO: This should be on the input (encode, subscribe), not output - # if 'width' in data or 'height' in data: - # new_width = data.get('width', self.width) - # new_height = data.get('height', self.height) - # if new_width != self.width or new_height != self.height: - # logging.info(f"Updating resolution from {self.width}x{self.height} to {new_width}x{new_height}") - # self.width = new_width - # self.height = new_height - # # Restart publish task with new resolution - # if self.publish_task: - # self.publish_task.cancel() - # try: - # await self.publish_task - # except asyncio.CancelledError: - # pass - # metadata_cache = LastValueCache[dict]() - # self.publish_task = asyncio.create_task( - # media.run_publish(self.publish_url, self.publish_queue.get, metadata_cache.get, self.emit_monitoring_event) - # ) + # TODO: handle prompt changes with differing resolution logging.info("Received control message with params: %s", data) yield data diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index bd2bea0ef..90245d310 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -13,6 +13,7 @@ from .protocol.protocol import StreamProtocol from .status import timestamp_to_ms from trickle import AudioFrame, VideoFrame, OutputFrame, AudioOutput, VideoOutput +from utils import ComfyUtils fps_log_interval = 10 status_report_interval = 10 @@ -37,15 +38,18 @@ def __init__( self.request_id = request_id self.manifest_id = manifest_id self.stream_id = stream_id - self.width = 384 - self.height = 704 + self.width = ComfyUtils.DEFAULT_WIDTH + self.height = ComfyUtils.DEFAULT_HEIGHT async def start(self, params: dict): - - #TODO: parse from request params - #if params.get('prompt'): - # prompt = params.get('prompt') - # self.width, self.height = ComfyUtils.get_latent_image_dimensions(prompt) + # Parse expected input resolution from workflow prompt + try: + if params.get('prompt'): + prompt = params.get('prompt') + self.width, self.height = ComfyUtils.get_latent_image_dimensions(prompt) + except Exception as e: + logging.error(f"Error parsing resolution from prompt, using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}: {e}") + self.width, self.height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT if self.tasks_supervisor_task: raise RuntimeError("Streamer already started") diff --git a/runner/app/live/utils/__init__.py b/runner/app/live/utils/__init__.py new file mode 100644 index 000000000..f04013e59 --- /dev/null +++ b/runner/app/live/utils/__init__.py @@ -0,0 +1,3 @@ +from .comfy_utils import ComfyUtils + +__all__ = ['ComfyUtils'] \ No newline at end of file diff --git a/runner/app/live/utils/comfy_utils.py b/runner/app/live/utils/comfy_utils.py new file mode 100644 index 000000000..018669297 --- /dev/null +++ b/runner/app/live/utils/comfy_utils.py @@ -0,0 +1,36 @@ +import logging + +class ComfyUtils: + DEFAULT_WIDTH = 512 + DEFAULT_HEIGHT = 512 + + @staticmethod + def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: + """Get dimensions from the EmptyLatentImage node in the workflow. + + Args: + workflow: The workflow JSON dictionary + + Returns: + Tuple of (width, height) from the latent image. Returns default dimensions if not found or on error. + """ + + if workflow is None: + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + + try: + for node_id, node in workflow.items(): + if node.get("class_type") == "EmptyLatentImage": + inputs = node.get("inputs", {}) + width = inputs.get("width") + height = inputs.get("height") + if width is not None and height is not None: + return width, height + logging.warning("Incomplete dimensions in latent image node") + break + except Exception as e: + logging.warning(f"Failed to extract dimensions from workflow: {e}") + + # Return defaults if dimensions not found or on any error + logging.info(f"Using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}") + return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT \ No newline at end of file From 8b803a57a1324f6034f0b52bfff4f7d1b1933acb Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 28 May 2025 00:19:35 +0000 Subject: [PATCH 07/21] fix latent dimension parsing on string --- runner/app/live/utils/comfy_utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/runner/app/live/utils/comfy_utils.py b/runner/app/live/utils/comfy_utils.py index 018669297..17a8fcd62 100644 --- a/runner/app/live/utils/comfy_utils.py +++ b/runner/app/live/utils/comfy_utils.py @@ -1,11 +1,12 @@ import logging +import json class ComfyUtils: DEFAULT_WIDTH = 512 DEFAULT_HEIGHT = 512 @staticmethod - def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: + def get_latent_image_dimensions(workflow: dict | str | None) -> tuple[int, int]: """Get dimensions from the EmptyLatentImage node in the workflow. Args: @@ -18,6 +19,9 @@ def get_latent_image_dimensions(workflow: dict | None) -> tuple[int, int]: if workflow is None: return ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + if isinstance(workflow, str): + workflow = json.loads(workflow) + try: for node_id, node in workflow.items(): if node.get("class_type") == "EmptyLatentImage": From 84690f0b5d57b102cfb4433bf3df10478b7a6ae8 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 28 May 2025 02:37:32 +0000 Subject: [PATCH 08/21] restart pipeline process on resolution change --- runner/app/live/streamer/process_guardian.py | 28 ++++++++++++++++++++ runner/app/live/streamer/protocol/trickle.py | 6 ++--- runner/app/live/streamer/streamer.py | 15 +++++++++-- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index eb3fe933c..486046784 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -7,6 +7,7 @@ from trickle import InputFrame, OutputFrame from .process import PipelineProcess from .status import PipelineState, PipelineStatus, InferenceStatus, InputStatus +from utils import ComfyUtils FPS_LOG_INTERVAL = 10.0 @@ -19,6 +20,9 @@ class StreamerCallbacks(abc.ABC): @abc.abstractmethod def is_stream_running(self) -> bool: ... + + +class ProcessCallbacks(abc.ABC): @abc.abstractmethod async def emit_monitoring_event(self, event_data: dict) -> None: ... @@ -44,6 +48,7 @@ def __init__( ): self.pipeline = pipeline self.initial_params = params + self.width, self.height = ComfyUtils.get_latent_image_dimensions(params.get('prompt')) self.streamer: StreamerCallbacks = _NoopStreamerCallbacks() self.process: Optional[PipelineProcess] = None @@ -82,6 +87,27 @@ async def reset_stream( ): if not self.process: raise RuntimeError("Process not running") + + # Check if resolution has changed + width = params.pop("width", None) + height = params.pop("height", None) + if (width is None or height is None): + new_width, new_height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + if params.get('prompt'): + try: + new_width, new_height = ComfyUtils.get_latent_image_dimensions(params.get('prompt')) + except Exception as e: + logging.error(f"Error parsing resolution from prompt, using default dimensions: {e}") + + # If resolution changed, we need to restart the process + if (new_width != self.width or new_height != self.height): + logging.info(f"Resolution changed from {self.width}x{self.height} to {new_width}x{new_height}, restarting process") + self.width = new_width + self.height = new_height + await self.process.stop() + # Create new process with current pipeline name and params + self.process = PipelineProcess.start(self.pipeline, params) + self.status.start_time = time.time() self.status.input_status = InputStatus() self.input_fps_counter.reset() @@ -89,6 +115,8 @@ async def reset_stream( self.streamer = streamer or _NoopStreamerCallbacks() self.process.reset_stream(request_id, manifest_id, stream_id) + self.process.update_params(params) + await self.update_params(params) self.status.update_state(PipelineState.ONLINE) diff --git a/runner/app/live/streamer/protocol/trickle.py b/runner/app/live/streamer/protocol/trickle.py index ddd0cb04e..ec0690172 100644 --- a/runner/app/live/streamer/protocol/trickle.py +++ b/runner/app/live/streamer/protocol/trickle.py @@ -12,7 +12,7 @@ from .last_value_cache import LastValueCache class TrickleProtocol(StreamProtocol): - def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[str] = None, events_url: Optional[str] = None): + def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[str] = None, events_url: Optional[str] = None, width: Optional[int] = ComfyUtils.DEFAULT_WIDTH, height: Optional[int] = ComfyUtils.DEFAULT_HEIGHT): self.subscribe_url = subscribe_url self.publish_url = publish_url self.control_url = control_url @@ -23,8 +23,8 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s self.events_publisher = None self.subscribe_task = None self.publish_task = None - self.width = ComfyUtils.DEFAULT_WIDTH - self.height = ComfyUtils.DEFAULT_HEIGHT + self.width = width + self.height = height async def start(self): self.subscribe_queue = queue.Queue[InputFrame]() diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 90245d310..1988dee45 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -26,6 +26,8 @@ def __init__( request_id: str, manifest_id: str, stream_id: str, + width: int = ComfyUtils.DEFAULT_WIDTH, + height: int = ComfyUtils.DEFAULT_HEIGHT, ): self.protocol = protocol self.process = process @@ -38,8 +40,8 @@ def __init__( self.request_id = request_id self.manifest_id = manifest_id self.stream_id = stream_id - self.width = ComfyUtils.DEFAULT_WIDTH - self.height = ComfyUtils.DEFAULT_HEIGHT + self.width = width + self.height = height async def start(self, params: dict): # Parse expected input resolution from workflow prompt @@ -47,6 +49,8 @@ async def start(self, params: dict): if params.get('prompt'): prompt = params.get('prompt') self.width, self.height = ComfyUtils.get_latent_image_dimensions(prompt) + params.update({"width": self.width, "height": self.height}) + logging.info(f"Streamer: Using dimensions {self.width}x{self.height} from workflow") except Exception as e: logging.error(f"Error parsing resolution from prompt, using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}: {e}") self.width, self.height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT @@ -58,7 +62,14 @@ async def start(self, params: dict): self.request_id, self.manifest_id, self.stream_id, params, self ) + # Update dimensions from process after reset_stream + self.width = self.process.width + self.height = self.process.height + logging.info(f"Streamer: Updated dimensions to {self.width}x{self.height} from process") + self.stop_event.clear() + self.protocol.width = self.width + self.protocol.height = self.height await self.protocol.start() # We need a bunch of concurrent tasks to run the streamer. So we start them all in background and then also start From 087a41af8c06033f51d406f5b504017b58214ee3 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 28 May 2025 03:15:52 +0000 Subject: [PATCH 09/21] change default due to warm-up --- runner/app/live/utils/comfy_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/app/live/utils/comfy_utils.py b/runner/app/live/utils/comfy_utils.py index 17a8fcd62..2dc8b5147 100644 --- a/runner/app/live/utils/comfy_utils.py +++ b/runner/app/live/utils/comfy_utils.py @@ -2,8 +2,8 @@ import json class ComfyUtils: - DEFAULT_WIDTH = 512 - DEFAULT_HEIGHT = 512 + DEFAULT_WIDTH = 384 + DEFAULT_HEIGHT = 704 @staticmethod def get_latent_image_dimensions(workflow: dict | str | None) -> tuple[int, int]: From 78088b18fa97542db79ede3bf9736ec7325d4f62 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Wed, 28 May 2025 03:27:32 +0000 Subject: [PATCH 10/21] improve parsing of resolution for restarts --- runner/app/live/streamer/process_guardian.py | 2 +- runner/app/live/streamer/streamer.py | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index 486046784..217aa1a11 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -104,7 +104,7 @@ async def reset_stream( logging.info(f"Resolution changed from {self.width}x{self.height} to {new_width}x{new_height}, restarting process") self.width = new_width self.height = new_height - await self.process.stop() + await self.stop() # Create new process with current pipeline name and params self.process = PipelineProcess.start(self.pipeline, params) diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 1988dee45..51adb9236 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -44,16 +44,6 @@ def __init__( self.height = height async def start(self, params: dict): - # Parse expected input resolution from workflow prompt - try: - if params.get('prompt'): - prompt = params.get('prompt') - self.width, self.height = ComfyUtils.get_latent_image_dimensions(prompt) - params.update({"width": self.width, "height": self.height}) - logging.info(f"Streamer: Using dimensions {self.width}x{self.height} from workflow") - except Exception as e: - logging.error(f"Error parsing resolution from prompt, using default dimensions {ComfyUtils.DEFAULT_WIDTH}x{ComfyUtils.DEFAULT_HEIGHT}: {e}") - self.width, self.height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT if self.tasks_supervisor_task: raise RuntimeError("Streamer already started") From ed42e56a206a614c5273d939a1dc7f6e63052ecf Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 29 May 2025 01:44:12 +0000 Subject: [PATCH 11/21] fix pipeline restart with different resolution --- runner/app/live/pipelines/comfyui.py | 12 ++++++------ runner/app/live/streamer/process_guardian.py | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 86112ba36..c93972107 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -67,13 +67,13 @@ async def initialize(self, **params): await self.client.set_prompts([new_params.prompt]) self.params = new_params - # Get dimensions from the workflow - if isinstance(new_params.prompt, dict): - width, height = ComfyUtils.get_latent_image_dimensions(new_params.prompt) - if width is None or height is None: - width, height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT # Default dimensions if not found in workflow - logging.warning(f"Could not find dimensions in workflow, using default {width}x{height}") + # Attempt to get dimensions from the workflow + width, height = ComfyUtils.get_latent_image_dimensions(new_params.prompt) + if width is None or height is None: + width, height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT # Default dimensions if not found in workflow + logging.warning(f"Could not find dimensions in workflow, using default {width}x{height}") + # Fallback to default dimensions if not found in workflow width, height = width or ComfyUtils.DEFAULT_WIDTH, height or ComfyUtils.DEFAULT_HEIGHT # Warm up the pipeline with the workflow dimensions diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index 217aa1a11..ce0a434f5 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -107,6 +107,7 @@ async def reset_stream( await self.stop() # Create new process with current pipeline name and params self.process = PipelineProcess.start(self.pipeline, params) + #self.process.update_params(params) self.status.start_time = time.time() self.status.input_status = InputStatus() From c2af95c6037991a0c45b5a40ba1798b2e02d3f3c Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Thu, 29 May 2025 02:33:07 +0000 Subject: [PATCH 12/21] code cleanup --- runner/app/live/streamer/protocol/trickle.py | 2 -- runner/app/live/streamer/streamer.py | 1 - runner/app/live/trickle/encoder.py | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/runner/app/live/streamer/protocol/trickle.py b/runner/app/live/streamer/protocol/trickle.py index ec0690172..eee25c46f 100644 --- a/runner/app/live/streamer/protocol/trickle.py +++ b/runner/app/live/streamer/protocol/trickle.py @@ -126,8 +126,6 @@ async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]: # Ignore periodic keepalive messages continue - # TODO: handle prompt changes with differing resolution - logging.info("Received control message with params: %s", data) yield data diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 51adb9236..da239c201 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -44,7 +44,6 @@ def __init__( self.height = height async def start(self, params: dict): - if self.tasks_supervisor_task: raise RuntimeError("Streamer already started") diff --git a/runner/app/live/trickle/encoder.py b/runner/app/live/trickle/encoder.py index 38a924cec..bf22c80f3 100644 --- a/runner/app/live/trickle/encoder.py +++ b/runner/app/live/trickle/encoder.py @@ -26,7 +26,7 @@ def encode_av( output_callback, get_metadata, video_codec: Optional[str] ='libx264', - audio_codec: Optional[str] ='libfdk_aac', + audio_codec: Optional[str] ='libfdk_aac' ): logging.info("Starting encoder") From e2786ae7ec475795a084b6ba3291172ec2a7d6e3 Mon Sep 17 00:00:00 2001 From: John | Elite Encoder Date: Mon, 2 Jun 2025 20:15:50 +0000 Subject: [PATCH 13/21] fix decoder --- runner/app/live/trickle/decoder.py | 31 ++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/runner/app/live/trickle/decoder.py b/runner/app/live/trickle/decoder.py index ec050192d..5d4cf7d35 100644 --- a/runner/app/live/trickle/decoder.py +++ b/runner/app/live/trickle/decoder.py @@ -95,7 +95,6 @@ def decode_av(pipe_input, frame_callback, put_metadata, output_width, output_hei frame = cast(av.VideoFrame, frame) if frame.pts is None: continue - # drop frames that come in too fast # TODO also check timing relative to wall clock pts_time = frame.time @@ -108,15 +107,27 @@ def decode_av(pipe_input, frame_callback, put_metadata, output_width, output_hei else: # not delayed, so use prev pts to allow more jitter next_pts_time = next_pts_time + frame_interval - - # h = 512 - # w = int((512 * frame.width / frame.height) / 2) * 2 # force divisible by 2 - # if frame.height > frame.width: - # w = 512 - # h = int((512 * frame.height / frame.width) / 2) * 2 - - frame = reformatter.reformat(frame, format='rgba', width=output_width, height=output_height) - avframe = InputFrame.from_av_video(frame) + # Convert frame to image + image = frame.to_image() + if image.mode != "RGB": + image = image.convert("RGB") + width, height = image.size + + if output_width == output_height and width != height: + # Crop to center square if output is square but input isn't + square_size = min(width, height) + start_x = width // 2 - square_size // 2 + start_y = height // 2 - square_size // 2 + image = image.crop((start_x, start_y, start_x + square_size, start_y + square_size)) + elif (output_width, output_height) != (width, height): + # Resize if dimensions don't match output + image = image.resize((output_width, output_height)) + + # Convert to tensor + image_np = np.array(image).astype(np.float32) / 255.0 + tensor = torch.tensor(image_np).unsqueeze(0) + + avframe = InputFrame.from_av_video(tensor, frame.pts, frame.time_base) avframe.log_timestamps["frame_init"] = time.time() frame_callback(avframe) continue From 7794f178073db2a1f9ef28895efb153f70f8fe0f Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 3 Jun 2025 15:46:20 +0000 Subject: [PATCH 14/21] crop and resize input frames to workflow dimensions --- runner/app/live/trickle/decoder.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/runner/app/live/trickle/decoder.py b/runner/app/live/trickle/decoder.py index 5d4cf7d35..0eede088c 100644 --- a/runner/app/live/trickle/decoder.py +++ b/runner/app/live/trickle/decoder.py @@ -113,14 +113,25 @@ def decode_av(pipe_input, frame_callback, put_metadata, output_width, output_hei image = image.convert("RGB") width, height = image.size - if output_width == output_height and width != height: - # Crop to center square if output is square but input isn't - square_size = min(width, height) - start_x = width // 2 - square_size // 2 - start_y = height // 2 - square_size // 2 - image = image.crop((start_x, start_y, start_x + square_size, start_y + square_size)) - elif (output_width, output_height) != (width, height): - # Resize if dimensions don't match output + # Calculate aspect ratios + input_ratio = width / height + output_ratio = output_width / output_height + + if input_ratio != output_ratio: + # Need to crop to match output aspect ratio + if input_ratio > output_ratio: + # Input is wider than output - crop width + new_width = int(height * output_ratio) + start_x = (width - new_width) // 2 + image = image.crop((start_x, 0, start_x + new_width, height)) + else: + # Input is taller than output - crop height + new_height = int(width / output_ratio) + start_y = (height - new_height) // 2 + image = image.crop((0, start_y, width, start_y + new_height)) + + # Resize to final dimensions + if (output_width, output_height) != image.size: image = image.resize((output_width, output_height)) # Convert to tensor From aa54a4cb8da8268d9decc7580cd1694824309959 Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 10 Jun 2025 14:45:22 +0000 Subject: [PATCH 15/21] remove exception --- runner/app/live/streamer/process_guardian.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index ce0a434f5..5533ae2ed 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -339,7 +339,7 @@ async def _monitor_loop(self): # Hot fix: the comfyui pipeline process is having trouble shutting down and causes restarts not to recover. # So we skip the restart here and move the state to ERROR so the worker will restart the whole container. # TODO: Remove this exception once pipeline shutdown is fixed and restarting process is useful again. - raise Exception("Skipping process restart due to pipeline shutdown issues") + #raise Exception("Skipping process restart due to pipeline shutdown issues") await self._restart_process() except Exception: logging.exception("Failed to stop streamer and restart process. Moving to ERROR state", stack_info=True) From d5c1f8f9dba00e198dded58cf7c170c87e88abff Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 10 Jun 2025 15:18:28 +0000 Subject: [PATCH 16/21] refactor process.py to use self.pipeline --- runner/app/live/streamer/process.py | 46 ++++++++++++++--------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/runner/app/live/streamer/process.py b/runner/app/live/streamer/process.py index 0f691d0f2..25109e7b7 100644 --- a/runner/app/live/streamer/process.py +++ b/runner/app/live/streamer/process.py @@ -25,6 +25,7 @@ def start(pipeline_name: str, params: dict): def __init__(self, pipeline_name: str): self.pipeline_name = pipeline_name + self.pipeline = None # Initialize pipeline as None self.ctx = mp.get_context("spawn") self.input_queue = self.ctx.Queue(maxsize=2) @@ -166,12 +167,12 @@ async def _initialize_pipeline(self): logging.info("PipelineProcess: No params found in param_update_queue, loading with default params") with log_timing(f"PipelineProcess: Pipeline loading with {params}"): - pipeline = load_pipeline(self.pipeline_name) + self.pipeline = load_pipeline(self.pipeline_name) # TODO: We may need to call reset_stream when resolution is changed and start the pipeline again # Changing the engine causes issues, maybe cleanup related - await pipeline.initialize(**params) - return pipeline + await self.pipeline.initialize(**params) + return self.pipeline except Exception as e: self._report_error(f"Error loading pipeline: {e}") if not params: @@ -181,19 +182,19 @@ async def _initialize_pipeline(self): with log_timing( f"PipelineProcess: Pipeline loading with default params due to error with params: {params}" ): - pipeline = load_pipeline(self.pipeline_name) - await pipeline.initialize() - return pipeline + self.pipeline = load_pipeline(self.pipeline_name) + await self.pipeline.initialize() + return self.pipeline except Exception as e: self._report_error(f"Error loading pipeline with default params: {e}") raise async def _run_pipeline_loops(self): - pipeline = await self._initialize_pipeline() + await self._initialize_pipeline() self.pipeline_initialized.set() - input_task = asyncio.create_task(self._input_loop(pipeline)) - output_task = asyncio.create_task(self._output_loop(pipeline)) - param_task = asyncio.create_task(self._param_update_loop(pipeline)) + input_task = asyncio.create_task(self._input_loop()) + output_task = asyncio.create_task(self._output_loop()) + param_task = asyncio.create_task(self._param_update_loop()) async def wait_for_stop(): while not self.is_done(): @@ -209,17 +210,17 @@ async def wait_for_stop(): for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) - await self._cleanup_pipeline(pipeline) + await self._cleanup_pipeline() logging.info("PipelineProcess: _run_pipeline_loops finished.") - async def _input_loop(self, pipeline: Pipeline): + async def _input_loop(self): while not self.is_done(): try: input_frame = await asyncio.to_thread(self.input_queue.get, timeout=0.1) if isinstance(input_frame, VideoFrame): input_frame.log_timestamps["pre_process_frame"] = time.time() - await pipeline.put_video_frame(input_frame, self.request_id) + await self.pipeline.put_video_frame(input_frame, self.request_id) elif isinstance(input_frame, AudioFrame): self._try_queue_put(self.output_queue, AudioOutput([input_frame], self.request_id)) except queue.Empty: @@ -228,10 +229,10 @@ async def _input_loop(self, pipeline: Pipeline): except Exception as e: self._report_error(f"Error processing input frame: {e}") - async def _output_loop(self, pipeline: Pipeline): + async def _output_loop(self): while not self.is_done(): try: - output = await pipeline.get_processed_video_frame() + output = await self.pipeline.get_processed_video_frame() if isinstance(output, VideoOutput) and not output.tensor.is_cuda and torch.cuda.is_available(): output = output.replace_tensor(output.tensor.cuda()) output.log_timestamps["post_process_frame"] = time.time() @@ -239,19 +240,18 @@ async def _output_loop(self, pipeline: Pipeline): except Exception as e: self._report_error(f"Error processing output frame: {e}") - async def _param_update_loop(self, pipeline: Pipeline): + async def _param_update_loop(self): while not self.is_done(): try: params = await asyncio.to_thread(self.param_update_queue.get, timeout=0.1) if self._handle_logging_params(params): logging.info(f"PipelineProcess: Updating pipeline parameters: {params}") - await pipeline.update_params(**params) + await self.pipeline.update_params(**params) except queue.Empty: - # Timeout ensures the non-daemon threads from to_thread can exit if task is cancelled continue except Exception as e: - self._report_error(f"Error updating params: {e}") + self._report_error(f"Error updating parameters: {e}") def _report_error(self, error_msg: str): error_event = { @@ -261,12 +261,12 @@ def _report_error(self, error_msg: str): logging.error(error_msg) self._try_queue_put(self.error_queue, error_event) - async def _cleanup_pipeline(self, pipeline): - if pipeline is not None: + async def _cleanup_pipeline(self): + if self.pipeline: try: - await pipeline.stop() + await self.pipeline.stop() except Exception as e: - logging.error(f"Error stopping pipeline: {e}") + self._report_error(f"Error cleaning up pipeline: {e}") def _setup_logging(self): level = ( From b0b9859c6e1c8fc8199df8c350af1cd258fa7a5d Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 10 Jun 2025 15:18:47 +0000 Subject: [PATCH 17/21] improve cleanup for comfyui pipeline --- runner/app/live/pipelines/comfyui.py | 9 +++++++++ runner/app/live/streamer/process_guardian.py | 1 + 2 files changed, 10 insertions(+) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index c93972107..6f1ca1b39 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -116,5 +116,14 @@ async def update_params(self, **params): async def stop(self): logging.info("Stopping ComfyUI pipeline") + # Clear the video_incoming_frames queue + while not self.video_incoming_frames.empty(): + try: + frame = await self.video_incoming_frames.get_nowait() + # Ensure any CUDA tensors are properly handled + if frame.tensor is not None and frame.tensor.is_cuda: + frame.tensor = frame.tensor.cpu() + except asyncio.QueueEmpty: + break await self.client.cleanup() logging.info("ComfyUI pipeline stopped") \ No newline at end of file diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index 5533ae2ed..737dc39a7 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -104,6 +104,7 @@ async def reset_stream( logging.info(f"Resolution changed from {self.width}x{self.height} to {new_width}x{new_height}, restarting process") self.width = new_width self.height = new_height + await self.process._cleanup_pipeline() await self.stop() # Create new process with current pipeline name and params self.process = PipelineProcess.start(self.pipeline, params) From a48b5957a9c411943fdb611e38d7fbedbcfdfb53 Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 10 Jun 2025 15:25:17 +0000 Subject: [PATCH 18/21] bump comfyui-base --- runner/docker/Dockerfile.live-base-comfyui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index fb42d186d..ee5597250 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base@sha256:2d5ecad6bf24bb73831c6c87a33a7503f48a88ef2d490609068505a945d91146 +ARG BASE_IMAGE=livepeer/comfyui-base@sha256:e51975d3c8f90f8aafb10a38557fccba6bb6b37c4bc7b3b302c9771048fbc1c1 FROM ${BASE_IMAGE} # ----------------------------------------------------------------------------- From bc5191f36087cf35b4295092b3a4788c1bb47db8 Mon Sep 17 00:00:00 2001 From: Elite Date: Tue, 10 Jun 2025 18:28:43 +0000 Subject: [PATCH 19/21] add width/height params for resolution, fix noop pipeline cleanup --- runner/app/live/pipelines/comfyui.py | 38 +++++++++++++------ runner/app/live/pipelines/noop.py | 17 ++++++++- runner/app/live/streamer/process_guardian.py | 40 ++++++++++---------- 3 files changed, 63 insertions(+), 32 deletions(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 6f1ca1b39..183279140 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -59,6 +59,7 @@ def __init__(self): self.video_incoming_frames: asyncio.Queue[VideoOutput] = asyncio.Queue() self.width = ComfyUtils.DEFAULT_WIDTH self.height = ComfyUtils.DEFAULT_HEIGHT + self.pause_input = False async def initialize(self, **params): new_params = ComfyUIParams(**params) @@ -87,6 +88,8 @@ async def initialize(self, **params): logging.info("Pipeline initialization and warmup complete") async def put_video_frame(self, frame: VideoFrame, request_id: str): + if self.pause_input: + return tensor = frame.tensor if tensor.is_cuda: # Clone the tensor to be able to send it on comfystream internal queue @@ -115,15 +118,28 @@ async def update_params(self, **params): self.params = new_params async def stop(self): - logging.info("Stopping ComfyUI pipeline") - # Clear the video_incoming_frames queue - while not self.video_incoming_frames.empty(): - try: - frame = await self.video_incoming_frames.get_nowait() - # Ensure any CUDA tensors are properly handled - if frame.tensor is not None and frame.tensor.is_cuda: - frame.tensor = frame.tensor.cpu() - except asyncio.QueueEmpty: - break - await self.client.cleanup() + try: + self.pause_input = True + logging.info("Stopping ComfyUI pipeline") + await self.client.cleanup(unload_models=False) + # Wait for the pipeline to stop + await asyncio.sleep(1) + # Clear the video_incoming_frames queue + while not self.video_incoming_frames.empty(): + try: + frame = self.video_incoming_frames.get_nowait() + # Ensure any CUDA tensors are properly handled + if frame.tensor is not None and frame.tensor.is_cuda: + frame.tensor = frame.tensor.cpu() + except asyncio.QueueEmpty: + break + + # Force CUDA cache clear + if torch.cuda.is_available(): + torch.cuda.empty_cache() + except Exception as e: + logging.error(f"Error stopping ComfyUI pipeline: {e}") + finally: + self.pause_input = False + logging.info("ComfyUI pipeline stopped") \ No newline at end of file diff --git a/runner/app/live/pipelines/noop.py b/runner/app/live/pipelines/noop.py index 350543cc0..ca588471c 100644 --- a/runner/app/live/pipelines/noop.py +++ b/runner/app/live/pipelines/noop.py @@ -1,7 +1,7 @@ import logging import asyncio from PIL import Image - +import torch from .interface import Pipeline from trickle import VideoFrame, VideoOutput @@ -26,3 +26,18 @@ async def update_params(self, **params): async def stop(self): logging.info("Stopping pipeline") + + # Clear the frame queue and move any CUDA tensors to CPU + while not self.frame_queue.empty(): + try: + frame = self.frame_queue.get_nowait() + if frame.tensor.is_cuda: + frame.tensor.cpu() # Move tensor to CPU before deletion + except asyncio.QueueEmpty: + break + except Exception as e: + logging.error(f"Error clearing frame queue: {e}") + + # Force CUDA cache clear + if torch.cuda.is_available(): + torch.cuda.empty_cache() diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index 737dc39a7..459f07eff 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -89,26 +89,26 @@ async def reset_stream( raise RuntimeError("Process not running") # Check if resolution has changed - width = params.pop("width", None) - height = params.pop("height", None) - if (width is None or height is None): - new_width, new_height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT - if params.get('prompt'): - try: - new_width, new_height = ComfyUtils.get_latent_image_dimensions(params.get('prompt')) - except Exception as e: - logging.error(f"Error parsing resolution from prompt, using default dimensions: {e}") - - # If resolution changed, we need to restart the process - if (new_width != self.width or new_height != self.height): - logging.info(f"Resolution changed from {self.width}x{self.height} to {new_width}x{new_height}, restarting process") - self.width = new_width - self.height = new_height - await self.process._cleanup_pipeline() - await self.stop() - # Create new process with current pipeline name and params - self.process = PipelineProcess.start(self.pipeline, params) - #self.process.update_params(params) + new_width = params.pop("width", None) + new_height = params.pop("height", None) + if (new_width is None or new_height is None): + new_width, new_height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT + + if params.get('prompt'): + try: + new_width, new_height = ComfyUtils.get_latent_image_dimensions(params.get('prompt')) + except Exception as e: + logging.error(f"Error parsing resolution from prompt, using default dimensions: {e}") + + # If resolution changed, we need to restart the process + if (new_width != self.width or new_height != self.height): + logging.info(f"Resolution changed from {self.width}x{self.height} to {new_width}x{new_height}, restarting process") + self.width = new_width + self.height = new_height + await self.process._cleanup_pipeline() + await self.stop() + # Create new process with current pipeline name and params + self.process = PipelineProcess.start(self.pipeline, params) self.status.start_time = time.time() self.status.input_status = InputStatus() From b81d248e2d9e6b62c23e827cb6f2f0377680b05d Mon Sep 17 00:00:00 2001 From: Elite Date: Wed, 11 Jun 2025 18:00:00 +0000 Subject: [PATCH 20/21] unload tensor correctly --- runner/app/live/pipelines/comfyui.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index 183279140..e3b82646f 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -130,7 +130,7 @@ async def stop(self): frame = self.video_incoming_frames.get_nowait() # Ensure any CUDA tensors are properly handled if frame.tensor is not None and frame.tensor.is_cuda: - frame.tensor = frame.tensor.cpu() + frame.tensor.cpu() except asyncio.QueueEmpty: break From 7f42ab93764606de49bf58479a8162ab248ff2c9 Mon Sep 17 00:00:00 2001 From: Elite Date: Thu, 12 Jun 2025 03:58:21 +0000 Subject: [PATCH 21/21] set dimensions of pipeline with startup arg --- runner/app/live/infer.py | 8 +++++--- runner/app/live/pipelines/comfyui.py | 19 ++++++++++--------- runner/app/live/streamer/process_guardian.py | 15 +++++---------- runner/app/main.py | 17 +++++++++++++---- runner/app/pipelines/live_video_to_video.py | 13 ++++++++++++- runner/docker/Dockerfile.live-base-comfyui | 2 +- 6 files changed, 46 insertions(+), 28 deletions(-) diff --git a/runner/app/live/infer.py b/runner/app/live/infer.py index dcb30a60e..7bbb77e59 100644 --- a/runner/app/live/infer.py +++ b/runner/app/live/infer.py @@ -57,20 +57,22 @@ async def main( ): loop = asyncio.get_event_loop() loop.set_exception_handler(asyncio_exception_handler) - process = ProcessGuardian(pipeline, params or {}) # Only initialize the streamer if we have a protocol and URLs to connect to streamer = None if stream_protocol and subscribe_url and publish_url: + width = params.get('width') + height = params.get('height') if stream_protocol == "trickle": protocol = TrickleProtocol( - subscribe_url, publish_url, control_url, events_url + subscribe_url, publish_url, control_url, events_url, + width=width, height=height ) elif stream_protocol == "zeromq": protocol = ZeroMQProtocol(subscribe_url, publish_url) else: raise ValueError(f"Unsupported protocol: {stream_protocol}") - streamer = PipelineStreamer(protocol, process, request_id, stream_id) + streamer = PipelineStreamer(protocol, process, request_id, stream_id, width=width, height=height) api = None try: diff --git a/runner/app/live/pipelines/comfyui.py b/runner/app/live/pipelines/comfyui.py index e3b82646f..9e5dd5d50 100644 --- a/runner/app/live/pipelines/comfyui.py +++ b/runner/app/live/pipelines/comfyui.py @@ -2,7 +2,7 @@ import json import torch import asyncio -from typing import Union +from typing import Union, Optional, Tuple from pydantic import BaseModel, field_validator import pathlib @@ -29,6 +29,8 @@ class Config: extra = "forbid" prompt: Union[str, dict] = DEFAULT_WORKFLOW_JSON + width: Optional[int] = None + height: Optional[int] = None @field_validator('prompt') @classmethod @@ -68,14 +70,13 @@ async def initialize(self, **params): await self.client.set_prompts([new_params.prompt]) self.params = new_params - # Attempt to get dimensions from the workflow - width, height = ComfyUtils.get_latent_image_dimensions(new_params.prompt) - if width is None or height is None: - width, height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT # Default dimensions if not found in workflow - logging.warning(f"Could not find dimensions in workflow, using default {width}x{height}") - - # Fallback to default dimensions if not found in workflow - width, height = width or ComfyUtils.DEFAULT_WIDTH, height or ComfyUtils.DEFAULT_HEIGHT + # Get dimensions from params or environment variable + width = new_params.width + height = new_params.height + + # Fallback to default dimensions if not found + width = width or ComfyUtils.DEFAULT_WIDTH + height = height or ComfyUtils.DEFAULT_HEIGHT # Warm up the pipeline with the workflow dimensions logging.info(f"Warming up pipeline with dimensions: {width}x{height}") diff --git a/runner/app/live/streamer/process_guardian.py b/runner/app/live/streamer/process_guardian.py index 459f07eff..432f3b051 100644 --- a/runner/app/live/streamer/process_guardian.py +++ b/runner/app/live/streamer/process_guardian.py @@ -89,18 +89,12 @@ async def reset_stream( raise RuntimeError("Process not running") # Check if resolution has changed - new_width = params.pop("width", None) - new_height = params.pop("height", None) + new_width = params.get("width", None) + new_height = params.get("height", None) if (new_width is None or new_height is None): new_width, new_height = ComfyUtils.DEFAULT_WIDTH, ComfyUtils.DEFAULT_HEIGHT - - if params.get('prompt'): - try: - new_width, new_height = ComfyUtils.get_latent_image_dimensions(params.get('prompt')) - except Exception as e: - logging.error(f"Error parsing resolution from prompt, using default dimensions: {e}") - - # If resolution changed, we need to restart the process + + # If resolution changed, we need to restart the process (does not work for comfyui) if (new_width != self.width or new_height != self.height): logging.info(f"Resolution changed from {self.width}x{self.height} to {new_width}x{new_height}, restarting process") self.width = new_width @@ -108,6 +102,7 @@ async def reset_stream( await self.process._cleanup_pipeline() await self.stop() # Create new process with current pipeline name and params + params.update({"width": new_width, "height": new_height}) self.process = PipelineProcess.start(self.pipeline, params) self.status.start_time = time.time() diff --git a/runner/app/main.py b/runner/app/main.py index 372e407de..582fb3f5c 100644 --- a/runner/app/main.py +++ b/runner/app/main.py @@ -26,8 +26,17 @@ async def lifespan(app: FastAPI): pipeline = os.environ["PIPELINE"] model_id = os.environ["MODEL_ID"] - - app.pipeline = load_pipeline(pipeline, model_id) + dimensions = os.environ.get("DIMENSIONS", "512x512") + if dimensions is not None: + try: + width, height = map(int, dimensions.split("x")) + if width % 64 != 0 or height % 64 != 0: + raise ValueError(f"Width and height must be divisible by 64, got {width}x{height}") + except ValueError as e: + logger.error(f"Invalid DIMENSIONS format. Expected 'WIDTHxHEIGHT' but got '{dimensions}'") + raise + + app.pipeline = load_pipeline(pipeline, model_id, dimensions) app.include_router(load_route(pipeline)) app.hardware_info_service.log_gpu_compute_info() @@ -42,7 +51,7 @@ async def lifespan(app: FastAPI): logger.info("Shutting down") -def load_pipeline(pipeline: str, model_id: str) -> any: +def load_pipeline(pipeline: str, model_id: str, dimensions: str | None = None) -> any: match pipeline: case "text-to-image": from app.pipelines.text_to_image import TextToImagePipeline @@ -81,7 +90,7 @@ def load_pipeline(pipeline: str, model_id: str) -> any: case "live-video-to-video": from app.pipelines.live_video_to_video import LiveVideoToVideoPipeline - return LiveVideoToVideoPipeline(model_id) + return LiveVideoToVideoPipeline(model_id, dimensions) case "text-to-speech": from app.pipelines.text_to_speech import TextToSpeechPipeline diff --git a/runner/app/pipelines/live_video_to_video.py b/runner/app/pipelines/live_video_to_video.py index 09896b4ff..b828e9693 100644 --- a/runner/app/pipelines/live_video_to_video.py +++ b/runner/app/pipelines/live_video_to_video.py @@ -18,11 +18,16 @@ proc_status_important_fields = ["State", "VmRSS", "VmSize", "Threads", "voluntary_ctxt_switches", "nonvoluntary_ctxt_switches", "CoreDumping"] class LiveVideoToVideoPipeline(Pipeline): - def __init__(self, model_id: str): + def __init__(self, model_id: str, dimensions: str | None = None): self.version = os.getenv("VERSION", "undefined") self.model_id = model_id self.model_dir = get_model_dir() self.torch_device = get_torch_device() + self.dimensions = dimensions + if dimensions: + self.width, self.height = map(int, dimensions.split("x")) + else: + self.width, self.height = 512, 512 # Default values self.infer_script_path = ( Path(__file__).parent.parent / "live" / "infer.py" ) @@ -34,6 +39,9 @@ def __call__( # type: ignore ): if not self.process: raise RuntimeError("Pipeline process not running") + + # TODO: remove this once we have a better way to parse dimensions/dynamically reload process + params.update({"width": self.width, "height": self.height}) max_retries = 10 thrown_ex = None @@ -109,11 +117,14 @@ def start_process(self): logging.info("Starting pipeline process") cmd = [sys.executable, str(self.infer_script_path)] cmd.extend(["--pipeline", self.model_id]) # we use the model_id as the pipeline name for now + cmd.extend(["--initial-params", json.dumps({"width": self.width, "height": self.height})]) cmd.extend(["--http-port", "8888"]) # TODO: set torch device from self.torch_device env = os.environ.copy() env["HUGGINGFACE_HUB_CACHE"] = str(self.model_dir) + if self.dimensions: + env["DIMENSIONS"] = self.dimensions try: self.process = subprocess.Popen( diff --git a/runner/docker/Dockerfile.live-base-comfyui b/runner/docker/Dockerfile.live-base-comfyui index ee5597250..ab5395a3f 100644 --- a/runner/docker/Dockerfile.live-base-comfyui +++ b/runner/docker/Dockerfile.live-base-comfyui @@ -1,4 +1,4 @@ -ARG BASE_IMAGE=livepeer/comfyui-base@sha256:e51975d3c8f90f8aafb10a38557fccba6bb6b37c4bc7b3b302c9771048fbc1c1 +ARG BASE_IMAGE=livepeer/comfyui-base@sha256:a9ecd7be5cb93cd8d90e41e2e8759bb307b45b1dc20a19dfd40c3b4844e6097b FROM ${BASE_IMAGE} # -----------------------------------------------------------------------------