-
Notifications
You must be signed in to change notification settings - Fork 156
Expand file tree
/
Copy pathQueryScheduler.h
More file actions
114 lines (83 loc) · 3.5 KB
/
QueryScheduler.h
File metadata and controls
114 lines (83 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#pragma once
#include "DBQuery.h"
struct QueryScheduler : NonCopyable {
std::function<void(lmdb::txn &txn, const Subscription &sub, uint64_t levId, std::string_view eventPayload)> onEvent;
std::function<void(lmdb::txn &txn, const Subscription &sub, const std::vector<uint64_t> &levIds)> onEventBatch;
std::function<void(lmdb::txn &txn, Subscription &sub, uint64_t total, std::string hllHex)> onComplete;
// If false, then levIds returned to above callbacks can be stale (because they were deleted)
// If false, then onEvent's eventPayload will always be ""
bool ensureExists = true;
using ConnQueries = flat_hash_map<SubId, DBQuery*>;
flat_hash_map<uint64_t, ConnQueries> conns; // connId -> subId -> DBQuery*
std::deque<DBQuery*> running;
std::vector<uint64_t> levIdBatch;
bool addSub(lmdb::txn &txn, Subscription &&sub) {
sub.latestEventId = getMostRecentLevId(txn);
{
auto *existing = findQuery(sub.connId, sub.subId);
if (existing) removeSub(sub.connId, sub.subId);
}
auto res = conns.try_emplace(sub.connId);
auto &connQueries = res.first->second;
if (connQueries.size() >= cfg().relay__maxSubsPerConnection) {
return false;
}
DBQuery *q = new DBQuery(sub);
connQueries.try_emplace(q->sub.subId, q);
running.push_front(q);
return true;
}
DBQuery *findQuery(uint64_t connId, const SubId &subId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return nullptr;
auto f2 = f1->second.find(subId);
if (f2 == f1->second.end()) return nullptr;
return f2->second;
}
void removeSub(uint64_t connId, const SubId &subId) {
auto *query = findQuery(connId, subId);
if (!query) return;
query->dead = true;
conns[connId].erase(subId);
if (conns[connId].empty()) conns.erase(connId);
}
void closeConn(uint64_t connId) {
auto f1 = conns.find(connId);
if (f1 == conns.end()) return;
for (auto &[k, v] : f1->second) v->dead = true;
conns.erase(connId);
}
void process(lmdb::txn &txn) {
if (running.empty()) return;
DBQuery *q = running.front();
running.pop_front();
if (q->dead) {
delete q;
return;
}
auto eventPayloadCursor = lmdb::cursor::open(txn, env.dbi_EventPayload);
bool complete = q->process(txn, [&](const auto &sub, uint64_t levId){
std::string_view eventPayload;
if (ensureExists) {
std::string_view key = lmdb::to_sv<uint64_t>(levId);
if (!eventPayloadCursor.get(key, eventPayload, MDB_SET_KEY)) return; // If not found, was deleted while scan was paused
}
if (onEvent) onEvent(txn, sub, levId, eventPayload);
if (onEventBatch) levIdBatch.push_back(levId);
}, cfg().relay__queryTimesliceBudgetMicroseconds, cfg().relay__logging__dbScanPerf);
if (onEventBatch) {
onEventBatch(txn, q->sub, levIdBatch);
levIdBatch.clear();
}
if (complete) {
auto connId = q->sub.connId;
removeSub(connId, q->sub.subId);
std::string hllHex;
if (q->hllOffset >= 0) hllHex = q->hll.encodeHex();
if (onComplete) onComplete(txn, q->sub, q->sentEventsFull.size(), std::move(hllHex));
delete q;
} else {
running.push_back(q);
}
}
};