Skip to content

Commit 144f713

Browse files
committed
ft: ARSN-65 oplog pattern library
Snapshot-scan-oplog pattern with state persistence for applications requiring reading the oplog
1 parent 7b45124 commit 144f713

File tree

12 files changed

+1564
-2
lines changed

12 files changed

+1564
-2
lines changed
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Main interface for bucketd oplog management
3+
*/
4+
const async = require('async');
5+
const { RESTClient: BucketClient } = require('bucketclient');
6+
const { jsutil, errors } = require('arsenal');
7+
const LogConsumer = require('arsenal').storage.metadata.bucketclient.LogConsumer;
8+
const { isMasterKey } = require('arsenal/lib/versioning/Version');
9+
const OplogInterface = require('./OplogInterface');
10+
11+
class BucketdOplogInterface extends OplogInterface {
12+
13+
constructor(params) {
14+
super(params);
15+
this.stopAt = -1;
16+
this.backendRetryTimes = 3;
17+
this.backendRetryInterval = 300;
18+
this.bucketdOplogQuerySize = 20;
19+
let bkBootstrap = ['localhost:9000'];
20+
if (params && params.bootstrap !== undefined) {
21+
bkBootstrap = params.bootstrap;
22+
}
23+
if (params && params.stopAt !== undefined) {
24+
this.stopAt = params.stopAt;
25+
}
26+
this.bkClient = new BucketClient(bkBootstrap);
27+
}
28+
29+
start(filter, cb) {
30+
if (!(filter.filterType === 'bucket' ||
31+
filter.filterType === 'raftSession')) {
32+
return cb(errors.NotImplemented);
33+
}
34+
const filterName = filter.filterName;
35+
async.waterfall([
36+
/*
37+
* In this step we get the raftId for filterName
38+
*/
39+
next => {
40+
if (filter.filterType === 'raftSession') {
41+
return next(null, filter.raftSession.raftId);
42+
}
43+
this.logger.info('obtaining raftId',
44+
{ filterName });
45+
async.retry(
46+
{
47+
times: this.backendRetryTimes,
48+
interval: this.backendRetryInterval,
49+
},
50+
done => {
51+
this.bkClient.getBucketInformation(
52+
filter.bucket.bucketName,
53+
null,
54+
(err, info) => {
55+
if (err) {
56+
this.logger.info('retrying getBucketInformation', { err, filterName });
57+
return done(err);
58+
}
59+
return done(null, JSON.parse(info));
60+
});
61+
},
62+
(err, res) => {
63+
if (err) {
64+
this.logger.error('getBucketInformation too many failures', { err, filterName });
65+
return next(err);
66+
}
67+
return next(null, res.raftSessionId);
68+
});
69+
return undefined;
70+
},
71+
/*
72+
* In this step we get the stored offset if we have it
73+
*/
74+
(raftId, next) => {
75+
let cseq = undefined;
76+
this.persist.load(filterName, this.persistData, (err, offset) => {
77+
if (err) {
78+
return next(err);
79+
}
80+
cseq = offset;
81+
return next(null, raftId, cseq);
82+
});
83+
},
84+
/*
85+
* In this step we acquire the offset if we don't already have it
86+
*/
87+
(raftId, cseq, next) => {
88+
if (cseq !== undefined) {
89+
this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
90+
{ filterName });
91+
return next(null, raftId, cseq, true);
92+
}
93+
this.logger.info('cseq acquisition',
94+
{ filterName });
95+
async.retry(
96+
{
97+
times: this.backendRetryTimes,
98+
interval: this.backendRetryInterval,
99+
},
100+
done => {
101+
this.bkClient.getRaftLog(
102+
raftId,
103+
1,
104+
1,
105+
true,
106+
null,
107+
(err, stream) => {
108+
if (err) {
109+
this.logger.info('retrying getRaftLog', { err, filterName });
110+
return done(err);
111+
}
112+
const chunks = [];
113+
stream.on('data', chunk => {
114+
chunks.push(chunk);
115+
});
116+
stream.on('end', () => {
117+
const info = JSON.parse(Buffer.concat(chunks));
118+
return done(null, info);
119+
});
120+
return undefined;
121+
});
122+
},
123+
(err, res) => {
124+
if (err) {
125+
this.logger.error('getRaftLog too many failures', { err, filterName });
126+
return next(err);
127+
}
128+
return next(null, raftId, res.info.cseq, false);
129+
});
130+
return undefined;
131+
},
132+
/*
133+
* In this step we init the state (e.g. scan)
134+
*/
135+
(raftId, cseq, skipInit, next) => {
136+
if (skipInit) {
137+
this.logger.info(`skipping state initialization cseq=${cseq}`,
138+
{ filterName });
139+
return next(null, raftId, cseq);
140+
}
141+
this.logger.info(`initializing state cseq=${cseq}`,
142+
{ filterName });
143+
this.persistData.initState(err => {
144+
if (err) {
145+
return next(err);
146+
}
147+
this.persist.save(
148+
filterName, this.persistData, cseq, err => {
149+
if (err) {
150+
return next(err);
151+
}
152+
return next(null, raftId, cseq);
153+
});
154+
return undefined;
155+
});
156+
return undefined;
157+
},
158+
/*
159+
* In this step we loop over the oplog
160+
*/
161+
(raftId, cseq, next) => {
162+
this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
163+
{ filterName });
164+
// only way to get out of the loop in all cases
165+
const nextOnce = jsutil.once(next);
166+
let doStop = false;
167+
// resume reading the oplog from cseq. changes are idempotent
168+
const logConsumer = new LogConsumer({
169+
bucketClient: this.bkClient,
170+
raftSession: raftId,
171+
});
172+
let _cseq = cseq;
173+
async.until(
174+
() => doStop,
175+
_next => {
176+
logConsumer.readRecords({
177+
startSeq: _cseq,
178+
limit: this.bucketdOplogQuerySize,
179+
}, (err, record) => {
180+
if (err) {
181+
this.logger.error('readRecords error', { err, filterName });
182+
return setTimeout(() => _next(), 5000);
183+
}
184+
if (!record.log) {
185+
// nothing to read
186+
return setTimeout(() => _next(), 5000);
187+
}
188+
const seqs = [];
189+
record.log.on('data', chunk => {
190+
seqs.push(chunk);
191+
});
192+
record.log.on('end', () => {
193+
const addQueue = [];
194+
const delQueue = [];
195+
for (let i = 0; i < seqs.length; i++) {
196+
if (filter.filterType === 'raftSession' ||
197+
(filter.filterType === 'bucket' &&
198+
seqs[i].db === filter.bucket.bucketName)) {
199+
for (let j = 0; j < seqs[i].entries.length; j++) {
200+
const _item = {};
201+
_item.bucketName = seqs[i].db;
202+
_item.key = seqs[i].entries[j].key;
203+
if (seqs[i].entries[j].type !== undefined &&
204+
seqs[i].entries[j].type === 'del') {
205+
if (!isMasterKey(_item.key)) {
206+
// ignore for now
207+
return;
208+
}
209+
delQueue.push(_item);
210+
} else {
211+
_item.value = Object.assign({}, seqs[i].entries[j].value);
212+
addQueue.push(_item);
213+
}
214+
}
215+
}
216+
}
217+
this.persistData.updateState(
218+
addQueue, delQueue, err => {
219+
if (err) {
220+
return _next(err);
221+
}
222+
_cseq += seqs.length;
223+
this.persist.save(
224+
filterName, this.persistData, _cseq, err => {
225+
if (err) {
226+
return _next(err);
227+
}
228+
if (_cseq > this.stopAt) {
229+
doStop = true;
230+
}
231+
return _next();
232+
});
233+
return undefined;
234+
});
235+
});
236+
return undefined;
237+
});
238+
}, err => {
239+
if (err) {
240+
return nextOnce(err);
241+
}
242+
return nextOnce();
243+
});
244+
},
245+
], err => {
246+
if (err) {
247+
return cb(err);
248+
}
249+
this.logger.info('returning',
250+
{ filterName });
251+
return cb();
252+
});
253+
return undefined;
254+
}
255+
}
256+
257+
module.exports = BucketdOplogInterface;

0 commit comments

Comments
 (0)