diff --git a/gevent_zeromq/core.py b/gevent_zeromq/core.py index 6badd0a..03a320c 100644 --- a/gevent_zeromq/core.py +++ b/gevent_zeromq/core.py @@ -10,6 +10,8 @@ from gevent.event import Event from gevent.hub import get_hub +from gevent_zeromq.helpers import create_weakmethod + class _Context(_original_Context): """Replacement for :class:`zmq.core.context.Context` @@ -48,29 +50,41 @@ class _Socket(_original_Socket): """ def __init__(self, context, socket_type): + self._state_event = None super(_Socket, self).__init__(context, socket_type) self.__setup_events() - def close(self): - # close the _state_event event, keeps the number of active file descriptors down - if not self.closed and getattr(self, '_state_event', None): + def __del__(self): + """Unregisters itself from the event loop. + """ + # We need __del__. We would not be able to access + # the object properties inside close() called + # from _original_Socket.__dealloc__(). + if self._state_event is not None: try: self._state_event.stop() except AttributeError, e: # gevent<1.0 compat self._state_event.cancel() + self._state_event = None + + def close(self): + # close the _state_event event, keeps the number of active file descriptors down + if not self.closed and getattr(self, '_state_event', None): + self.__del__() super(_Socket, self).close() def __setup_events(self): self.__readable = Event() self.__writable = Event() + callback = create_weakmethod(_Socket.__state_changed, self, _Socket) try: self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher - self._state_event.start(self.__state_changed) + self._state_event.start(callback) except AttributeError: # for gevent<1.0 compatibility from gevent.core import read_event - self._state_event = read_event(self.getsockopt(FD), self.__state_changed, persist=True) + self._state_event = read_event(self.getsockopt(FD), callback, persist=True) def __state_changed(self, event=None, _evtype=None): if self.closed: diff --git a/gevent_zeromq/core.pyx b/gevent_zeromq/core.pyx index f3f29ea..27ed22e 100644 --- a/gevent_zeromq/core.pyx +++ b/gevent_zeromq/core.pyx @@ -10,6 +10,8 @@ from zmq.core.socket cimport Socket as _original_Socket from gevent.event import Event from gevent.hub import get_hub +from gevent_zeromq.helpers import create_weakmethod + cdef class _Socket(_original_Socket) @@ -50,6 +52,7 @@ cdef class _Socket(_original_Socket): """ cdef object __readable cdef object __writable + cdef object __weakref__ cdef public object _state_event def __init__(self, _Context context, int socket_type): @@ -69,13 +72,14 @@ cdef class _Socket(_original_Socket): cdef __setup_events(self) with gil: self.__readable = Event() self.__writable = Event() + callback = create_weakmethod(_Socket.__state_changed, self, _Socket) try: self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher - self._state_event.start(self.__state_changed) + self._state_event.start(callback) except AttributeError, e: # for gevent<1.0 compatibility from gevent.core import read_event - self._state_event = read_event(self.getsockopt(FD), self.__state_changed, persist=True) + self._state_event = read_event(self.getsockopt(FD), callback, persist=True) def __state_changed(self, event=None, _evtype=None): if self.closed: diff --git a/gevent_zeromq/helpers.py b/gevent_zeromq/helpers.py new file mode 100644 index 0000000..98a76b9 --- /dev/null +++ b/gevent_zeromq/helpers.py @@ -0,0 +1,18 @@ + +import weakref +import types + + +def create_weakmethod(unbound_method, im_self, im_class): + """Allows the given callback to disappear + when nobody else is referencing it. + """ + ref = weakref.ref(im_self) + im_self = None + def wrapper(*args, **kw): + obj = ref() + if obj is None: + return + types.MethodType(unbound_method, obj, im_class)(*args, **kw) + + return wrapper