diff --git a/Pyrlang/Dist/base_connection.py b/Pyrlang/Dist/base_connection.py index 241aff9..5eda61d 100644 --- a/Pyrlang/Dist/base_connection.py +++ b/Pyrlang/Dist/base_connection.py @@ -22,9 +22,7 @@ from hashlib import md5 from typing import Union -from gevent.queue import Queue - -from Pyrlang import logger, term +from Pyrlang import logger, mailbox, term from Pyrlang.Dist import util, etf LOG = logger.nothing @@ -59,7 +57,8 @@ def __init__(self, node): self.socket_ = None self.addr_ = None - self.inbox_ = Queue() # refer to util.make_handler_in which reads this + # refer to util.make_handler_in which reads this + self.inbox_ = mailbox.Mailbox() """ Inbox is used to ask the connection to do something. """ self.peer_distr_version_ = (None, None) @@ -120,9 +119,10 @@ def on_packet(self, data) -> bool: def on_connection_lost(self): """ Handler is called when the client has disconnected """ - from Pyrlang.node import Node - Node.singleton.inbox_.put( - ('node_disconnected', self.peer_name_)) + if self.peer_name_ is not None: + from Pyrlang.node import Node + Node.singleton.inbox_.put( + ('node_disconnected', self.peer_name_)) def _send_packet2(self, content: bytes): """ Send a handshake-time status message with a 2 byte length prefix @@ -134,7 +134,7 @@ def _send_packet2(self, content: bytes): def _send_packet4(self, content: bytes): """ Send a connection-time status message with a 4 byte length prefix """ - # LOG("Dist: pkt out", content) + LOG("Dist: pkt out", content) msg = struct.pack(">I", len(content)) + content self.socket_.sendall(msg) @@ -172,11 +172,18 @@ def on_passthrough_message(control_term, msg_term): else: ERROR("Unhandled 'p' message: %s\n%s" % (control_term, msg_term)) + def handle_inbox(self): + while True: + msg = self.inbox_.receive(filter_fn=lambda _: True) + if msg is None: + break + self.handle_one_inbox_message(msg) + def handle_one_inbox_message(self, m): # Send a ('send', Dst, Msg) to deliver a message to the other side if m[0] == 'send': - (_, dst, msg) = m - ctrl = self._control_term_send(dst) + (_, from_pid, dst, msg) = m + ctrl = self._control_term_send(from_pid=from_pid, dst=dst) LOG("Connection: control msg %s; %s" % (ctrl, msg)) return self._control_message(ctrl, msg) @@ -190,8 +197,11 @@ def handle_one_inbox_message(self, m): ERROR("Connection: Unhandled message to InConnection %s" % m) @staticmethod - def _control_term_send(dst): - return CONTROL_TERM_SEND, term.Atom(''), dst + def _control_term_send(from_pid, dst): + if isinstance(dst, term.Atom): + return CONTROL_TERM_REG_SEND, from_pid, term.Atom(''), dst + else: + return CONTROL_TERM_SEND, term.Atom(''), dst def _control_message(self, ctrl, msg): """ Pack a control message and a regular message (can be None) together @@ -240,6 +250,7 @@ def on_packet_connected(self, data): (msg_term, tail) = etf.binary_to_term(tail) else: msg_term = None + self.on_passthrough_message(control_term, msg_term) else: @@ -248,4 +259,10 @@ def on_packet_connected(self, data): return True + def report_dist_connected(self): + assert(self.peer_name_ is not None) + LOG("Dist: connected to", self.peer_name_) + self.node_.inbox_.put(('node_connected', self.peer_name_, self)) + + __all__ = ['BaseConnection', 'DistributionError'] diff --git a/Pyrlang/Dist/distribution.py b/Pyrlang/Dist/distribution.py index e97a515..960d908 100644 --- a/Pyrlang/Dist/distribution.py +++ b/Pyrlang/Dist/distribution.py @@ -79,29 +79,28 @@ def disconnect(self) -> None: """ self.epmd_.close() - def connect_to_node(self, this_node, remote_node: str) -> bool: + def connect_to_node(self, this_node, remote_node: str): """ Query EPMD where is the node, and initiate dist connection. Blocks the Greenlet until the connection is made or have failed. :type this_node: Pyrlang.Node :param this_node: Reference to Erlang Node object :param remote_node: String with node 'name@ip' - :return: Success or failure + :return: Handler or None """ try: host_port = EPMDClient.query_node(remote_node) - (handler, sock) = helpers.connect_with( + (handler, _sock) = helpers.connect_with( protocol_class=OutConnection, host_port=host_port, args=[], kwargs={"node": this_node} ) + return handler except Exception as e: ERROR("Dist:", e) - return False - - return True + return None __all__ = ['ErlangDistribution'] diff --git a/Pyrlang/Dist/etf.py b/Pyrlang/Dist/etf.py index aa7f10c..5e35c4f 100644 --- a/Pyrlang/Dist/etf.py +++ b/Pyrlang/Dist/etf.py @@ -71,7 +71,6 @@ def binary_to_term(data: bytes): if data[0] != ETF_VERSION_TAG: raise ETFDecodeException("Unsupported external term version") - # TODO: Compressed tag if data[1] == TAG_COMPRESSED: do = decompressobj() decomp_size = util.u32(data, 2) diff --git a/Pyrlang/Dist/helpers.py b/Pyrlang/Dist/helpers.py index 5497a0b..95a8cb8 100644 --- a/Pyrlang/Dist/helpers.py +++ b/Pyrlang/Dist/helpers.py @@ -14,11 +14,13 @@ from __future__ import print_function import traceback + +import gevent import gevent.select as select from gevent import socket -def _handle_socket_read(receiver, sock): +def _handle_socket_read(handler, sock): collected = b'' while True: # a full packet before calling on_packet in the handler class @@ -33,7 +35,7 @@ def _handle_socket_read(receiver, sock): # Try and consume repeatedly if multiple messages arrived # in the same packet while True: - collected1 = receiver.consume(collected) + collected1 = handler.consume(collected) if collected1 is None: print("Protocol requested to disconnect the socket") break @@ -42,14 +44,15 @@ def _handle_socket_read(receiver, sock): collected = collected1 else: - while not receiver.inbox_.empty(): - msg = receiver.inbox_.get_nowait() - receiver.handle_one_inbox_message(msg) - # Longer sleep when there's no data + handler.handle_inbox() + except select.error: # Disconnected probably or another error break + sock.close() + handler.on_connection_lost() + def make_handler_in(receiver_class, args, kwargs): """ A basic connection handler that takes an accepted connection and feeds @@ -107,18 +110,14 @@ def connect_with(protocol_class, host_port: tuple, print("Connection to %s established" % str(host_port)) try: - _handle_socket_read(handler, sock) + g = gevent.spawn(_handle_socket_read, handler, sock) + g.start() except Exception as e: print("\nException: %s" % e) traceback.print_exc() print() - finally: - print("Client disconnected", host_port) - sock.close() - handler.on_connection_lost() - return handler, sock diff --git a/Pyrlang/Dist/in_connection.py b/Pyrlang/Dist/in_connection.py index 8b55a3c..f81ffee 100644 --- a/Pyrlang/Dist/in_connection.py +++ b/Pyrlang/Dist/in_connection.py @@ -116,7 +116,7 @@ def on_packet_challengereply(self, data): self._send_challenge_ack(peers_challenge, my_cookie) self.packet_len_size_ = 4 self.state_ = self.CONNECTED - self.node_.inbox_.put(('node_connected', self.peer_name_, self)) + self.report_dist_connected() # TODO: start timer with node_opts_.network_tick_time_ diff --git a/Pyrlang/Dist/out_connection.py b/Pyrlang/Dist/out_connection.py index cb87bcd..7c6931b 100644 --- a/Pyrlang/Dist/out_connection.py +++ b/Pyrlang/Dist/out_connection.py @@ -149,7 +149,7 @@ def on_packet_recvchallenge_ack(self, data): self.packet_len_size_ = 4 self.state_ = self.CONNECTED - self.node_.inbox_.put(('node_connected', self.peer_name_, self)) + self.report_dist_connected() # TODO: start timer with node_opts_.network_tick_time_ diff --git a/Pyrlang/__init__.py b/Pyrlang/__init__.py index a23f6c2..ae8b44c 100644 --- a/Pyrlang/__init__.py +++ b/Pyrlang/__init__.py @@ -15,8 +15,9 @@ from Pyrlang.node import * from Pyrlang.process import * from Pyrlang.term import * +from Pyrlang.mailbox import * __all__ = ['init', 'Node', 'Process', 'Atom', 'Pid', 'Reference', - 'List'] + 'List', 'Mailbox'] diff --git a/Pyrlang/mailbox.py b/Pyrlang/mailbox.py new file mode 100644 index 0000000..2e5f74f --- /dev/null +++ b/Pyrlang/mailbox.py @@ -0,0 +1,88 @@ +# Copyright 2017, Erlang Solutions Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import queue + +import gevent +from gevent.queue import Queue + +from Pyrlang import logger + +LOG = logger.tty +ERROR = logger.tty + + +class Mailbox: + """ Implements a wrapper around gevent.Queue which serves as a message + inbox with selective receive feature. + """ + + def __init__(self): + self.queue_ = Queue() + + def put(self, m: tuple): + LOG("Mailbox.put", m) + self.queue_.put(m) + + def get(self): + """ Receives ANY message whatever is the first in the queue. Blocks the + greenlet if the queue is empty. + """ + return self.queue_.get() + + def get_nowait(self): + """ Receives ANY message whatever is the first. + + :raises queue.Empty: If the queue is empty + """ + return self.queue_.get_nowait() + + def receive_wait(self, filter_fn: callable): + while True: + LOG(self.queue_.queue) + + m = self.receive(filter_fn=filter_fn) + if m is not None: + return m + + LOG("receive wait...") + gevent.sleep(3.0) + + def receive(self, filter_fn: callable): + """ Apply filter repeatedly to messages in the inbox. + + :returns: Message, if the filter returned True, otherwise ``None`` + if no message matches or the mailbox was empty + """ + if self.queue_.empty(): + return None + + # try every element in the queue, get it, check it, place it into the + # queue end (NOTE: This will mix the messages breaking the order) + try: + for i in range(len(self.queue_)): + m = self.queue_.get_nowait() + + if filter_fn(m): + LOG("Mailbox: match return", m) + return m + + self.queue_.put(m) + + except queue.Empty: + pass + + return None + + +__all__ = ['Mailbox'] diff --git a/Pyrlang/node.py b/Pyrlang/node.py index 45ad92b..c0fffe4 100644 --- a/Pyrlang/node.py +++ b/Pyrlang/node.py @@ -20,13 +20,13 @@ from gevent import Greenlet from gevent.queue import Queue -from Pyrlang import logger +from Pyrlang import logger, mailbox from Pyrlang.term import * from Pyrlang.Dist.distribution import ErlangDistribution from Pyrlang.Dist.node_opts import NodeOpts from Pyrlang.process import Process -LOG = logger.nothing +LOG = logger.tty WARN = logger.nothing ERROR = logger.tty @@ -70,7 +70,7 @@ def __init__(self, name: str, cookie: str) -> None: raise NodeException("Singleton Node was already created") Node.singleton = self - self.inbox_ = Queue() + self.inbox_ = mailbox.Mailbox() """ Message queue based on ``gevent.Queue``. It is periodically checked in the ``_run`` method and the receive handler is called. """ @@ -124,21 +124,26 @@ def __init__(self, name: str, cookie: str) -> None: def _run(self): while not self.is_exiting_: - while not self.inbox_.empty(): - msg = self.inbox_.get_nowait() - self.handle_one_inbox_message(msg) + self.handle_inbox() gevent.sleep(0.0) + def handle_inbox(self): + while True: + msg = self.inbox_.receive(filter_fn=lambda _: True) + if msg is None: + break + self.handle_one_inbox_message(msg) + def handle_one_inbox_message(self, m: tuple): """ Handler is called whenever a message arrives to the mailbox. """ - # Send a ('node_connected', IP, Connection) to inform about the + # Send a ('node_connected', NodeName, Connection) to inform about the # connectivity with the other node if m[0] == 'node_connected': (_, addr, connection) = m self.dist_nodes_[addr] = connection - # Send a ('node_disconnected', IP) to forget the connection + # Send a ('node_disconnected', NodeName) to forget the connection elif m[0] == 'node_disconnected': (_, addr) = m del self.dist_nodes_[addr] @@ -225,7 +230,7 @@ def send(self, sender, receiver, message) -> None: """ Deliver a message to a pid or to a registered name. The pid may be located on another Erlang node. - :param sender: Currently unused + :param sender: Message sender :type sender: Pid :type receiver: Pid or Atom or tuple[Atom, Pid or Atom] :param receiver: Message receiver, a pid, or a name, or a tuple with @@ -243,7 +248,8 @@ def send(self, sender, receiver, message) -> None: return self.send(sender, r_name, message) else: # route remotely - return self._send_remote(dst_node=str(r_node), + return self._send_remote(sender=sender, + dst_node=str(r_node), receiver=r_name, message=message) @@ -251,7 +257,8 @@ def send(self, sender, receiver, message) -> None: if receiver.is_local_to(self): return self._send_local(receiver, message) else: - return self._send_remote(dst_node=str(receiver.node_), + return self._send_remote(sender=sender, + dst_node=str(receiver.node_), receiver=receiver, message=message) @@ -260,9 +267,9 @@ def send(self, sender, receiver, message) -> None: raise NodeException("Don't know how to send to %s" % receiver) - def _send_remote(self, dst_node: str, receiver, message) -> None: + def _send_remote(self, sender, dst_node: str, receiver, message) -> None: LOG("Node._send_remote %s <- %s" % (receiver, message)) - m = ('send', receiver, message) + m = ('send', sender, receiver, message) return self.dist_command(receiver_node=dst_node, message=m) @@ -283,10 +290,24 @@ def dist_command(self, receiver_node: str, message: tuple) -> None: values """ if receiver_node not in self.dist_nodes_: - if not self.dist_.connect_to_node(this_node=self, - remote_node=receiver_node): + LOG("Node: connect to node", receiver_node) + handler = self.dist_.connect_to_node( + this_node=self, + remote_node=receiver_node) + + if handler is None: raise NodeException("Node not connected %s" % receiver_node) + # block until connected, and get the connected message + LOG("Node: wait for 'node_connected'") + # msg = self.inbox_.receive_wait( + # filter_fn=lambda m: m[0] == 'node_connected' + # ) + while receiver_node not in self.dist_nodes_: + gevent.sleep(0.1) + + LOG("Node: connected") + conn = self.dist_nodes_[receiver_node] conn.inbox_.put(message) diff --git a/Pyrlang/process.py b/Pyrlang/process.py index 0ca3a85..7159c6f 100644 --- a/Pyrlang/process.py +++ b/Pyrlang/process.py @@ -16,7 +16,8 @@ import gevent from gevent import Greenlet -from gevent.queue import Queue + +from Pyrlang import mailbox class Process(Greenlet): @@ -33,7 +34,7 @@ def __init__(self, node) -> None: """ Greenlet.__init__(self) - self.inbox_ = Queue() + self.inbox_ = mailbox.Mailbox() """ Message queue (gevent.Queue). Messages are detected by the ``_run`` loop and handled one by one in ``handle_one_inbox_message()``. """ @@ -55,11 +56,16 @@ def __init__(self, node) -> None: def _run(self): while not self.is_exiting_: - while not self.inbox_.empty(): - msg = self.inbox_.get_nowait() - self.handle_one_inbox_message(msg) + self.handle_inbox() gevent.sleep(0.0) + def handle_inbox(self): + while True: + msg = self.inbox_.receive(filter_fn=lambda _: True) + if msg is None: + break + self.handle_one_inbox_message(msg) + def handle_one_inbox_message(self, msg): """ Override this method to handle new incoming messages. """ print("%s: Handling msg %s" % (self.pid_, msg)) diff --git a/docs/source/index.rst b/docs/source/index.rst index 2e02837..8dc539c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -44,6 +44,7 @@ changes required. :maxdepth: 1 :caption: Other modules + mailbox rex net_kernel gen diff --git a/docs/source/mailbox.rst b/docs/source/mailbox.rst new file mode 100644 index 0000000..dd9d519 --- /dev/null +++ b/docs/source/mailbox.rst @@ -0,0 +1,7 @@ +mailbox module +============== + +.. automodule:: Pyrlang.mailbox + :members: + :undoc-members: + :show-inheritance: diff --git a/test1.py b/test1.py index 86fb8c2..b0a9d20 100644 --- a/test1.py +++ b/test1.py @@ -10,7 +10,8 @@ def main(): node = Pyrlang.Node("py@127.0.0.1", "COOKIE") node.start() - node.send(None, (Atom('erl@127.0.0.1'), Atom('rex')), Atom('hello')) + pid = node.register_new_process(None) + node.send(pid, (Atom('erl@127.0.0.1'), Atom('shell')), Atom('hello')) while True: # Sleep gives other greenlets time to run