Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/scripting/client.exs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import Slipstream
import Slipstream.Socket

socket = connect!(uri: "ws://localhost:4000/socket/websocket") |> await_connect!
socket =
connect!(uri: "ws://localhost:4000/socket/websocket") |> await_connect!()

topic = "rooms:lobby"
socket = join(socket, topic, %{"fizz" => "buzz"}) |> await_join!(topic)

_ref = push!(socket, topic, "quicksand", %{"a" => "b"})

{:ok, %{"pong" => "pong"}} = push!(socket, topic, "ping", %{}) |> await_reply!
{:ok, %{"pong" => "pong"}} = push!(socket, topic, "ping", %{}) |> await_reply!()

push!(socket, topic, "push to me", %{})

await_message!(^topic, "foo", _payload)

socket = leave(socket, topic) |> await_leave!(topic)
socket |> disconnect() |> await_disconnect!
socket |> disconnect() |> await_disconnect!()
10 changes: 5 additions & 5 deletions lib/slipstream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -802,14 +802,14 @@ defmodule Slipstream do
{:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()}
@spec connect(socket :: Socket.t(), opts :: Keyword.t()) ::
{:ok, Socket.t()} | {:error, NimbleOptions.ValidationError.t()}
def connect(socket \\ new_socket(), opts) do
def connect(%Socket{} = socket \\ new_socket(), opts) do
case Slipstream.Configuration.validate(opts) do
{:ok, config} ->
socket = TelemetryHelper.begin_connect(socket, config)

route_command %Commands.OpenConnection{config: config, socket: socket}

{:ok, %Socket{socket | channel_config: config}}
{:ok, %{socket | channel_config: config}}

{:error, reason} ->
{:error, reason}
Expand All @@ -826,13 +826,13 @@ defmodule Slipstream do
@doc since: "0.1.0"
@spec connect!(opts :: Keyword.t()) :: Socket.t()
@spec connect!(socket :: Socket.t(), opts :: Keyword.t()) :: Socket.t()
def connect!(socket \\ new_socket(), opts) do
def connect!(%Socket{} = socket \\ new_socket(), opts) do
config = Slipstream.Configuration.validate!(opts)
socket = TelemetryHelper.begin_connect(socket, config)
%Socket{} = socket = TelemetryHelper.begin_connect(socket, config)

route_command %Commands.OpenConnection{config: config, socket: socket}

%Socket{socket | channel_config: config}
%{socket | channel_config: config}
end

@doc """
Expand Down
2 changes: 1 addition & 1 deletion lib/slipstream/callback.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Slipstream.Callback do
# note that `args` needs to be a compile-time list for this to work
arity = length(args) + 1

unless {name, arity} in @known_callbacks do
if {name, arity} not in @known_callbacks do
raise CompileError,
file: __CALLER__.file,
line: __CALLER__.line,
Expand Down
2 changes: 1 addition & 1 deletion lib/slipstream/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Slipstream.Connection do

metadata = Telemetry.begin(initial_state)

state = %State{initial_state | metadata: metadata}
%State{} = state = %{initial_state | metadata: metadata}

{:ok, state, {:continue, :connect}}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/slipstream/connection/impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule Slipstream.Connection.Impl do

# ---

def push_message(frame, state) when is_tuple(frame) do
def push_message(frame, %State{} = state) when is_tuple(frame) do
case Mint.WebSocket.encode(state.websocket, frame) do
{:ok, websocket, data} ->
case Mint.WebSocket.stream_request_body(
Expand All @@ -57,7 +57,7 @@ defmodule Slipstream.Connection.Impl do
data
) do
{:ok, conn} ->
{:ok, %State{state | conn: conn, websocket: websocket}}
{:ok, %{state | conn: conn, websocket: websocket}}

# coveralls-ignore-start
{:error, conn, reason} ->
Expand Down
49 changes: 25 additions & 24 deletions lib/slipstream/connection/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Slipstream.Connection.Pipeline do
defp decode_message(
%{
raw_message: {:DOWN, ref, :process, _pid, reason},
state: %{client_ref: ref}
state: %State{client_ref: ref}
} = p
) do
put_message(p, event(%Events.ParentProcessExited{reason: reason}))
Expand All @@ -64,7 +64,7 @@ defmodule Slipstream.Connection.Pipeline do
{:status, ref, status},
{:headers, ref, headers} | _maybe_done
],
state: %{request_ref: ref} = state
state: %State{request_ref: ref} = state
} = p
) do
case Mint.WebSocket.new(state.conn, ref, status, headers) do
Expand All @@ -77,7 +77,7 @@ defmodule Slipstream.Connection.Pipeline do
response_headers: headers
})
)
|> put_state(%State{p.state | conn: conn, websocket: websocket})
|> put_state(%{p.state | conn: conn, websocket: websocket})

{:error, _conn, reason} ->
failure_info = %{
Expand Down Expand Up @@ -140,7 +140,7 @@ defmodule Slipstream.Connection.Pipeline do
case Mint.WebSocket.stream(p.state.conn, p.raw_message) do
{:ok, conn, messages} ->
put_in(p.raw_message, messages)
|> put_state(%State{p.state | conn: conn})
|> put_state(%{p.state | conn: conn})
|> decode_message()

{:error, conn, %Mint.TransportError{reason: :closed}, _} ->
Expand Down Expand Up @@ -173,11 +173,11 @@ defmodule Slipstream.Connection.Pipeline do

@spec handle_message(t()) :: t()
defp handle_message(
%{message: :connect, state: %{config: config} = state} = p
%{message: :connect, state: %State{config: config} = state} = p
) do
with {:ok, conn} <- Impl.http_connect(config),
{:ok, conn, ref} <- Impl.websocket_upgrade(conn, config) do
put_state(p, %State{state | conn: conn, request_ref: ref})
put_state(p, %{state | conn: conn, request_ref: ref})
else
# coveralls-ignore-start
{:error, conn, reason} ->
Expand Down Expand Up @@ -273,7 +273,7 @@ defmodule Slipstream.Connection.Pipeline do
%{message: command(%Commands.JoinTopic{} = cmd), state: state} = p
) do
{ref, state} = State.next_ref(state)
state = %State{state | joins: Map.put(state.joins, cmd.topic, ref)}
%State{} = state = %{state | joins: Map.put(state.joins, cmd.topic, ref)}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For these clauses of handle_message/1, rather than matching %State{} as the return value, shouldn't we match in the function parameters?

defp handle_message(%{message: command(%Commands.JoinTopic{} = cmd), state: %State{} = state}) do
  # ..
end

I haven't been following the type system stuff lately but I would assume that this would give the type system a better hint about the original state binding, and it would know that state.joins exists on this line for example. And that state.config exists in clauses below, like the one for %Events.ChannelConnected{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah pattern matching in the head is probably a better thing to do,


p
|> put_state(state)
Expand All @@ -290,7 +290,7 @@ defmodule Slipstream.Connection.Pipeline do
%{message: command(%Commands.LeaveTopic{} = cmd), state: state} = p
) do
{ref, state} = State.next_ref(state)
state = %State{state | leaves: Map.put(state.leaves, cmd.topic, ref)}
%State{} = state = %{state | leaves: Map.put(state.leaves, cmd.topic, ref)}

p
|> put_state(state)
Expand Down Expand Up @@ -353,7 +353,7 @@ defmodule Slipstream.Connection.Pipeline do

defp handle_message(%{message: event(%type{} = event), state: state} = p)
when type in [Events.TopicJoinFailed, Events.TopicJoinClosed] do
state = %State{state | joins: Map.delete(state.joins, event.topic)}
%State{} = state = %{state | joins: Map.delete(state.joins, event.topic)}

route_event state, event

Expand All @@ -364,7 +364,7 @@ defmodule Slipstream.Connection.Pipeline do
%{message: event(%Events.TopicLeaveAccepted{} = event), state: state} =
p
) do
state = %State{state | leaves: Map.delete(state.leaves, event.topic)}
%State{} = state = %{state | leaves: Map.delete(state.leaves, event.topic)}

put_state(p, state)
end
Expand All @@ -391,8 +391,9 @@ defmodule Slipstream.Connection.Pipeline do
tref
end

state =
%State{state | status: :connected, heartbeat_timer: timer}
%State{} =
state =
%{state | status: :connected, heartbeat_timer: timer}
|> State.reset_heartbeat()

route_event state, event
Expand All @@ -410,7 +411,7 @@ defmodule Slipstream.Connection.Pipeline do
:timer.cancel(state.heartbeat_timer)
end

state = %State{state | status: :terminating}
%State{} = state = %{state | status: :terminating}

route_event state, event

Expand Down Expand Up @@ -450,15 +451,15 @@ defmodule Slipstream.Connection.Pipeline do
defp default_return(p), do: p

@spec build_events(t()) :: t()
defp build_events(%{events: []} = p), do: p
defp build_events(%__MODULE__{events: []} = p), do: p

defp build_events(%{events: events} = p) do
defp build_events(%__MODULE__{events: events} = p) do
built_events =
Enum.map(events, fn %{type: type, attrs: attrs} ->
build_event(type, attrs)
end)

%__MODULE__{p | built_events: built_events}
%{p | built_events: built_events}
end

defp emit_events(%{built_events: []} = p), do: p
Expand All @@ -482,13 +483,13 @@ defmodule Slipstream.Connection.Pipeline do
# --- token API

@spec put_state(t(), State.t()) :: t()
def put_state(p, state) do
%__MODULE__{p | state: state}
def put_state(%__MODULE__{} = p, state) do
%{p | state: state}
end

@spec put_message(t(), term()) :: t()
def put_message(p, message) do
%__MODULE__{p | message: message}
def put_message(%__MODULE__{} = p, message) do
%{p | message: message}
end

@doc """
Expand All @@ -498,8 +499,8 @@ defmodule Slipstream.Connection.Pipeline do
build the event in the `build_events/1` phase of the pipeline
"""
@spec put_event(t(), atom(), Keyword.t() | map()) :: t()
def put_event(p, event, attrs \\ %{}) do
%__MODULE__{
def put_event(%__MODULE__{} = p, event, attrs \\ %{}) do
%{
p
| events: p.events ++ [%{type: event, attrs: Enum.into(attrs, %{})}]
}
Expand All @@ -511,8 +512,8 @@ defmodule Slipstream.Connection.Pipeline do
This value will be given to the GenServer callback that invoked
"""
@spec put_return(t(), term()) :: t()
def put_return(p, return) do
%__MODULE__{p | return: return}
def put_return(%__MODULE__{} = p, return) do
%{p | return: return}
end

def push_message(p, message) do
Expand Down
12 changes: 6 additions & 6 deletions lib/slipstream/connection/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ defmodule Slipstream.Connection.State do

Refs are simply strings of incrementing integers.
"""
def next_ref(state) do
def next_ref(%__MODULE__{} = state) do
ref = state.current_ref + 1

{to_string(ref),
%__MODULE__{state | current_ref: ref, current_ref_str: to_string(ref)}}
%{state | current_ref: ref, current_ref_str: to_string(ref)}}
end

# coveralls-ignore-start
def next_heartbeat_ref(state) do
def next_heartbeat_ref(%__MODULE__{} = state) do
{ref, state} = next_ref(state)

%__MODULE__{state | heartbeat_ref: ref}
%{state | heartbeat_ref: ref}
end

# coveralls-ignore-stop
Expand All @@ -85,8 +85,8 @@ defmodule Slipstream.Connection.State do
in state is nil, that means we have not received a reply to our heartbeat
request and that the server is potentially stuck or otherwise not responding.
"""
def reset_heartbeat(state) do
%__MODULE__{state | heartbeat_ref: nil}
def reset_heartbeat(%__MODULE__{} = state) do
%{state | heartbeat_ref: nil}
end

@doc """
Expand Down
19 changes: 11 additions & 8 deletions lib/slipstream/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -255,25 +255,28 @@ defmodule Slipstream.Socket do
@spec apply_event(t(), struct()) :: t()
def apply_event(socket, event)

def apply_event(socket, %Events.ChannelConnected{} = event) do
def apply_event(%__MODULE__{} = socket, %Events.ChannelConnected{} = event) do
socket = TelemetryHelper.conclude_connect(socket, event)

%__MODULE__{
%{
socket
| channel_pid: event.pid,
channel_config: event.config || socket.channel_config,
reconnect_counter: 0
}
end

def apply_event(socket, %Events.TopicJoinSucceeded{topic: topic} = event) do
def apply_event(
%__MODULE__{} = socket,
%Events.TopicJoinSucceeded{topic: topic} = event
) do
socket
|> TelemetryHelper.conclude_join(event)
|> put_in([Access.key(:joins), topic, Access.key(:status)], :joined)
|> put_in([Access.key(:joins), topic, Access.key(:rejoin_counter)], 0)
end

def apply_event(socket, %event{topic: topic})
def apply_event(%__MODULE__{} = socket, %event{topic: topic})
when event in [
Events.TopicLeft,
Events.TopicJoinFailed,
Expand All @@ -282,13 +285,13 @@ defmodule Slipstream.Socket do
put_in(socket, [Access.key(:joins), topic, Access.key(:status)], :closed)
end

def apply_event(socket, %Events.ChannelClosed{}) do
%__MODULE__{
def apply_event(%__MODULE__{} = socket, %Events.ChannelClosed{}) do
%{
socket
| channel_pid: nil,
joins:
Enum.into(socket.joins, %{}, fn {topic, join} ->
{topic, %Join{join | status: :closed}}
Enum.into(socket.joins, %{}, fn {topic, %Join{} = join} ->
{topic, %{join | status: :closed}}
end)
}
end
Expand Down
11 changes: 7 additions & 4 deletions lib/slipstream/telemetry_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ defmodule Slipstream.TelemetryHelper do
"""
@doc since: "0.4.0"
@spec conclude_connect(Socket.t(), Events.ChannelConnected.t()) :: Socket.t()
def conclude_connect(%{metadata: %{connect: start_metadata}} = socket, event)
def conclude_connect(
%Socket{metadata: %{connect: start_metadata}} = socket,
event
)
when is_map(start_metadata) and map_size(start_metadata) > 0 do
metadata =
start_metadata
Expand All @@ -67,7 +70,7 @@ defmodule Slipstream.TelemetryHelper do
metadata
)

%Socket{socket | metadata: Map.delete(socket.metadata, :connect)}
%{socket | metadata: Map.delete(socket.metadata, :connect)}
end

# technically speaking this case doesn't make any sense... you need to connect
Expand Down Expand Up @@ -186,11 +189,11 @@ defmodule Slipstream.TelemetryHelper do
return_value
end

defp clean_socket(socket) do
defp clean_socket(%Slipstream.Socket{} = socket) do
# Clear metadata from the socket. The socket contains the metadata and
# the metadata contains the socket so if we repeatedly "begin" operations
# like connects or joins, we cause sharp memory growth in the client and
# connection processes.
%Slipstream.Socket{socket | metadata: %{}}
%{socket | metadata: %{}}
end
end
Loading