Skip to content

Commit c37bd98

Browse files
[#148] Continuous query support (#151)
* [#148] Continuous query support
1 parent 8d0a6da commit c37bd98

8 files changed

Lines changed: 614 additions & 8 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
[id='continuous-queries_{context}']
2+
= Continuous queries
3+
4+
Continuous queries let you receive real-time notifications when cache entries join, leave, or are updated relative to an Ickle query.
5+
Unlike regular listeners, continuous queries filter events on the server side, so your client only receives events for entries that match the query.
6+
7+
== Registering a continuous query
8+
9+
Register a continuous query with `addContinuousQuery()`, providing an Ickle query string and optional named parameters:
10+
11+
[source,javascript,options="nowrap",subs=attributes+]
12+
----
13+
const cq = await client.addContinuousQuery(
14+
'FROM tutorial.InstaPost p WHERE p.user = :userName',
15+
{params: {userName: 'belen_esteban'}}
16+
);
17+
----
18+
19+
== Handling events
20+
21+
Subscribe to the three event types on the returned handle:
22+
23+
[source,javascript,options="nowrap",subs=attributes+]
24+
----
25+
cq.on('joining', function(key, value, projection) {
26+
console.log('Entry joined the result set');
27+
});
28+
29+
cq.on('leaving', function(key, value, projection) {
30+
console.log('Entry left the result set');
31+
});
32+
33+
cq.on('updated', function(key, value, projection) {
34+
console.log('Matching entry was updated');
35+
});
36+
----
37+
38+
* **joining** -- fired when a new entry matches the query, or an existing entry is modified to match.
39+
* **leaving** -- fired when an entry no longer matches the query, either because it was removed or modified.
40+
* **updated** -- fired when an entry that already matches the query is modified but still matches.
41+
42+
== Removing a continuous query
43+
44+
[source,javascript,options="nowrap",subs=attributes+]
45+
----
46+
await client.removeContinuousQuery(cq);
47+
----
48+
49+
== Requirements
50+
51+
* The cache must use Protostream (`application/x-protostream`) encoding.
52+
* Protobuf schemas must be registered on the server before registering the continuous query.
53+
* The client must call `registerProtostreamRoot()` and `registerProtostreamType()` for the types used in the query.

lib/codec.js

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -821,11 +821,48 @@
821821
protobuf.loadSync(path.join(`${__dirname}/protostream/message-wrapping.proto`),root); //loaded the wrappedMessage.proto to the root
822822
var QueryRequest = root.lookupType('org.infinispan.query.remote.client.QueryRequest');
823823
var QueryResponse = root.lookupType('org.infinispan.query.remote.client.QueryResponse');
824+
var ContinuousQueryResult = root.lookupType('org.infinispan.query.remote.client.ContinuousQueryResult');
824825

825826
return {
826827
QueryRequest,
827-
QueryResponse
828+
QueryResponse,
829+
ContinuousQueryResult
828830
};
829831
}());
830832

833+
var CQ_RESULT_TYPES = ['leaving', 'joining', 'updated'];
834+
835+
/**
836+
* Wrap a scalar value as a WrappedMessage byte array.
837+
* @param {string|number|boolean} value - The value to wrap.
838+
* @returns {Buffer} The encoded WrappedMessage bytes.
839+
*/
840+
exports.wrapScalar = function(value) {
841+
return WrappedMessage.encode(createWrappedMessage(value, _.identity)).finish();
842+
};
843+
844+
/**
845+
* Decode a ContinuousQueryResult from raw protobuf bytes.
846+
* @param {Buffer} bytes - The raw ContinuousQueryResult bytes.
847+
* @returns {Object} Decoded result with resultType, key, value, projection.
848+
*/
849+
exports.decodeContinuousQueryResult = function(bytes) {
850+
var msg = Query.ContinuousQueryResult.decode(bytes);
851+
return {
852+
resultType: CQ_RESULT_TYPES[msg.resultType] || 'leaving',
853+
key: msg.key,
854+
value: msg.value,
855+
projection: msg.projection
856+
};
857+
};
858+
859+
/**
860+
* Decode a WrappedMessage from raw bytes.
861+
* @param {Buffer} bytes - The raw WrappedMessage bytes.
862+
* @returns {Object} The decoded WrappedMessage object.
863+
*/
864+
exports.decodeWrappedMessage = function(bytes) {
865+
return WrappedMessage.decode(bytes);
866+
};
867+
831868
}.call(this));

lib/infinispan.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,58 @@
824824
return false;
825825
});
826826
},
827+
/**
828+
* Continuous query options.
829+
*
830+
* @typedef {Object} ContinuousQueryOptions
831+
* @property {Object} [params] - Named parameter bindings for the Ickle query.
832+
* @since 0.16
833+
*/
834+
/**
835+
* Register a continuous query that watches for cache changes
836+
* matching the given Ickle query.
837+
*
838+
* @param {String} queryString Ickle query string.
839+
* @param {ContinuousQueryOptions=} opts Optional CQ options.
840+
* @returns {Promise<Object>}
841+
* A promise completed with a ContinuousQuery handle.
842+
* Use cq.on('joining', fn), cq.on('leaving', fn), cq.on('updated', fn)
843+
* to receive events.
844+
* @memberof Client#
845+
* @since 0.16
846+
*/
847+
addContinuousQuery: function(queryString, opts) {
848+
var ctx = transport.context(SMALL);
849+
logger.debugf('Invoke addContinuousQuery(msgId=%d,query=%s)', ctx.id, queryString);
850+
return listen.addContinuousQueryListener(transport, ctx, queryString, opts);
851+
},
852+
/**
853+
* Remove a continuous query.
854+
*
855+
* @param {Object} cq ContinuousQuery handle returned by addContinuousQuery.
856+
* @returns {Promise}
857+
* A promise completed when the continuous query has been removed.
858+
* @memberof Client#
859+
* @since 0.16
860+
*/
861+
removeContinuousQuery: function(cq) {
862+
var listenerId = cq.getListenerId();
863+
var ctx = transport.context(SMALL);
864+
logger.debugf('Invoke removeContinuousQuery(msgId=%d,listenerId=%s)', ctx.id, listenerId);
865+
var conn = listen.findConnectionListener(listenerId);
866+
if (!f.existy(conn))
867+
return Promise.reject(
868+
new Error(`No server connection for CQ listener (listenerId=${ listenerId })`));
869+
870+
var remote = futurePinned(ctx, 0x27, p.encodeListenerId(listenerId), p.complete(p.hasSuccess), conn);
871+
return remote.then(function (success) {
872+
if (success) {
873+
listen.removeListeners(listenerId);
874+
return true;
875+
}
876+
return false;
877+
});
878+
},
827879
/**
828880
* Add script to server(s).
829881
*

lib/listeners.js

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
var events = require('events');
1212

13+
var CQ_FACTORY = 'continuous-query-filter-converter-factory';
14+
1315
module.exports = listeners;
1416

1517
/**
@@ -132,8 +134,10 @@
132134
dispatchEvent: function(event, listenerId, bytebuf, emitFunc) {
133135
return function() {
134136
var l = listeners.get(listenerId);
135-
if (f.existy(l))
136-
return emitFunc(event, l.emitter, bytebuf, listenerId);
137+
if (f.existy(l)) {
138+
var emit = l.cqEmit || emitFunc;
139+
return emit(event, l.emitter, bytebuf, listenerId);
140+
}
137141

138142
logger.error('No emitter exists for listener %s', listenerId);
139143
return true;
@@ -152,7 +156,117 @@
152156
setProtocol: function(newProtocol) {
153157
protocol = newProtocol;
154158
},
159+
/**
160+
* Register a continuous query listener.
161+
* @param {Object} transport Transport instance.
162+
* @param {Object} ctx Request context.
163+
* @param {string} queryString Ickle query string.
164+
* @param {Object} [opts] Options with optional params map.
165+
* @returns {Promise<Object>} ContinuousQuery handle.
166+
*/
167+
addContinuousQueryListener: function(transport, ctx, queryString, opts) {
168+
opts = opts || {};
169+
var listenerId = _.uniqueId('cq_');
170+
logger.debugl(function() {
171+
return ['Invoke addContinuousQueryListener(msgId=%d,query=%s,listenerId=%s)',
172+
ctx.id, queryString, listenerId]; });
173+
174+
var cqParams = buildCQParams(queryString, opts.params);
175+
176+
var listenerOpts = {
177+
includeState: true,
178+
useRawData: true,
179+
filterFactory: { name: CQ_FACTORY, params: cqParams },
180+
converterFactory: { name: CQ_FACTORY, params: cqParams }
181+
};
182+
183+
var encodeListenerAddCommon = protocol.encodeListenerAdd(listenerId, listenerOpts)();
184+
var encodeListenerAddInterests = protocol.encodeListenerInterests(listenerOpts);
185+
var encodeListenerAdd = function() {
186+
return f.cat(encodeListenerAddCommon, encodeListenerAddInterests);
187+
};
188+
189+
var bufferedEvents = [];
190+
var buffering = true;
191+
192+
var cqEmitFn = function(event, emitter, bytebuf) {
193+
var payloadLength = codec.decodeVariableBytes()(bytebuf);
194+
if (!f.existy(payloadLength) || !f.existy(payloadLength.answer))
195+
return false;
196+
var payload = payloadLength.answer;
197+
var wrapped = codec.decodeWrappedMessage(payload);
198+
if (!f.existy(wrapped) || !f.existy(wrapped.wrappedMessage))
199+
return true;
200+
var cqResult = codec.decodeContinuousQueryResult(wrapped.wrappedMessage);
201+
if (buffering) {
202+
bufferedEvents.push(cqResult);
203+
} else {
204+
emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection);
205+
}
206+
return true;
207+
};
208+
209+
var preWrite = function(conn) {
210+
var emitter = new events.EventEmitter();
211+
listeners.put(listenerId, {
212+
id: listenerId,
213+
emitter: emitter,
214+
conn: conn,
215+
cqEmit: cqEmitFn
216+
});
217+
};
218+
219+
var remote = futurePreWrite(transport, ctx, 0x25
220+
, encodeListenerAdd, protocol.complete(protocol.hasSuccess)
221+
, listenerOpts, preWrite);
222+
223+
return remote
224+
.then(function(success) {
225+
if (success) {
226+
var l = listeners.get(listenerId);
227+
var emitter = l.emitter;
228+
return {
229+
on: function(event, callback) {
230+
emitter.on(event, callback);
231+
if (buffering) {
232+
buffering = false;
233+
bufferedEvents.forEach(function(ev) {
234+
emitter.emit(ev.resultType, ev.key, ev.value, ev.projection);
235+
});
236+
bufferedEvents = [];
237+
}
238+
return this;
239+
},
240+
getListenerId: function() { return listenerId; }
241+
};
242+
}
243+
listen.removeListeners(listenerId);
244+
return undefined;
245+
})
246+
.catch(function(err) {
247+
listen.removeListeners(listenerId);
248+
throw err;
249+
});
250+
},
155251
};
252+
253+
/**
254+
* Build CQ factory params from query and named params.
255+
* @param {string} queryString Ickle query.
256+
* @param {Object} [params] Named parameter map.
257+
* @returns {Buffer[]} Array of wrapped param byte arrays.
258+
*/
259+
function buildCQParams(queryString, params) {
260+
var result = [codec.wrapScalar(queryString)];
261+
if (f.existy(params)) {
262+
_.each(params, function(value, name) {
263+
result.push(codec.wrapScalar(name));
264+
result.push(codec.wrapScalar(value));
265+
});
266+
}
267+
return result;
268+
}
269+
156270
return listen;
157271
}
158272

lib/protocols.js

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -699,18 +699,31 @@
699699

700700
var steps = [
701701
codec.encodeString(listenerId), // listener id
702-
codec.encodeUByte(includeState), // include state
703-
codec.encodeUByte(0) // TODO filter factory name
702+
codec.encodeUByte(includeState) // include state
704703
];
705704

705+
// Filter factory
706+
if (_.has(opts, 'filterFactory') && _.has(opts.filterFactory, 'name')) {
707+
steps.push(codec.encodeString(opts.filterFactory.name));
708+
var filterParams = opts.filterFactory.params || [];
709+
steps.push(codec.encodeUByte(filterParams.length));
710+
filterParams.forEach(function(p) { steps.push(codec.encodeBytesWithLength(p)); });
711+
} else {
712+
steps.push(codec.encodeUByte(0));
713+
}
714+
715+
// Converter factory
706716
if (_.has(opts, 'converterFactory') && _.has(opts.converterFactory, 'name')) {
707717
steps.push(codec.encodeString(opts.converterFactory.name));
708-
steps.push(codec.encodeUByte(0)); // TODO add converter parameter support
718+
var converterParams = opts.converterFactory.params || [];
719+
steps.push(codec.encodeUByte(converterParams.length));
720+
converterParams.forEach(function(p) { steps.push(codec.encodeBytesWithLength(p)); });
709721
} else {
710-
steps.push(codec.encodeUByte(0)); // no converter
722+
steps.push(codec.encodeUByte(0));
711723
}
712724

713-
steps.push(codec.encodeUByte(0)); // raw data disabled
725+
// Raw data
726+
steps.push(codec.encodeUByte(hasOpt(opts, 'useRawData') ? 1 : 0));
714727

715728
return function() {
716729
return steps;

0 commit comments

Comments
 (0)