Skip to content

Commit b601ab2

Browse files
[#148] Fix CQ proto syntax ordering and event buffering
Move 'syntax = "proto3"' before 'package' declaration — Protostream requires the syntax statement to be the first non-comment line. Buffer CQ state events during listener registration so that includeState events are not lost before the caller attaches .on() handlers. Also propagate errors from addContinuousQuery instead of swallowing them.
1 parent fb6823d commit b601ab2

2 files changed

Lines changed: 23 additions & 5 deletions

File tree

lib/listeners.js

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@
186186
return f.cat(encodeListenerAddCommon, encodeListenerAddInterests);
187187
};
188188

189+
var bufferedEvents = [];
190+
var buffering = true;
191+
189192
var cqEmitFn = function(event, emitter, bytebuf) {
190193
var payloadLength = codec.decodeVariableBytes()(bytebuf);
191194
if (!f.existy(payloadLength) || !f.existy(payloadLength.answer))
@@ -195,7 +198,11 @@
195198
if (!f.existy(wrapped) || !f.existy(wrapped.wrappedMessage))
196199
return true;
197200
var cqResult = codec.decodeContinuousQueryResult(wrapped.wrappedMessage);
198-
emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection);
201+
if (buffering) {
202+
bufferedEvents.push(cqResult);
203+
} else {
204+
emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection);
205+
}
199206
return true;
200207
};
201208

@@ -219,15 +226,26 @@
219226
var l = listeners.get(listenerId);
220227
var emitter = l.emitter;
221228
return {
222-
on: function(event, callback) { emitter.on(event, callback); return this; },
229+
on: function(event, callback) {
230+
emitter.on(event, callback);
231+
if (buffering) {
232+
buffering = false;
233+
bufferedEvents.forEach(function(ev) {
234+
emitter.emit(ev.resultType, ev.key, ev.value, ev.projection);
235+
});
236+
bufferedEvents = [];
237+
}
238+
return this;
239+
},
223240
getListenerId: function() { return listenerId; }
224241
};
225242
}
226243
listen.removeListeners(listenerId);
227244
return undefined;
228245
})
229-
.catch(function() {
246+
.catch(function(err) {
230247
listen.removeListeners(listenerId);
248+
throw err;
231249
});
232250
},
233251
};

spec/infinispan_cq_spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ var t = require('./utils/testing');
44
var ispn = require('../lib/infinispan');
55
var protobuf = require('protobufjs');
66

7-
var personProto = 'package tutorial;\n'
8-
+ 'syntax = "proto3";\n'
7+
var personProto = 'syntax = "proto3";\n'
8+
+ 'package tutorial;\n'
99
+ '/**\n'
1010
+ ' * @TypeId(1000042)\n'
1111
+ ' */\n'

0 commit comments

Comments
 (0)