Skip to content

Commit

Permalink
Selective receive. Fix with out connection
Browse files Browse the repository at this point in the history
  • Loading branch information
kvakvs committed Jun 1, 2017
1 parent ea825ed commit 6375803
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 55 deletions.
41 changes: 29 additions & 12 deletions Pyrlang/Dist/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
from hashlib import md5
from typing import Union

from gevent.queue import Queue

from Pyrlang import logger, term
from Pyrlang import logger, mailbox, term
from Pyrlang.Dist import util, etf

LOG = logger.nothing
Expand Down Expand Up @@ -59,7 +57,8 @@ def __init__(self, node):
self.socket_ = None
self.addr_ = None

self.inbox_ = Queue() # refer to util.make_handler_in which reads this
# refer to util.make_handler_in which reads this
self.inbox_ = mailbox.Mailbox()
""" Inbox is used to ask the connection to do something. """

self.peer_distr_version_ = (None, None)
Expand Down Expand Up @@ -120,9 +119,10 @@ def on_packet(self, data) -> bool:
def on_connection_lost(self):
""" Handler is called when the client has disconnected
"""
from Pyrlang.node import Node
Node.singleton.inbox_.put(
('node_disconnected', self.peer_name_))
if self.peer_name_ is not None:
from Pyrlang.node import Node
Node.singleton.inbox_.put(
('node_disconnected', self.peer_name_))

def _send_packet2(self, content: bytes):
""" Send a handshake-time status message with a 2 byte length prefix
Expand All @@ -134,7 +134,7 @@ def _send_packet2(self, content: bytes):
def _send_packet4(self, content: bytes):
""" Send a connection-time status message with a 4 byte length prefix
"""
# LOG("Dist: pkt out", content)
LOG("Dist: pkt out", content)
msg = struct.pack(">I", len(content)) + content
self.socket_.sendall(msg)

Expand Down Expand Up @@ -172,11 +172,18 @@ def on_passthrough_message(control_term, msg_term):
else:
ERROR("Unhandled 'p' message: %s\n%s" % (control_term, msg_term))

def handle_inbox(self):
while True:
msg = self.inbox_.receive(filter_fn=lambda _: True)
if msg is None:
break
self.handle_one_inbox_message(msg)

def handle_one_inbox_message(self, m):
# Send a ('send', Dst, Msg) to deliver a message to the other side
if m[0] == 'send':
(_, dst, msg) = m
ctrl = self._control_term_send(dst)
(_, from_pid, dst, msg) = m
ctrl = self._control_term_send(from_pid=from_pid, dst=dst)
LOG("Connection: control msg %s; %s" % (ctrl, msg))
return self._control_message(ctrl, msg)

Expand All @@ -190,8 +197,11 @@ def handle_one_inbox_message(self, m):
ERROR("Connection: Unhandled message to InConnection %s" % m)

@staticmethod
def _control_term_send(dst):
return CONTROL_TERM_SEND, term.Atom(''), dst
def _control_term_send(from_pid, dst):
if isinstance(dst, term.Atom):
return CONTROL_TERM_REG_SEND, from_pid, term.Atom(''), dst
else:
return CONTROL_TERM_SEND, term.Atom(''), dst

def _control_message(self, ctrl, msg):
""" Pack a control message and a regular message (can be None) together
Expand Down Expand Up @@ -240,6 +250,7 @@ def on_packet_connected(self, data):
(msg_term, tail) = etf.binary_to_term(tail)
else:
msg_term = None

self.on_passthrough_message(control_term, msg_term)

else:
Expand All @@ -248,4 +259,10 @@ def on_packet_connected(self, data):

return True

def report_dist_connected(self):
assert(self.peer_name_ is not None)
LOG("Dist: connected to", self.peer_name_)
self.node_.inbox_.put(('node_connected', self.peer_name_, self))


__all__ = ['BaseConnection', 'DistributionError']
11 changes: 5 additions & 6 deletions Pyrlang/Dist/distribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,29 +79,28 @@ def disconnect(self) -> None:
"""
self.epmd_.close()

def connect_to_node(self, this_node, remote_node: str) -> bool:
def connect_to_node(self, this_node, remote_node: str):
""" Query EPMD where is the node, and initiate dist connection. Blocks
the Greenlet until the connection is made or have failed.
:type this_node: Pyrlang.Node
:param this_node: Reference to Erlang Node object
:param remote_node: String with node 'name@ip'
:return: Success or failure
:return: Handler or None
"""
try:
host_port = EPMDClient.query_node(remote_node)
(handler, sock) = helpers.connect_with(
(handler, _sock) = helpers.connect_with(
protocol_class=OutConnection,
host_port=host_port,
args=[],
kwargs={"node": this_node}
)
return handler

except Exception as e:
ERROR("Dist:", e)
return False

return True
return None


__all__ = ['ErlangDistribution']
1 change: 0 additions & 1 deletion Pyrlang/Dist/etf.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def binary_to_term(data: bytes):
if data[0] != ETF_VERSION_TAG:
raise ETFDecodeException("Unsupported external term version")

# TODO: Compressed tag
if data[1] == TAG_COMPRESSED:
do = decompressobj()
decomp_size = util.u32(data, 2)
Expand Down
23 changes: 11 additions & 12 deletions Pyrlang/Dist/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

from __future__ import print_function
import traceback

import gevent
import gevent.select as select
from gevent import socket


def _handle_socket_read(receiver, sock):
def _handle_socket_read(handler, sock):
collected = b''
while True:
# a full packet before calling on_packet in the handler class
Expand All @@ -33,7 +35,7 @@ def _handle_socket_read(receiver, sock):
# Try and consume repeatedly if multiple messages arrived
# in the same packet
while True:
collected1 = receiver.consume(collected)
collected1 = handler.consume(collected)
if collected1 is None:
print("Protocol requested to disconnect the socket")
break
Expand All @@ -42,14 +44,15 @@ def _handle_socket_read(receiver, sock):

collected = collected1
else:
while not receiver.inbox_.empty():
msg = receiver.inbox_.get_nowait()
receiver.handle_one_inbox_message(msg)
# Longer sleep when there's no data
handler.handle_inbox()

except select.error:
# Disconnected probably or another error
break

sock.close()
handler.on_connection_lost()


def make_handler_in(receiver_class, args, kwargs):
""" A basic connection handler that takes an accepted connection and feeds
Expand Down Expand Up @@ -107,18 +110,14 @@ def connect_with(protocol_class, host_port: tuple,
print("Connection to %s established" % str(host_port))

try:
_handle_socket_read(handler, sock)
g = gevent.spawn(_handle_socket_read, handler, sock)
g.start()

except Exception as e:
print("\nException: %s" % e)
traceback.print_exc()
print()

finally:
print("Client disconnected", host_port)
sock.close()
handler.on_connection_lost()

return handler, sock


Expand Down
2 changes: 1 addition & 1 deletion Pyrlang/Dist/in_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def on_packet_challengereply(self, data):
self._send_challenge_ack(peers_challenge, my_cookie)
self.packet_len_size_ = 4
self.state_ = self.CONNECTED
self.node_.inbox_.put(('node_connected', self.peer_name_, self))
self.report_dist_connected()

# TODO: start timer with node_opts_.network_tick_time_

Expand Down
2 changes: 1 addition & 1 deletion Pyrlang/Dist/out_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def on_packet_recvchallenge_ack(self, data):

self.packet_len_size_ = 4
self.state_ = self.CONNECTED
self.node_.inbox_.put(('node_connected', self.peer_name_, self))
self.report_dist_connected()

# TODO: start timer with node_opts_.network_tick_time_

Expand Down
3 changes: 2 additions & 1 deletion Pyrlang/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
from Pyrlang.node import *
from Pyrlang.process import *
from Pyrlang.term import *
from Pyrlang.mailbox import *


__all__ = ['init',
'Node', 'Process', 'Atom', 'Pid', 'Reference',
'List']
'List', 'Mailbox']
88 changes: 88 additions & 0 deletions Pyrlang/mailbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2017, Erlang Solutions Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue

import gevent
from gevent.queue import Queue

from Pyrlang import logger

LOG = logger.tty
ERROR = logger.tty


class Mailbox:
""" Implements a wrapper around gevent.Queue which serves as a message
inbox with selective receive feature.
"""

def __init__(self):
self.queue_ = Queue()

def put(self, m: tuple):
LOG("Mailbox.put", m)
self.queue_.put(m)

def get(self):
""" Receives ANY message whatever is the first in the queue. Blocks the
greenlet if the queue is empty.
"""
return self.queue_.get()

def get_nowait(self):
""" Receives ANY message whatever is the first.
:raises queue.Empty: If the queue is empty
"""
return self.queue_.get_nowait()

def receive_wait(self, filter_fn: callable):
while True:
LOG(self.queue_.queue)

m = self.receive(filter_fn=filter_fn)
if m is not None:
return m

LOG("receive wait...")
gevent.sleep(3.0)

def receive(self, filter_fn: callable):
""" Apply filter repeatedly to messages in the inbox.
:returns: Message, if the filter returned True, otherwise ``None``
if no message matches or the mailbox was empty
"""
if self.queue_.empty():
return None

# try every element in the queue, get it, check it, place it into the
# queue end (NOTE: This will mix the messages breaking the order)
try:
for i in range(len(self.queue_)):
m = self.queue_.get_nowait()

if filter_fn(m):
LOG("Mailbox: match return", m)
return m

self.queue_.put(m)

except queue.Empty:
pass

return None


__all__ = ['Mailbox']
Loading

0 comments on commit 6375803

Please sign in to comment.