-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
162 lines (120 loc) · 4.46 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import argparse
import json
import logging
from contextlib import suppress
from dataclasses import asdict
from functools import partial
from pydantic import ValidationError, NonNegativeFloat
import trio
from pydantic.dataclasses import dataclass
from trio_websocket import ConnectionClosed, serve_websocket
RESPONSE_INTERVAL = 1
INIT_BOUNDS_COORD = 0.0
buses = {}
@dataclass
class Bus:
busId: str
route: str
lat: NonNegativeFloat
lng: NonNegativeFloat
@dataclass
class WindowBounds:
south_lat: NonNegativeFloat
north_lat: NonNegativeFloat
west_lng: NonNegativeFloat
east_lng: NonNegativeFloat
def update(self, south_lat, north_lat, west_lng, east_lng):
self.south_lat = south_lat
self.north_lat = north_lat
self.west_lng = west_lng
self.east_lng = east_lng
def is_inside(self, lat, lng):
if self.north_lat > lat > self.south_lat and self.east_lng > lng > self.west_lng:
return True
return False
async def serve_buses(request):
try:
ws = await request.accept()
while True:
message = await ws.get_message()
result, error_msg = validate_bus_data(message)
if not result:
await ws.send_message(error_msg)
continue
buses[result.busId] = result
except ConnectionClosed:
pass
async def serve_browsers(request):
try:
ws = await request.accept()
bounds = WindowBounds(INIT_BOUNDS_COORD, INIT_BOUNDS_COORD, INIT_BOUNDS_COORD, INIT_BOUNDS_COORD)
async with trio.open_nursery() as nursery:
nursery.start_soon(talk_to_browser, ws, bounds)
nursery.start_soon(listen_browser, ws, bounds)
except ConnectionClosed:
pass
async def talk_to_browser(ws, bounds):
while True:
await send_buses(ws, bounds)
await trio.sleep(RESPONSE_INTERVAL)
async def listen_browser(ws, bounds):
while True:
message = await ws.get_message()
logging.debug(message)
result, error_msg = validate_bounds_data(message)
if not result:
await ws.send_message(error_msg)
continue
bounds.update(result.south_lat, result.north_lat, result.west_lng, result.east_lng)
def validate_bounds_data(message):
try:
new_bounds_fields = json.loads(message)
except json.decoder.JSONDecodeError:
return None, '{"errors": ["Requires valid JSON"], "msgType": "Errors"}'
try:
new_bounds = WindowBounds(**new_bounds_fields.get('data', {}))
except ValidationError as e:
return None, str({"errors": e.errors(), "msgType": "Errors"})
return new_bounds, None
def validate_bus_data(message):
try:
new_bus_fields = json.loads(message)
except json.decoder.JSONDecodeError:
return None, '{"errors": ["Requires valid JSON"], "msgType": "Errors"}'
try:
new_bus = Bus(**new_bus_fields)
except ValidationError as e:
return None, str({"errors": e.errors(), "msgType": "Errors"})
return new_bus, None
async def send_buses(ws, bounds):
buses_in_bounds = [asdict(bus) for bus in buses.values() if bounds.is_inside(bus.lat, bus.lng)]
logging.debug(f'{len(buses_in_bounds)} buses inside bounds')
data = {
"msgType": "Buses",
"buses": buses_in_bounds
}
await ws.send_message(json.dumps(data, ensure_ascii=False))
def is_inside(bounds, lat, lng):
if bounds['north_lat'] > lat > bounds['south_lat'] and bounds['east_lng'] > lng > bounds['west_lng']:
return True
return False
def prepare_args():
parser = argparse.ArgumentParser()
parser.add_argument("--bus_port", type=int, default=8080)
parser.add_argument("--browser_port", type=int, default=8000)
parser.add_argument("-v", "--verbose", action=argparse.BooleanOptionalAction, help="Enable logging")
return parser.parse_args()
async def main():
async with trio.open_nursery() as nursery:
serve_ws = partial(serve_websocket, ssl_context=None)
nursery.start_soon(serve_ws, serve_buses, '127.0.0.1', args.bus_port)
nursery.start_soon(serve_ws, serve_browsers, '127.0.0.1', args.browser_port)
if __name__ == '__main__':
args = prepare_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('trio-websocket').setLevel(logging.INFO)
else:
logging.disable(level=logging.DEBUG)
with suppress(KeyboardInterrupt):
trio.run(main)