Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ version = "0.1.0"
[tasks]
serve = "python server.py"
stream = "python streaming_client.py"
sync_client = "python sync_client.py"
write = "python writing_client.py"
test = "pytest tests/"

Expand Down
39 changes: 39 additions & 0 deletions sync_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from websockets.sync.client import connect
import json
import msgpack
import os

import time

class NodePlaceholder:
def __init__(self, node_id, envelope_format="json", base_url="localhost:8000"):
self.node_id = node_id
self.envelope_format = envelope_format
self.base_url = base_url

def stream(self, seq_num=None):
websocket_url = f"ws://{self.base_url}/stream/single/{self.node_id}?envelope_format={self.envelope_format}"
if seq_num is not None:
websocket_url += f"&seq_num={seq_num}"

with connect(websocket_url) as websocket:
for message in websocket:
if isinstance(message, bytes) and self.envelope_format == "msgpack":
yield msgpack.unpackb(message)
else:
yield json.loads(message)


def main():
REDIS_WS_API_URL = os.getenv("REDIS_WS_API_URL", "localhost:8000")

node = NodePlaceholder(node_id="481980", envelope_format="msgpack", base_url=REDIS_WS_API_URL)

# Client will be in this loop until breaking or the writer
# closes the connection and the client receives outstanding messages.
for message in node.stream():
print(f"Received: {message}")


if __name__ == "__main__":
main()