Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions lib/datastar/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def stream_one(streamer)
proc do |socket|
generator = ServerSentEventGenerator.new(socket, signals:, view_context: @view_context)
@on_connect.each { |callable| callable.call(generator) }
handling_errors(generator, socket) do
handling_sync_errors(generator, socket) do
streamer.call(generator)
end
ensure
Expand Down Expand Up @@ -313,17 +313,23 @@ def stream_many(streamer)
@on_connect.each { |callable| callable.call(conn_generator) }

threads = @streamers.map do |streamer|
duped_signals = signs.dup.freeze
@executor.spawn do
# TODO: Review thread-safe view context
generator = ServerSentEventGenerator.new(@queue, signals: signs, view_context: @view_context)
generator = ServerSentEventGenerator.new(@queue, signals: duped_signals, view_context: @view_context)
streamer.call(generator)
@queue << :done
rescue StandardError => e
@queue << e
end
end

handling_errors(conn_generator, socket) do
# Now launch the control thread that actually writes to the socket
# We don't want to block the main thread, so that servers like Puma
# which have a limited thread pool can keep serving other requests
# Other streamers will push any StandardError exceptions to the queue
# So we handle them here
@executor.spawn do
done_count = 0
threads_size = @heartbeat_on ? threads.size - 1 : threads.size

Expand All @@ -332,24 +338,46 @@ def stream_many(streamer)
done_count += 1
@queue << nil if done_count == threads_size
elsif data.is_a?(Exception)
raise data
handle_streaming_error(data, socket)
@queue << nil
else
socket << data
# Here we attempt writing to the actual socket
# which may raise an IOError if the client disconnected
begin
socket << data
rescue Exception => e
handle_streaming_error(e, socket)
@queue << nil
end
end
end

ensure
@on_server_disconnect.each { |callable| callable.call(conn_generator) }
@executor.stop(threads) if threads
socket.close
end
ensure
@executor.stop(threads) if threads
socket.close
end
end

# Run a streaming block while handling errors
# Handle errors caught during streaming
# @param error [Exception] the error that occurred
# @param socket [IO] the socket to pass to error handlers
def handle_streaming_error(error, socket)
case error
when IOError, Errno::EPIPE, Errno::ECONNRESET
@on_client_disconnect.each { |callable| callable.call(socket) }
when Exception
@on_error.each { |callable| callable.call(error) }
end
end

# Run a block while handling errors
# @param generator [ServerSentEventGenerator]
# @param socket [IO]
# @yield
# @api private
def handling_errors(generator, socket, &)
def handling_sync_errors(generator, socket, &)
yield

@on_server_disconnect.each { |callable| callable.call(generator) }
Expand Down
3 changes: 3 additions & 0 deletions lib/datastar/server_sent_event_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class ServerSentEventGenerator

attr_reader :signals

# @param stream [IO, Queue] The IO stream or Queue to write to
# @option signals [Hash] A hash of signals (params)
# @option view_context [Object] The view context for rendering elements, if applicable.
def initialize(stream, signals:, view_context: nil)
@stream = stream
@signals = signals
Expand Down
44 changes: 33 additions & 11 deletions spec/dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,35 @@

class TestSocket
attr_reader :lines, :open
def initialize

def initialize(open: true)
@lines = []
@open = true
@open = open
@finish = Thread::Queue.new
end

def <<(line)
raise Errno::EPIPE, 'Socket closed' unless @open

@lines << line
end

def close = @open = false
def close
@open = false
@finish << true
end

def split_lines
@lines.join.split("\n")
end

# Streams run in threads
# we can call this to signal the end of the stream
# in tests
def wait_for_close(&)
@finish.pop
yield if block_given?
end
end

RSpec.describe Datastar::Dispatcher do
Expand Down Expand Up @@ -407,6 +422,8 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
end

dispatcher.response.body.call(socket)

socket.wait_for_close
expect(socket.open).to be(false)
expect(socket.lines.size).to eq(2)
expect(socket.lines[0]).to eq("event: datastar-patch-elements\ndata: elements <div id=\"foo\">\ndata: elements <span>hello</span>\ndata: elements </div>\n\n")
Expand Down Expand Up @@ -448,15 +465,17 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
block_called = false
dispatcher.on_client_disconnect { |conn| connected = false }

socket = TestSocket.new
allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
socket = TestSocket.new(open: false)
# allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')

dispatcher.stream do |sse|
sleep 10
block_called = true
end

dispatcher.response.body.call(socket)
socket.wait_for_close

expect(connected).to be(false)
expect(block_called).to be(false)
end
Expand All @@ -467,8 +486,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
block_called = false
dispatcher.on_client_disconnect { |conn| connected = false }

socket = TestSocket.new
allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
socket = TestSocket.new(open: false)

dispatcher.stream do |sse|
sleep 0.001
Expand Down Expand Up @@ -496,6 +514,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
end
socket = TestSocket.new
dispatcher.response.body.call(socket)
socket.wait_for_close

expect(signals['user']['name']).to eq('joe')
end
Expand All @@ -520,10 +539,10 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
dispatcher.stream do |sse|
sse.patch_signals(foo: 'bar')
end
socket = TestSocket.new
allow(socket).to receive(:<<).and_raise(Errno::EPIPE, 'Socket closed')
socket = TestSocket.new(open: false)

dispatcher.response.body.call(socket)
socket.wait_for_close
expect(events).to eq([true, false])
end

Expand All @@ -536,10 +555,10 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
dispatcher.stream do |sse|
sse.check_connection!
end
socket = TestSocket.new
allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
socket = TestSocket.new(open: false)

dispatcher.response.body.call(socket)
socket.wait_for_close
expect(events).to eq([true, false])
end

Expand All @@ -555,6 +574,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
socket = TestSocket.new

dispatcher.response.body.call(socket)
socket.wait_for_close
expect(events).to eq([true, false])
end

Expand All @@ -570,6 +590,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
allow(socket).to receive(:<<).and_raise(ArgumentError, 'Invalid argument')

dispatcher.response.body.call(socket)
socket.wait_for_close
expect(errors.first).to be_a(ArgumentError)
expect(Datastar.config.logger).to have_received(:error).with(/ArgumentError \(Invalid argument\):/)
end
Expand All @@ -584,6 +605,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
sse.patch_signals(foo: 'bar')
end
dispatcher.response.body.call(socket)
socket.wait_for_close
expect(errs.first).to be_a(ArgumentError)
end
end
Expand Down
2 changes: 2 additions & 0 deletions spec/support/dispatcher_examples.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module DispatcherExamples

socket = TestSocket.new
dispatcher.response.body.call(socket)
socket.wait_for_close
expect(socket.open).to be(false)
expect(socket.lines.size).to eq(2)
expect(socket.lines[0]).to eq("event: datastar-patch-signals\ndata: signals {\"foo\":\"bar\"}\n\n")
Expand All @@ -45,6 +46,7 @@ module DispatcherExamples

socket = TestSocket.new
dispatcher.response.body.call(socket)
socket.wait_for_close
expect(errs.first).to be_a(ArgumentError)
Thread.report_on_exception = true
end
Expand Down