Skip to content

Commit f37a93a

Browse files
author
David Ellis
committed
- Corrected param name
- Added variable for repetitively accessed dictionary value - extended if statement to also check if "sub" is not None to see if the subscription has actually been made. - Added on_remote_leave to empty dictionary of forwarded subscriptions - extended if statement to also check if "reg" is not None to see if the registration has actually been made. - Made "create forwarding registration" log debug level to match the subscription one. This is expected functionality and there can be large amounts of registrations which would cause log spam.
1 parent 03d7057 commit f37a93a

File tree

1 file changed

+33
-18
lines changed

1 file changed

+33
-18
lines changed

crossbar/worker/rlink.py

+33-18
Original file line numberDiff line numberDiff line change
@@ -74,23 +74,26 @@ def on_subscription_create(sub_session, sub_details, details=None):
7474
The handler will then also subscribe on the other router, and when receiving
7575
events, re-publish those on this router.
7676
77-
:param sub_id:
77+
:param sub_session:
7878
:param sub_details:
7979
:param details:
8080
:return:
8181
"""
8282
if sub_details["uri"].startswith("wamp."):
8383
return
8484

85-
if sub_details["id"] in self._subs:
86-
# this should not happen actually, but not sure ..
85+
sub_id = sub_details["id"]
86+
87+
if sub_id in self._subs and self._subs[sub_id]["sub"]:
88+
# This will happen if, partway through the subscription process, the RLink disconnects
8789
self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}',
88-
sub_id=sub_details["id"],
90+
sub_id=sub_id,
8991
method=hltype(BridgeSession._setup_event_forwarding))
9092
return
9193

92-
self._subs[sub_details["id"]] = sub_details
93-
self._subs[sub_details["id"]]["sub"] = None
94+
if sub_id not in self._subs:
95+
self._subs[sub_id] = sub_details
96+
self._subs[sub_id]["sub"] = None
9497

9598
uri = sub_details['uri']
9699
ERR_MSG = [None]
@@ -162,17 +165,17 @@ def on_event(*args, **kwargs):
162165
uri))
163166
return
164167

165-
if sub_details["id"] not in self._subs:
168+
if sub_id not in self._subs:
166169
self.log.info("subscription already gone: {uri}", uri=sub_details['uri'])
167170
yield sub.unregister()
168171
else:
169-
self._subs[sub_details["id"]]["sub"] = sub
172+
self._subs[sub_id]["sub"] = sub
170173

171174
self.log.debug(
172175
"created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}",
173176
me=self._session_id,
174177
other=other,
175-
sub_id=sub_details["id"],
178+
sub_id=sub_id,
176179
sub_details=sub_details,
177180
details=details,
178181
sub_session=sub_session,
@@ -222,12 +225,21 @@ def forward_current_subs():
222225
def on_remote_join(_session, _details):
223226
yield forward_current_subs()
224227

228+
def on_remote_leave(_session, _details):
229+
# The remote session has ended, clear subscription records.
230+
# Clearing this dictionary helps avoid the case where
231+
# local procedures are not subscribed on the remote leg
232+
# on reestablishment of remote session.
233+
# See: https://github.com/crossbario/crossbar/issues/1909
234+
self._subs = {}
235+
225236
if self.IS_REMOTE_LEG:
226237
yield forward_current_subs()
227238
else:
228239
# from the local leg, don't try to forward events on the
229240
# remote leg unless the remote session is established.
230241
other.on('join', on_remote_join)
242+
other.on('leave', on_remote_leave)
231243

232244
# listen to when new subscriptions are created on the local router
233245
yield self.subscribe(on_subscription_create,
@@ -267,15 +279,18 @@ def on_registration_create(reg_session, reg_details, details=None):
267279
if reg_details['uri'].startswith("wamp."):
268280
return
269281

270-
if reg_details['id'] in self._regs:
271-
# this should not happen actually, but not sure ..
282+
reg_id = reg_details["id"]
283+
284+
if reg_id in self._regs and self._regs[reg_id]["reg"]:
285+
# This will happen if, partway through the registration process, the RLink disconnects
272286
self.log.error('on_registration_create: reg ID {reg_id} already in map {method}',
273-
reg_id=reg_details['id'],
287+
reg_id=reg_id,
274288
method=hltype(BridgeSession._setup_invocation_forwarding))
275289
return
276290

277-
self._regs[reg_details['id']] = reg_details
278-
self._regs[reg_details['id']]['reg'] = None
291+
if reg_id not in self._regs:
292+
self._regs[reg_id] = reg_details
293+
self._regs[reg_id]['reg'] = None
279294

280295
uri = reg_details['uri']
281296
ERR_MSG = [None]
@@ -365,17 +380,17 @@ def on_call(*args, **kwargs):
365380
# on the "other" router, *this* router may have already
366381
# un-registered. If that happened, our registration will
367382
# be gone, so we immediately un-register on the other side
368-
if reg_details['id'] not in self._regs:
383+
if reg_id not in self._regs:
369384
self.log.info("registration already gone: {uri}", uri=reg_details['uri'])
370385
yield reg.unregister()
371386
else:
372-
self._regs[reg_details['id']]['reg'] = reg
387+
self._regs[reg_id]['reg'] = reg
373388

374-
self.log.info(
389+
self.log.debug(
375390
"created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}",
376391
me=self._session_id,
377392
other=other._session_id,
378-
reg_id=reg_details['id'],
393+
reg_id=reg_id,
379394
reg_details=reg_details,
380395
details=details,
381396
reg_session=reg_session,

0 commit comments

Comments
 (0)