From dd6659229b92cfafd9f2b6f281be7c514926d238 Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 16:14:19 -0400 Subject: [PATCH 1/7] add a sync websocket client --- sync_client.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 sync_client.py diff --git a/sync_client.py b/sync_client.py new file mode 100644 index 0000000..649d59a --- /dev/null +++ b/sync_client.py @@ -0,0 +1,39 @@ +from websockets.sync.client import connect +import json +import msgpack +import os + +class NodePlaceholder: + def __init__(self, node_id, envelope_format="json", base_url="localhost:8000"): + self.node_id = node_id + self.envelope_format = envelope_format + self.base_url = base_url + + def stream(self, seq_num=None): + websocket_url = f"ws://{self.base_url}/stream/single/{self.node_id}?envelope_format={self.envelope_format}" + if seq_num is not None: + websocket_url += f"&seq_num={seq_num}" + + with connect(websocket_url) as websocket: + try: + while True: + message = websocket.recv() + if isinstance(message, bytes) and self.envelope_format == "msgpack": + yield msgpack.unpackb(message) + else: + yield json.loads(message) + except Exception: + return + + +def main(): + REDIS_WS_API_URL = os.getenv("REDIS_WS_API_URL", "localhost:8000") + + node = NodePlaceholder(node_id="481980", envelope_format="msgpack", base_url=REDIS_WS_API_URL) + + for message in node.stream(): + print(f"Received: {message}") + + +if __name__ == "__main__": + main() From 1ef95b02758639b1bffa29e04effee77f607bfa3 Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 16:19:55 -0400 Subject: [PATCH 2/7] use sync client iterator --- sync_client.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/sync_client.py b/sync_client.py index 649d59a..e8cd376 100644 --- a/sync_client.py +++ b/sync_client.py @@ -15,15 +15,11 @@ def stream(self, seq_num=None): websocket_url += f"&seq_num={seq_num}" with connect(websocket_url) as websocket: - try: - while True: - message = websocket.recv() - if isinstance(message, bytes) and self.envelope_format == "msgpack": - yield msgpack.unpackb(message) - else: - yield json.loads(message) - except Exception: - return + for message in websocket: + if isinstance(message, bytes) and self.envelope_format == "msgpack": + yield msgpack.unpackb(message) + else: + yield json.loads(message) def main(): From c6f03dda38dc96e549443d3d6ab034aba16aea87 Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 16:26:44 -0400 Subject: [PATCH 3/7] add locust sync_client_test.py --- locust/sync_client_test.py | 131 +++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 locust/sync_client_test.py diff --git a/locust/sync_client_test.py b/locust/sync_client_test.py new file mode 100644 index 0000000..ebffeae --- /dev/null +++ b/locust/sync_client_test.py @@ -0,0 +1,131 @@ +import os +from locust import HttpUser, task, between, events +import numpy as np +import time +import json +import msgpack +import logging +import httpx +from sync_client import NodePlaceholder +import threading + + +class WriterUser(HttpUser): + wait_time = between(0.1, 0.2) # Wait 0.1-0.2 seconds between writes + weight = int(os.getenv("WRITER_WEIGHT", 1)) + + def on_start(self): + """Initialize user state""" + # Use a fixed node_id so StreamingUser can connect to it + self.node_id = 481980 + self.message_count = 0 + + @task(10) # Run 10x as often as cleanup + def write_data(self): + # Create data with incrementing value + binary_data = (np.ones(5) * time.time()).tobytes() + + # Post data and check response + response = self.client.post( + f"/upload/{self.node_id}", + data=binary_data, + headers={"Content-Type": "application/octet-stream"}, + ) + + # Log status like writing_client + if response.status_code == httpx.codes.OK: + logging.info(f"Wrote message {self.message_count} to node {self.node_id}") + self.message_count += 1 + else: + logging.error( + f"Failed to write message {self.message_count}: {response.status_code}" + ) + + @task + def cleanup(self): + """Periodically delete the node like the real client""" + if self.message_count > 20: + self.client.delete(f"/upload/{self.node_id}") + self.message_count = 0 + logging.info(f"Cleaned up node {self.node_id}") + + +class StreamingUser(HttpUser): + """User that streams data from test-redis-ws using sync_client""" + + wait_time = between(0.1, 0.2) + weight = int(os.getenv("STREAMING_WEIGHT", 1)) + + def on_start(self): + """Connect to the streaming endpoint using sync_client""" + # Use the same node_id as WriterUser + self.node_id = 481980 + self.envelope_format = "msgpack" # or "json" + self.streaming = True + + # Extract host from client base_url + base_url = self.client.base_url.replace("http://", "").replace("https://", "") + self.node = NodePlaceholder(str(self.node_id), envelope_format=self.envelope_format, base_url=base_url) + + # Start streaming in a separate thread + self.stream_thread = threading.Thread(target=self.stream_messages) + self.stream_thread.daemon = True + self.stream_thread.start() + + def on_stop(self): + """Stop streaming when user stops""" + self.streaming = False + if hasattr(self, 'stream_thread'): + self.stream_thread.join(timeout=2) + + def stream_messages(self): + """Stream messages using sync_client in a separate thread""" + try: + for message in self.node.stream(): + if not self.streaming: + break + + self.process_message(message) + + except Exception as e: + logging.error(f"Error in streaming: {e}") + events.request.fire( + request_type="WS", + name="stream_connection", + response_time=0, + response_length=0, + exception=e, + context={} + ) + + def process_message(self, message): + """Process websocket messages and measure latency""" + try: + received_time = time.time() + + # Pull out timestamp from the payload + payload = message.get("payload", []) + if payload and len(payload) > 0: + write_time = float(payload[0]) + latency_ms = (received_time - write_time) * 1000 + + logging.info( + f"WS latency (server sequence {message.get('sequence')}): {latency_ms:.1f}ms" + ) + + events.request.fire( + request_type="WS", + name="write_to_websocket_delivery", + response_time=latency_ms, + response_length=len(str(message)), + exception=None, + context={} + ) + + except Exception as e: + logging.error(f"Error processing message: {e}") + + @task + def keep_alive(self): + """Dummy task to keep the user active while listening for messages""" + pass \ No newline at end of file From 09f541751e8bc0dbd6b764ac113d8d73542043b2 Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 16:44:20 -0400 Subject: [PATCH 4/7] add locust test --- locust/sync_client_test.py | 59 ++++++++++++++------------------------ 1 file changed, 21 insertions(+), 38 deletions(-) diff --git a/locust/sync_client_test.py b/locust/sync_client_test.py index ebffeae..b5613ef 100644 --- a/locust/sync_client_test.py +++ b/locust/sync_client_test.py @@ -1,13 +1,14 @@ import os +import sys from locust import HttpUser, task, between, events import numpy as np import time -import json -import msgpack import logging import httpx + +# Add parent directory to path to import sync_client +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from sync_client import NodePlaceholder -import threading class WriterUser(HttpUser): @@ -61,42 +62,10 @@ def on_start(self): # Use the same node_id as WriterUser self.node_id = 481980 self.envelope_format = "msgpack" # or "json" - self.streaming = True # Extract host from client base_url base_url = self.client.base_url.replace("http://", "").replace("https://", "") self.node = NodePlaceholder(str(self.node_id), envelope_format=self.envelope_format, base_url=base_url) - - # Start streaming in a separate thread - self.stream_thread = threading.Thread(target=self.stream_messages) - self.stream_thread.daemon = True - self.stream_thread.start() - - def on_stop(self): - """Stop streaming when user stops""" - self.streaming = False - if hasattr(self, 'stream_thread'): - self.stream_thread.join(timeout=2) - - def stream_messages(self): - """Stream messages using sync_client in a separate thread""" - try: - for message in self.node.stream(): - if not self.streaming: - break - - self.process_message(message) - - except Exception as e: - logging.error(f"Error in streaming: {e}") - events.request.fire( - request_type="WS", - name="stream_connection", - response_time=0, - response_length=0, - exception=e, - context={} - ) def process_message(self, message): """Process websocket messages and measure latency""" @@ -126,6 +95,20 @@ def process_message(self, message): logging.error(f"Error processing message: {e}") @task - def keep_alive(self): - """Dummy task to keep the user active while listening for messages""" - pass \ No newline at end of file + def stream_messages(self): + """Stream messages using sync_client""" + try: + for message in self.node.stream(): + self.process_message(message) + break # Process one message per task execution + + except Exception as e: + logging.error(f"Error in streaming: {e}") + events.request.fire( + request_type="WS", + name="stream_connection", + response_time=0, + response_length=0, + exception=e, + context={} + ) From ae872917d0e7aa1ec6475b6f28c6beae89d2e778 Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 17:03:58 -0400 Subject: [PATCH 5/7] add sync_client task --- pixi.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pixi.toml b/pixi.toml index 6af57f5..c70705a 100644 --- a/pixi.toml +++ b/pixi.toml @@ -8,6 +8,7 @@ version = "0.1.0" [tasks] serve = "python server.py" stream = "python streaming_client.py" +sync_client = "python sync_client.py" write = "python writing_client.py" test = "pytest tests/" From dbf2308e3e515e4622c8437adea4be099dff0441 Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 17:08:15 -0400 Subject: [PATCH 6/7] touch up --- sync_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sync_client.py b/sync_client.py index e8cd376..5a0fdd3 100644 --- a/sync_client.py +++ b/sync_client.py @@ -26,7 +26,8 @@ def main(): REDIS_WS_API_URL = os.getenv("REDIS_WS_API_URL", "localhost:8000") node = NodePlaceholder(node_id="481980", envelope_format="msgpack", base_url=REDIS_WS_API_URL) - + + # Client will be in this loop until breaking or the writer closes the connection. for message in node.stream(): print(f"Received: {message}") From ddb3e75a1bb227e595bed6ee9d9e087f3260216f Mon Sep 17 00:00:00 2001 From: gbischof Date: Wed, 9 Jul 2025 17:27:12 -0400 Subject: [PATCH 7/7] remove locust test --- locust/sync_client_test.py | 114 ------------------------------------- sync_client.py | 5 +- 2 files changed, 4 insertions(+), 115 deletions(-) delete mode 100644 locust/sync_client_test.py diff --git a/locust/sync_client_test.py b/locust/sync_client_test.py deleted file mode 100644 index b5613ef..0000000 --- a/locust/sync_client_test.py +++ /dev/null @@ -1,114 +0,0 @@ -import os -import sys -from locust import HttpUser, task, between, events -import numpy as np -import time -import logging -import httpx - -# Add parent directory to path to import sync_client -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from sync_client import NodePlaceholder - - -class WriterUser(HttpUser): - wait_time = between(0.1, 0.2) # Wait 0.1-0.2 seconds between writes - weight = int(os.getenv("WRITER_WEIGHT", 1)) - - def on_start(self): - """Initialize user state""" - # Use a fixed node_id so StreamingUser can connect to it - self.node_id = 481980 - self.message_count = 0 - - @task(10) # Run 10x as often as cleanup - def write_data(self): - # Create data with incrementing value - binary_data = (np.ones(5) * time.time()).tobytes() - - # Post data and check response - response = self.client.post( - f"/upload/{self.node_id}", - data=binary_data, - headers={"Content-Type": "application/octet-stream"}, - ) - - # Log status like writing_client - if response.status_code == httpx.codes.OK: - logging.info(f"Wrote message {self.message_count} to node {self.node_id}") - self.message_count += 1 - else: - logging.error( - f"Failed to write message {self.message_count}: {response.status_code}" - ) - - @task - def cleanup(self): - """Periodically delete the node like the real client""" - if self.message_count > 20: - self.client.delete(f"/upload/{self.node_id}") - self.message_count = 0 - logging.info(f"Cleaned up node {self.node_id}") - - -class StreamingUser(HttpUser): - """User that streams data from test-redis-ws using sync_client""" - - wait_time = between(0.1, 0.2) - weight = int(os.getenv("STREAMING_WEIGHT", 1)) - - def on_start(self): - """Connect to the streaming endpoint using sync_client""" - # Use the same node_id as WriterUser - self.node_id = 481980 - self.envelope_format = "msgpack" # or "json" - - # Extract host from client base_url - base_url = self.client.base_url.replace("http://", "").replace("https://", "") - self.node = NodePlaceholder(str(self.node_id), envelope_format=self.envelope_format, base_url=base_url) - - def process_message(self, message): - """Process websocket messages and measure latency""" - try: - received_time = time.time() - - # Pull out timestamp from the payload - payload = message.get("payload", []) - if payload and len(payload) > 0: - write_time = float(payload[0]) - latency_ms = (received_time - write_time) * 1000 - - logging.info( - f"WS latency (server sequence {message.get('sequence')}): {latency_ms:.1f}ms" - ) - - events.request.fire( - request_type="WS", - name="write_to_websocket_delivery", - response_time=latency_ms, - response_length=len(str(message)), - exception=None, - context={} - ) - - except Exception as e: - logging.error(f"Error processing message: {e}") - - @task - def stream_messages(self): - """Stream messages using sync_client""" - try: - for message in self.node.stream(): - self.process_message(message) - break # Process one message per task execution - - except Exception as e: - logging.error(f"Error in streaming: {e}") - events.request.fire( - request_type="WS", - name="stream_connection", - response_time=0, - response_length=0, - exception=e, - context={} - ) diff --git a/sync_client.py b/sync_client.py index 5a0fdd3..81e3744 100644 --- a/sync_client.py +++ b/sync_client.py @@ -3,6 +3,8 @@ import msgpack import os +import time + class NodePlaceholder: def __init__(self, node_id, envelope_format="json", base_url="localhost:8000"): self.node_id = node_id @@ -27,7 +29,8 @@ def main(): node = NodePlaceholder(node_id="481980", envelope_format="msgpack", base_url=REDIS_WS_API_URL) - # Client will be in this loop until breaking or the writer closes the connection. + # Client will be in this loop until breaking or the writer + # closes the connection and the client receives outstanding messages. for message in node.stream(): print(f"Received: {message}")