Skip to content

Commit ad2bb9b

Browse files
committed
playground lmdb timeseries
1 parent 6d346d6 commit ad2bb9b

File tree

5 files changed

+151
-3
lines changed

5 files changed

+151
-3
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from pathlib import Path
2+
import asyncio
3+
import contextlib
4+
import sys
5+
6+
from hat import aio
7+
8+
from hat.event import common
9+
import hat.event.backends.lmdb
10+
11+
12+
def main():
13+
with contextlib.suppress(asyncio.CancelledError):
14+
aio.run_asyncio(async_main())
15+
16+
17+
async def async_main():
18+
db_path = Path(sys.argv[1])
19+
20+
server_id = 1
21+
group_id = 0
22+
point_count = 2000
23+
interval = 5
24+
retention = 24 * 60 * 60
25+
26+
conf = {'db_path': str(db_path),
27+
'identifier': None,
28+
'flush_period': 1,
29+
'cleanup_period': 1,
30+
'conditions': [],
31+
'latest': {'subscriptions': []},
32+
'timeseries': [{'order_by': 'SOURCE_TIMESTAMP',
33+
'subscriptions': [['eds', 'timeseries',
34+
str(group_id), '?']],
35+
'limit': {'duration': retention}}]}
36+
37+
backend = await aio.call(hat.event.backends.lmdb.info.create, conf, None,
38+
None)
39+
40+
try:
41+
t = 0
42+
session_id = 0
43+
44+
while t < retention:
45+
session_id += 1
46+
events = [common.Event(id=common.EventId(server_id, session_id,
47+
point_id + 1),
48+
type=('eds', 'timeseries', str(group_id),
49+
str(point_id)),
50+
timestamp=common.Timestamp(t, 0),
51+
source_timestamp=common.Timestamp(t, 0),
52+
payload=None)
53+
for point_id in range(point_count)]
54+
await backend.register(events)
55+
56+
t += interval
57+
58+
finally:
59+
await aio.uncancellable(backend.async_close())
60+
61+
62+
if __name__ == '__main__':
63+
main()
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/bin/sh
2+
3+
set -e
4+
5+
RUN_PATH=$(dirname "$(realpath "$0")")
6+
PLAYGROUND_PATH=$RUN_PATH/../..
7+
. $PLAYGROUND_PATH/env.sh
8+
9+
10+
db_path=$DATA_PATH/lmdb_timeseries.db
11+
12+
rm -f $db_path
13+
exec $PYTHON $RUN_PATH/create.py $db_path
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from pathlib import Path
2+
import asyncio
3+
import contextlib
4+
import cProfile
5+
import sys
6+
import time
7+
8+
from hat import aio
9+
10+
from hat.event import common
11+
import hat.event.backends.lmdb
12+
13+
14+
def main():
15+
with contextlib.suppress(asyncio.CancelledError):
16+
aio.run_asyncio(async_main())
17+
18+
19+
async def async_main():
20+
db_path = Path(sys.argv[1])
21+
22+
group_id = 0
23+
max_results = 10_000
24+
25+
conf = {'db_path': str(db_path),
26+
'identifier': None,
27+
'flush_period': 1,
28+
'cleanup_period': 1,
29+
'conditions': [],
30+
'latest': {'subscriptions': []},
31+
'timeseries': [{'order_by': 'SOURCE_TIMESTAMP',
32+
'subscriptions': [['eds', 'timeseries',
33+
str(group_id), '?']]}]}
34+
35+
backend = await aio.call(hat.event.backends.lmdb.info.create, conf, None,
36+
None)
37+
38+
try:
39+
t1 = time.monotonic()
40+
41+
with cProfile.Profile() as pr:
42+
await backend.query(
43+
common.QueryTimeseriesParams(
44+
event_types=[('eds', 'timeseries', str(group_id), '0')],
45+
order_by=common.OrderBy.SOURCE_TIMESTAMP,
46+
order=common.Order.DESCENDING,
47+
max_results=max_results + 1))
48+
49+
t2 = time.monotonic()
50+
51+
print(t2 - t1)
52+
53+
pr.dump_stats(str(db_path.with_suffix('.profile')))
54+
55+
finally:
56+
await aio.uncancellable(backend.async_close())
57+
58+
59+
if __name__ == '__main__':
60+
main()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/sh
2+
3+
set -e
4+
5+
RUN_PATH=$(dirname "$(realpath "$0")")
6+
PLAYGROUND_PATH=$RUN_PATH/../..
7+
. $PLAYGROUND_PATH/env.sh
8+
9+
10+
db_path=$DATA_PATH/lmdb_timeseries.db
11+
12+
exec $PYTHON $RUN_PATH/query.py $db_path

src_py/hat/event/backends/lmdb/backend.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ async def register(self,
171171
self._registered_count += len(events)
172172

173173
if self._registered_count > max_registered_count:
174-
await self._flush_queue.put(self._loop.create_future())
174+
await self._flush_queue.put(None)
175175

176176
if self._registered_events_cb:
177177
await aio.call(self._registered_events_cb, events)
@@ -236,7 +236,7 @@ async def cleanup():
236236

237237
while futures:
238238
future = futures.popleft()
239-
if not future.done():
239+
if future and not future.done():
240240
future.set_result(None)
241241

242242
except Exception as e:
@@ -250,7 +250,7 @@ async def cleanup():
250250
futures.append(self._flush_queue.get_nowait())
251251

252252
for future in futures:
253-
if not future.done():
253+
if future and not future.done():
254254
future.set_exception(common.BackendClosedError())
255255

256256
await aio.uncancellable(cleanup())

0 commit comments

Comments
 (0)