diff --git a/lib/datastar/dispatcher.rb b/lib/datastar/dispatcher.rb index 63031a3..98e4600 100644 --- a/lib/datastar/dispatcher.rb +++ b/lib/datastar/dispatcher.rb @@ -285,7 +285,7 @@ def stream_one(streamer) proc do |socket| generator = ServerSentEventGenerator.new(socket, signals:, view_context: @view_context) @on_connect.each { |callable| callable.call(generator) } - handling_errors(generator, socket) do + handling_sync_errors(generator, socket) do streamer.call(generator) end ensure @@ -313,9 +313,10 @@ def stream_many(streamer) @on_connect.each { |callable| callable.call(conn_generator) } threads = @streamers.map do |streamer| + duped_signals = signs.dup.freeze @executor.spawn do # TODO: Review thread-safe view context - generator = ServerSentEventGenerator.new(@queue, signals: signs, view_context: @view_context) + generator = ServerSentEventGenerator.new(@queue, signals: duped_signals, view_context: @view_context) streamer.call(generator) @queue << :done rescue StandardError => e @@ -323,7 +324,12 @@ def stream_many(streamer) end end - handling_errors(conn_generator, socket) do + # Now launch the control thread that actually writes to the socket + # We don't want to block the main thread, so that servers like Puma + # which have a limited thread pool can keep serving other requests + # Other streamers will push any StandardError exceptions to the queue + # So we handle them here + @executor.spawn do done_count = 0 threads_size = @heartbeat_on ? threads.size - 1 : threads.size @@ -332,24 +338,46 @@ def stream_many(streamer) done_count += 1 @queue << nil if done_count == threads_size elsif data.is_a?(Exception) - raise data + handle_streaming_error(data, socket) + @queue << nil else - socket << data + # Here we attempt writing to the actual socket + # which may raise an IOError if the client disconnected + begin + socket << data + rescue Exception => e + handle_streaming_error(e, socket) + @queue << nil + end end end + + ensure + @on_server_disconnect.each { |callable| callable.call(conn_generator) } + @executor.stop(threads) if threads + socket.close end - ensure - @executor.stop(threads) if threads - socket.close end end - # Run a streaming block while handling errors + # Handle errors caught during streaming + # @param error [Exception] the error that occurred + # @param socket [IO] the socket to pass to error handlers + def handle_streaming_error(error, socket) + case error + when IOError, Errno::EPIPE, Errno::ECONNRESET + @on_client_disconnect.each { |callable| callable.call(socket) } + when Exception + @on_error.each { |callable| callable.call(error) } + end + end + + # Run a block while handling errors # @param generator [ServerSentEventGenerator] # @param socket [IO] # @yield # @api private - def handling_errors(generator, socket, &) + def handling_sync_errors(generator, socket, &) yield @on_server_disconnect.each { |callable| callable.call(generator) } diff --git a/lib/datastar/server_sent_event_generator.rb b/lib/datastar/server_sent_event_generator.rb index e116e1a..f8c6de7 100644 --- a/lib/datastar/server_sent_event_generator.rb +++ b/lib/datastar/server_sent_event_generator.rb @@ -24,6 +24,9 @@ class ServerSentEventGenerator attr_reader :signals + # @param stream [IO, Queue] The IO stream or Queue to write to + # @option signals [Hash] A hash of signals (params) + # @option view_context [Object] The view context for rendering elements, if applicable. def initialize(stream, signals:, view_context: nil) @stream = stream @signals = signals diff --git a/spec/dispatcher_spec.rb b/spec/dispatcher_spec.rb index ba4e2d6..8c3b36b 100644 --- a/spec/dispatcher_spec.rb +++ b/spec/dispatcher_spec.rb @@ -2,20 +2,35 @@ class TestSocket attr_reader :lines, :open - def initialize + + def initialize(open: true) @lines = [] - @open = true + @open = open + @finish = Thread::Queue.new end def <<(line) + raise Errno::EPIPE, 'Socket closed' unless @open + @lines << line end - def close = @open = false + def close + @open = false + @finish << true + end def split_lines @lines.join.split("\n") end + + # Streams run in threads + # we can call this to signal the end of the stream + # in tests + def wait_for_close(&) + @finish.pop + yield if block_given? + end end RSpec.describe Datastar::Dispatcher do @@ -407,6 +422,8 @@ def self.render_in(view_context) = %(
\n#{view_context}\ndata: elements hello\ndata: elements
\n\n") @@ -448,8 +465,8 @@ def self.render_in(view_context) = %(
\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}