forked from KxSystems/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_offsetc.q
36 lines (31 loc) · 1001 Bytes
/
test_offsetc.q
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
\l kfk.q
// Consumer configuration passed to .kfk.Consumer: a symbol!symbol dictionary
// of settings. It is written as (name;value) pairs and split into keys and
// values below. Both auto-commit and auto offset store are turned off here;
// this script commits offsets itself from the timer callback.
kfk_cfg:{(first each x)!last each x} (
  (`metadata.broker.list;`localhost:9092);
  (`group.id;`0);
  (`queue.buffering.max.ms;`1);
  (`fetch.wait.max.ms;`10);
  (`statistics.interval.ms;`10000);
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false)
  );
// Topic this script consumes from.
TOPIC:`test;
// Accumulator for received messages; .kfk.consumecb appends one entry per message.
data:();
// Consumer client created from the configuration above.
client:.kfk.Consumer kfk_cfg;
// Message callback: cast the payload to characters, stamp the message with
// the local receive time and append it to the global `data`.
//   msg - message dictionary; must contain a `data entry that can be cast to char
.kfk.consumecb:{[msg]
  // functional amend: apply "c"$ to the `data entry only
  decoded:@[msg;`data;"c"$];
  // joining a one-entry dictionary adds (or overwrites) `rcvtime
  stamped:decoded,enlist[`rcvtime]!enlist .z.p;
  data,::enlist stamped;
  }
// Offset callback: print the three arguments it is called with as one list.
//   cid     - first argument (presumably the client id - confirm against kfk.q)
//   err     - second argument (presumably the error/status text - confirm)
//   offsets - third argument (presumably the affected offsets - confirm)
.kfk.offsetcb:{[cid;err;offsets]
  show (cid;err;offsets);
  }
// Assign partition 0 at the END offset (start replaying from the end), show
// the result of the assignment, then subscribe with the same dictionary.
{[offs]
  show .kfk.AssignOffsets[client;TOPIC;offs];
  .kfk.Sub[client;TOPIC;offs];
  } (1#0i)!1#.kfk.OFFSET.END;
// Run .z.ts every 30000 ms (same effect as the \t 30000 system command).
system"t 30000";
// Timer callback: report the consumer position and the committed offsets for
// TOPIC, commit progress up to the last message received, then report the
// committed offsets again.
// Reads the globals `data`, `client` and `TOPIC`; returns nothing useful.
.z.ts:{
  // Until the first message arrives `data` is still the empty list (), not a
  // table, so there is nothing to query or commit - skip this tick.
  if[0=count data; :(::)];
  // last offset received so far for each partition (partition!offset);
  // assumes messages carry `partition and `offset entries - confirm in kfk.q
  seen:exec last offset by partition from data;
  show "Position:";
  show .kfk.PositionOffsets[client;TOPIC;seen];
  show "Before commited:";
  show .kfk.CommittedOffsets[client;TOPIC;seen];
  // A Kafka committed offset names the NEXT message to consume, so commit
  // last-seen + 1; committing `seen` itself would re-deliver the last message
  // after a restart.
  // NOTE(review): the trailing 0b presumably requests a non-blocking commit,
  // in which case the "After" report below may still show the old value - confirm.
  .kfk.CommitOffsets[client;TOPIC;1+seen;0b];
  show "After commited:";
  show .kfk.CommittedOffsets[client;TOPIC;seen];
  }