4 changes: 3 additions & 1 deletion .gitignore
@@ -1,2 +1,4 @@
ebin

_build/
rebar3.*
doc/*.tex
16 changes: 11 additions & 5 deletions Makefile
@@ -1,7 +1,13 @@
REBAR = ./rebar3
REBAR = $(shell which rebar3)

all:
@$(REBAR) $(MAKECMDGOALS)
compile:
$(REBAR) compile

%:
@$(REBAR) $(MAKECMDGOALS)
dialyzer:
$(REBAR) dialyzer

clean:
$(REBAR) clean

fmt:
$(REBAR) fmt -w
4 changes: 1 addition & 3 deletions config/sys.config
@@ -1,9 +1,7 @@
[
{amqp_dist, [
{connections, [
"amqp://guest:guest@rabbitmq:5676/lx1"
,"amqp://guest:guest@rabbitmq:5676/lx2"
,"amqp://guest:guest@rabbitmq:5676/lx3"
"amqp://guest:guest@localhost:5672"
Member

this is a way to simulate multiple zones. any reason for removing it ?

Member Author

a config that probably works out of the box for someone, vs having to edit it then start nodes. not married to the change but could also introduce a sys.config.example with the multiple zones configured and annotated?

Member

good point, leave the change and introduce the sys.config.example for multiple zones

]}
]}

10 changes: 10 additions & 0 deletions config/sys.config.example
@@ -0,0 +1,10 @@
[
{amqp_dist, [
{connections, [
"amqp://guest:guest@rabbitmq:5676/lx1"
,"amqp://guest:guest@rabbitmq:5676/lx2"
,"amqp://guest:guest@rabbitmq:5676/lx3"
]}
]}

]
201 changes: 201 additions & 0 deletions doc/amqp_dist.org
@@ -0,0 +1,201 @@
* DistErl

Erlang nodes need to agree on a way to talk to each other over the
network. That agreed-upon way is called a distribution protocol.

Erlang ships with a default distribution module, =inet_tcp_dist=, which relies on a helper program called EPMD (Erlang Port Mapper Daemon). When a node starts up, it registers itself with the EPMD running on the same machine. Other nodes then ask that EPMD "hey, how do I reach the node named foo@mymachine?" EPMD answers with a port number, and the connection proceeds from there. It's a bit like a phone book: you look up a name, get a number, dial it.
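
The lookup described above can be tried from a shell. This is a minimal sketch, assuming an EPMD may or may not be running locally; the module name =epmd_lookup_sketch= is hypothetical, while =erl_epmd:port_please/2= and =net_adm:names/0= are the standard =kernel= functions.

#+begin_src erlang
-module(epmd_lookup_sketch).
-export([lookup/2, local_names/0]).

%% Ask the EPMD on Host which port the node registered as Name listens on.
%% erl_epmd:port_please/2 answers {port, Port, Version} or noport.
lookup(Name, Host) ->
    case erl_epmd:port_please(Name, Host) of
        {port, Port, _Version} -> {ok, Port};
        noport -> {error, not_registered};
        Other -> {error, Other}
    end.

%% List the {Name, Port} pairs registered with the local EPMD.
local_names() ->
    net_adm:names().
#+end_src

For example, =epmd_lookup_sketch:lookup("foo", "mymachine")= plays the role of looking up =foo@mymachine= in the phone book.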

** Simplified example
Malls, business parks, and similar places have directories listing which stores or businesses are there and where to find them. The default for most (the EPMD in this analogy) is a physical board with names and locations on it.

Alternatives could be QR codes that can be scanned for that information, a mall app folks can load on their phone to navigate, or TV screens that the owners can update in real time.

* Why alternatives?

** Security
The default handshake authenticates nodes with a cookie (a shared secret) over an unencrypted connection, which might not be acceptable in a locked-down environment. You might want TLS from the very first byte, before any handshake.

** Different environments
In a Kubernetes cluster, for example, pods discover each other through DNS or a service registry, not through EPMD. A custom module can speak that language natively.

** Avoiding an extra process
EPMD is a separate OS-level daemon. Some deployment setups prefer not to have it running at all, especially in containers where you want one process per container.

** Custom transports
The default distribution uses TCP. If you wanted to run node-to-node communication over something else (a Unix domain socket, a shared memory channel, or a message bus), you could do that by implementing the right callbacks.

* Distribution contract
** Registration
"Hi, I'm a node named foo@bar.com and can be reached on port 9100" (4369 is EPMD's own port, so a node would announce a different one)
** Node Lookup
"Do you know how to reach bob@loblaw.com?"
** Connections
Managing sockets (listening and accepting connections)

* DistErl over AMQP
** Why use AMQP
Traditional distribution opens direct TCP connections between nodes, producing either a fully-connected mesh or (using hidden nodes) a hub-and-spoke arrangement.

With AMQP, each node instead connects to the AMQP broker (we recommend RabbitMQ obviously), and inter-node traffic flows through the broker via exchanges, queues and their bindings.

While this introduces the AMQP broker as a potential SPOF (single point of failure), the broker is presumably already part of your application's infrastructure, so it does not introduce a new risk. Additionally, =amqp_dist= supports multiple brokers, and clustered RabbitMQ is also possible.

The benefits include interesting routing options and more observability of inter-node communication in a single place (the broker).

** Node Startup
When the node =foo@bar.com= starts up, it publishes its presence to a predetermined exchange.

** Inter-node communication
Per node, using default distribution:
- Ask EPMD for VM port to use
- TCP connect to remote IP/port and handshake
- TCP send/recv messages

Using AMQP:
- Publish heartbeat letting everyone know you're up (remote nodes track heartbeats)
- Publish connection request to remote node's queue (if not configured to auto-connect)
- Publish/consume messages to/from the remote node's queue, established after the handshake
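
The heartbeat tracking mentioned in the AMQP list could look roughly like the following. This is a sketch only: the module name, the map-based state, and the millisecond timestamps are assumptions for illustration, not the data structure =amqp_dist= actually uses.

#+begin_src erlang
-module(heartbeat_tracker_sketch).
-export([new/0, seen/3, expired/3]).

%% Tracker state: a map of Node => timestamp (ms) of the last heartbeat.
new() ->
    #{}.

%% Record that a heartbeat from Node arrived at NowMs.
seen(Node, NowMs, Tracker) ->
    Tracker#{Node => NowMs}.

%% Nodes whose last heartbeat is older than TimeoutMs at time NowMs.
expired(NowMs, TimeoutMs, Tracker) ->
    [Node || {Node, Last} <- maps:to_list(Tracker),
             NowMs - Last > TimeoutMs].
#+end_src

A caller would feed =seen/3= from each received heartbeat and periodically check =expired/3= to decide which remote nodes to treat as down.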

* amqp_dist
** BEAM startup requirements
Distribution is brought up early in the boot sequence, so the distribution module can only use code from the application itself, =kernel= and =stdlib=.

The module controlling distribution interactions should be suffixed =_dist=: we called ours =amqp_dist= accordingly.
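
The naming rule exists because the value given to the =-proto_dist= emulator flag has =_dist= appended to find the module, so this project would presumably be started with =-proto_dist amqp=. A minimal sketch of that mapping (the module and function names here are hypothetical):

#+begin_src erlang
-module(proto_dist_sketch).
-export([dist_module/1]).

%% Map a -proto_dist value to the module name the VM will look for,
%% e.g. "amqp" -> amqp_dist and "inet_tcp" -> inet_tcp_dist.
dist_module(Proto) when is_list(Proto) ->
    list_to_atom(Proto ++ "_dist").
#+end_src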

** Infrastructure to set up

- A listener entity (a process or port)
- An acceptor process to accept incoming connections via the listening entity

*** Per Connection
Once a connection is accepted, the module needs to create:
- a connection supervisor process (handles handshake for setting up the connection)
- a distribution controller (process or port) for putting data onto the connection

Both should be linked so that they are cleaned up together when the connection goes down.
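
The effect of linking can be shown with two stand-in processes. This sketch does not perform any handshake; both processes just idle, and the module name is hypothetical. It only demonstrates that killing the "supervisor" takes the linked "controller" down with it.

#+begin_src erlang
-module(linked_pair_sketch).
-export([start/0]).

%% Spawn a stand-in connection supervisor that spawn_links a stand-in
%% distribution controller. Returns {SupervisorPid, ControllerPid}.
start() ->
    Parent = self(),
    Sup = spawn(fun() ->
                        CtrlPid = spawn_link(fun idle/0),
                        Parent ! {self(), controller, CtrlPid},
                        idle()
                end),
    receive
        {Sup, controller, Ctrl} -> {Sup, Ctrl}
    after 1000 ->
            error(timeout)
    end.

%% Placeholder body: wait until told to stop.
idle() ->
    receive stop -> ok end.
#+end_src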

*** Example dist module
From https://www.erlang.org/doc/apps/erts/alt_dist.html

#+begin_example
An example implementation of a distribution module can be found [[https://www.erlang.org/doc/apps/erts/assets/gen_tcp_dist.erl][here]]

It implements the distribution over TCP/IP using the gen_tcp API with
distribution controllers implemented by processes. This instead of
using port distribution controllers as the ordinary TCP/IP
distribution uses.
#+end_example

** Required Callbacks

#+begin_src erlang
-export([listen/1
        ,accept/1
        ,accept_connection/5
        ,setup/5
        ,close/1
        ,select/1
        ,is_node_name/1
        ,address/0
        ]).
#+end_src

*** listen(Name)
Called once, when Erlang distribution is brought up, to listen for incoming connection requests.

=Name= is the name part of a =Name@Host= full node name (can be =atom()= or =string()=).

Returns ={ok, {Socket, Address, Creation}}=:

- =Socket= is a handle that will be passed to the =accept/1= callback later. It could be a real socket; in our case it is a =#fake_socket{}= record.
- =Address= is a =#net_address{}= record (defined in =kernel/include/net_address.hrl=) describing the node.
- =Creation= is an integer in =1..3=; we chose 3.

**** =#fake_socket{}=
#+begin_src erlang
-record(fake_socket, {read = 0,
                      write = 0,
                      pending = 0,
                      pid = self() :: pid(),
                      name :: term(),
                      mypid :: pid()
                     }).

%% Inside listen/1:
{ok, Pid} = amqp_dist_acceptor:start(self(), Name), % start a gen_server

#fake_socket{name=Name, mypid=Pid} % Name from listen/1 arg, Pid is the amqp_dist_acceptor gen_server
#+end_src

**** =#net_address{}=
#+begin_src erlang
{ok, Host} = inet:gethostname(), % gethostname/0 returns {ok, Hostname}
#net_address{address = []
            ,host = Host
            ,protocol = amqp
            ,family = amqp
            }
#+end_src
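
Putting the two records together, a =listen/1= callback could be sketched as below. This is an illustration under assumptions, not the project's code: the module name is hypothetical, and a placeholder process stands in for =amqp_dist_acceptor:start(self(), Name)= so that the sketch runs without a broker.

#+begin_src erlang
-module(listen_sketch).
-export([listen/1]).

-include_lib("kernel/include/net_address.hrl").

-record(fake_socket, {read = 0,
                      write = 0,
                      pending = 0,
                      pid = self() :: pid(),
                      name :: term(),
                      mypid :: pid() | undefined
                     }).

listen(Name) ->
    %% Placeholder for the amqp_dist_acceptor gen_server (assumption).
    Pid = spawn(fun() -> receive stop -> ok end end),
    {ok, Host} = inet:gethostname(),
    Socket = #fake_socket{name = Name, mypid = Pid},
    Address = #net_address{address = []
                          ,host = Host
                          ,protocol = amqp
                          ,family = amqp
                          },
    %% Creation: we chose 3, as described above.
    {ok, {Socket, Address, 3}}.
#+end_src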

*** accept(Listen)

Accepts new connection attempts from other Erlang nodes.

#+begin_src erlang
accept(Listen) ->
    spawn_opt(?MODULE, start_accept, [self(), Listen], [link, {priority, max}]).
#+end_src

=accept_loop= receives connection tuples ={connection, Tag, Node, Connection, Queue}= from =amqp_dist_acceptor=.

For each one, the loop sends =Kernel ! {accept, self(), {Tag, Node, Connection, Queue, Listen}, amqp, amqp}= to the kernel process and waits for it to reply with the supervising process in a ={Kernel, controller, SupervisorPid}= message. The =accept= tuple has the shape ={accept, AcceptPid, Socket, Family, Proto}=, where =Family= and =Proto= map back to the =#net_address{}= returned from =listen/1=.

=net_kernel= then calls =amqp_dist:accept_connection/5=, which spawns a process running =do_accept/6= to perform the needed handshake. A =#hs_data{}= record (from =kernel/include/dist_util.hrl=) tracks the handshake information.
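
The message exchange between the accept loop and the kernel process can be sketched as follows. The module name and the =stop= clause are additions for illustration (the latter so the loop can be ended in a test); the =unsupported_protocol= reply is how the =gen_tcp_dist= example handles a rejection, and it is assumed here that =amqp_dist= does something similar.

#+begin_src erlang
-module(accept_loop_sketch).
-export([accept_loop/2]).

%% Kernel is the pid that called accept/1; Listen is the handle from listen/1.
accept_loop(Kernel, Listen) ->
    receive
        {connection, Tag, Node, Connection, Queue} ->
            %% Shape: {accept, AcceptPid, Socket, Family, Proto}
            Kernel ! {accept, self(),
                      {Tag, Node, Connection, Queue, Listen},
                      amqp, amqp},
            receive
                {Kernel, controller, _SupervisorPid} ->
                    ok;
                {Kernel, unsupported_protocol} ->
                    exit(unsupported_protocol)
            end,
            accept_loop(Kernel, Listen);
        stop ->
            %% Not part of the distribution contract; illustration only.
            ok
    end.
#+end_src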

** =amqp_dist_acceptor=
This =gen_server= receives AMQP payloads from other nodes that want to connect.

After initializing, it starts AMQP connection(s) to the configured brokers via =start_connections/0=. The settings are fetched with =application:get_env/3=; the app's =env= might look like:
#+begin_src erlang
{env,[{heartbeat_period_ms, 30000}
     ,{heartbeat_timeout_ms, 45000}
     ,{connection_timeout_ms, 10000}
     ,{pause_before_reconnect_ms, 3500}
     ,{server_call_timeout_ms, 750}
     ,{connections, ["amqp://guest:guest@broker.add.re.ss:5672"]}
     ]}
#+end_src
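
Reading one of these settings with a fallback might be sketched like this. The wrapper module is hypothetical, and the caller supplies the default; only =application:get_env/3= and the =amqp_dist= application name come from the text above.

#+begin_src erlang
-module(dist_env_sketch).
-export([get/2]).

%% Fetch Key from the amqp_dist application environment,
%% falling back to Default when the key is not set.
get(Key, Default) ->
    application:get_env(amqp_dist, Key, Default).
#+end_src

For example, =dist_env_sketch:get(heartbeat_period_ms, 60000)= returns the configured period, or 60000 when nothing is configured.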

Once the broker connection is established:
1. an AMQP channel is started
2. the exchange =amq.headers= is configured
   - This is a headers exchange, predeclared as =amq.match= (and =amq.headers= in RabbitMQ); see [[https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-headers][here]]. It routes on message attributes rather than routing keys.
3. an exclusive queue is declared, named =list_to_binary(["amqp_dist_acceptor-", atom_to_list(node()), "-", pid_to_list(self())])=
4. the queue is bound to the exchange with a header argument ={<<"distribution.ping">>, bool, true}=
5. start consuming from the queue
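
The values used in steps 3 and 4 can be built without a broker. The sketch below only constructs the queue name and the binding argument table; the module and function names are hypothetical, and actually declaring and binding the queue (for example through the RabbitMQ Erlang client) is left out.

#+begin_src erlang
-module(acceptor_queue_sketch).
-export([queue_name/0, binding_arguments/0]).

%% Name of the exclusive queue: unique per node and per calling process.
queue_name() ->
    list_to_binary(["amqp_dist_acceptor-",
                    atom_to_list(node()),
                    "-",
                    pid_to_list(self())]).

%% Header argument used when binding the queue to the headers exchange.
binding_arguments() ->
    [{<<"distribution.ping">>, bool, true}].
#+end_src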

Once a broker connection is up and its channel and queue are configured, =amqp_dist_acceptor= starts a heartbeat timer (default 60s) which publishes a message with these properties:
#+begin_src erlang
,reply_to = QueueName
,headers = [{<<"distribution.ping">>, bool, true}
,{<<"node.start">>, timestamp, Start}
]
#+end_src

These headers should match the bindings of every other node's queue bound to the exchange, so each heartbeat reaches all existing nodes.
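
Why the heartbeat reaches every node can be illustrated by simulating the match locally. This sketch assumes the headers exchange uses "all" matching (every binding argument must appear in the message headers with the same value), which is understood to be the default when no =x-match= argument is given; the module is hypothetical and no broker is involved.

#+begin_src erlang
-module(headers_match_sketch).
-export([matches/2]).

%% true when every {Key, Type, Value} binding argument has a header
%% with the same key and value in the message headers.
matches(BindingArgs, Headers) ->
    lists:all(fun({Key, _Type, Value}) ->
                      case lists:keyfind(Key, 1, Headers) of
                          {Key, _HeaderType, Value} -> true;
                          _ -> false
                      end
              end,
              BindingArgs).
#+end_src

The extra =node.start= header on the heartbeat does not prevent a match, because only the binding's own arguments are checked.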

*** AMQP Message handling
**** New node present
When a remote node publishes its heartbeat and the local node is seeing it for the first time, the =gen_server= determines whether to auto-connect to the node (via the =auto_connect_nodes= env param). If so, a process is spawned to call =net_kernel:connect_node(RemoteNode)= and establish a connection to the remote node. Ultimately this calls =amqp_dist:select(RemoteNode)=, which calls =amqp_dist_acceptor:is_up(RemoteNode)=; that returns whether the remote node is known and the "connection" is established in =net_kernel=.

**** Remote node wants to connect
When the =amqp_dist_acceptor= receives a payload off AMQP, it will be the heartbeat of another node.

The payload is a term_to_binary-encoded two-tuple ={amqp_dist, connect}=. When received, a 5-tuple will be sent to the =amqp_dist= acceptor process ={connection, Label, Node, Connection, RemoteQueue}=.
** =amqp_dist_node=
=gen_server= that handles sending and receiving data between the local node and a connected remote node.

Once the handshake is completed, messages between nodes can begin. While not necessary, =amqp_dist= spawns an input handler process (=amqp_dist:dist_cntrlr_input_setup/3=) that registers itself with =amqp_dist_node= as the receiver process for data from the remote node.

For data going from the local node to the remote node, =erlang:dist_ctrl_get_data(DHandle)= is called and, if data is returned, =amqp_dist_node:send/2= takes care of publishing it to the correct remote node's AMQP queue (the queue name serves as the routing key).
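
The outbound side can be sketched as a drain loop. To keep the example runnable without a live distribution channel, the data source and the sender are passed in as funs; in the real module these would presumably be =fun() -> erlang:dist_ctrl_get_data(DHandle) end= (which returns =none= when nothing is pending) and a wrapper around =amqp_dist_node:send/2=. The module name is hypothetical.

#+begin_src erlang
-module(dist_output_sketch).
-export([drain/2]).

%% GetData: fun() -> Data | none
%% Send:    fun(Data) -> term()
%% Keep sending until the source reports that nothing is pending.
drain(GetData, Send) ->
    case GetData() of
        none ->
            ok;
        Data ->
            Send(Data),
            drain(GetData, Send)
    end.
#+end_src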

Arbitrary Erlang terms are encoded using =base64:encode(term_to_binary(Term))= for sending and decoded in reverse.
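
A round trip through that encoding, as a minimal sketch (the module and function names are hypothetical):

#+begin_src erlang
-module(dist_codec_sketch).
-export([encode/1, decode/1]).

%% Term -> base64 binary suitable for an AMQP payload.
encode(Term) ->
    base64:encode(term_to_binary(Term)).

%% Reverse of encode/1: base64 binary -> original term.
decode(Payload) ->
    binary_to_term(base64:decode(Payload)).
#+end_src

Payloads from untrusted peers may warrant =binary_to_term/2= with the =safe= option; whether =amqp_dist= does so is not stated here.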