Skip to content
Open
12 changes: 12 additions & 0 deletions spec/std/channel_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ describe Channel do
Channel.receive_first(Channel(Int32).new, channel).should eq 1
end

it "raises when receive_first timeout exceeded" do
expect_raises Channel::TimeoutError do
Channel.receive_first(Channel(Int32).new, Channel(Int32).new, timeout: 1.nanosecond)
end
end

it "does send_first" do
ch1 = Channel(Int32).new(1)
ch2 = Channel(Int32).new(1)
Expand All @@ -67,6 +73,12 @@ describe Channel do
ch2.receive.should eq 2
end

it "raises when send_first timeout exceeded" do
expect_raises Channel::TimeoutError do
Channel.send_first(1, Channel(Int32).new, Channel(Int32).new, timeout: 1.nanosecond)
end
end

it "does not raise or change its status when it is closed more than once" do
ch = Channel(Int32).new
ch.closed?.should be_false
Expand Down
70 changes: 61 additions & 9 deletions src/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ class Channel(T)
# :nodoc:
record UseDefault

class ClosedError < Exception
class Error < Exception
end

class ClosedError < Error
def initialize(msg = "Channel is closed")
super(msg)
end
end

class TimeoutError < Error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I'm wondering if a dedicated Channel::TimeoutError makes much sense. Perhaps a generic timeout error would be better? It could be reused for other kinds of timeouts. I suppose it usually shouldn't matter much whether a timeout occurred in a channel or somewhere else...
There's already IO::Timeout, but it probably makes sense to be a separate type because it should be in the IO::Error hierarchy. I don't think Channel::Error is equivalently relevant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started out just using IO::TimeoutError but I convinced myself it wasn't appropriate to use an IO related error for a Channel timeout.

If adding a Channel::Error superclass to the existing Channel::ClosedError and new Channel::TimeoutError has little utility, I'll go ahead and remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed Channel::Error, but left Channel::TimeoutError for now pending whether or not there should be a generic and reusable TimeoutError in the stdlib.

end

private module SenderReceiverCloseAction
def close
self.state = DeliveryState::Closed
Expand Down Expand Up @@ -294,21 +300,67 @@ class Channel(T)
pp.text inspect
end

def self.receive_first(*channels)
receive_first channels
# Returns the first available value received from the given *channels*, or
# raises `Channel::TimeoutError` if given a *timeout* that expires before a
# value is received.
#
# ```
# c1 = Channel(String).new(1)
# c2 = Channel(String).new(1)
#
# c2.send "hello"
# value = Channel.receive_first c1, c2 # => receives "hello" from c2
#
# begin
# # will timeout after 1 second and raise Channel::TimeoutError because
# # no channels are ready to receive
# value = Channel.receive_first c1, c2, timeout: 1.second
# rescue ex : Channel::TimeoutError
# Log.error(exception: ex)
# end
# ```
def self.receive_first(*channels, timeout : Time::Span? = nil)
receive_first channels, timeout: timeout
end

def self.receive_first(channels : Enumerable(Channel))
_, value = self.select(channels.map(&.receive_select_action))
# :ditto:
def self.receive_first(channels : Enumerable(Channel), *, timeout : Time::Span? = nil)
actions = channels.map(&.receive_select_action)
actions = actions.to_a + [TimeoutAction.new(timeout)] unless timeout.nil?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: ary + [x] should at least be rewritten as ary << x to avoid the intermediate array.
But perhaps we could also avoid unnecessary heap allocations entirely?

actions.to_a would always allocate an array, even when channels is a tuple, and currently would not allocate at all.
For reference, Channel.select_impl goes extra lengths with custom implementations for different collection types in order to avoid heap allocations if at all possible.

This is only for the case when using a timeout, so it doesn't look like this introduces a performance regression for existing code. But it could still mean receive_first with timeout is significantly less performant than without. I haven't benchmarked this, just based on intuition (which may be wrong).

Suggested change
actions = actions.to_a + [TimeoutAction.new(timeout)] unless timeout.nil?
actions = actions.to_a << TimeoutAction.new(timeout) unless timeout.nil?

ditto for .send_first

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I try actions = actions.to_a << TimeoutAction.new(timeout) unless timeout.nil? the compiler gives this error:

In src\channel.cr:330:45

 330 | actions = actions.to_a << TimeoutAction.new(timeout) unless timeout.nil?
                                               ^--
Error: expected argument #1 to 'Array(Channel::StrictReceiveAction(Int32))#<<' to be Channel::StrictReceiveAction(Int32), not Channel::TimeoutAction

Overloads are:
 - Array(T)#<<(value : T)

So I ended up with the actions = actions.to_a + [TimeoutAction.new(timeout)] approach as it was the only way I could figure out how to make the compiler happy with the types.

I would love if there was a better approach than concatenating these two temporary arrays to make the compiler happy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of array concatenation, I tried splatting method arguments into the call to self.select and that works and seems like a better approach? And I assume splatting method arguments avoids heap allocation, but @straight-shoota please confirm?

index, value = self.select(*actions, TimeoutAction.new(timeout))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splatting only works for Tuple. So this code wouldn't compile if actions was, for example, Array or StaticArray.
Seems like we're missing a test case for that. 🙈

We can use splatting as an optimization for Tuple, but we would need other branches for other enumerables as well, just like in .select_impl.

For the array append to work, we must type the array as Array(SelectAction) (which is fine because .select expects that item type). An easy way to do that is to explicitly cast the output type of the map block: channels.map(&.receive_select_action.as(SelectAction)). This should result in an appropriate collection type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some additional tests to cover Array and StaticArray, and yep as you said compilation failed:

In src\channel.cr:360:34

 360 | index, value = self.select(*actions, TimeoutAction.new(timeout))
                                  ^
Error: argument to splat must be a tuple, not Array(Channel::StrictReceiveAction(Int32))

I'm sorry (for being such a noob) but I cannot for the life of me figure out how to make the compiler happy trying to use << to append a TimeoutAction. I tried the following:

In src\channel.cr:364:41

 364 | actions = actions.to_a.as(Array(SelectAction)) << timeout_action
                                       ^-----------
Error: can't use Channel::SelectAction(S) as generic type argument yet, use a more specific type

...

In src\channel.cr:364:55

 364 | actions = actions.to_a(&.as(SelectAction)) << timeout_action
                                                     ^-------------
Error: expected argument #1 to 'Array(Channel::StrictReceiveAction(Int32))#<<' to be Channel::StrictReceiveAction(Int32), not Channel::TimeoutAction

...

In src\channel.cr:365:59

 365 | actions = actions.to_a.map(&.as(SelectAction)) << timeout_action
                                                         ^-------------
Error: expected argument #1 to 'Array(Channel::StrictReceiveAction(Int32))#<<' to be Channel::StrictReceiveAction(Int32), not Channel::TimeoutAction

But have ended up back at the actions.to_a + [TimeoutAction.new(timeout)] as the only way I could get it to compile.

index, value = self.select(actions)
raise TimeoutError.new unless timeout.nil? || index < (actions.size - 1)
value
end

def self.send_first(value, *channels) : Nil
send_first value, channels
# Sends the given *value* to the first channel ready to receive in *channels*,
# or raises `Channel::TimeoutError` if given a *timeout* that expires before
# a channel becomes ready to receive.
#
# ```
# c1 = Channel(String).new(1)
# c2 = Channel(String).new(1)
#
# c1.send "hello"
# value = Channel.send_first "goodbye", c1, c2 # => sends "goodbye" to c2
#
# begin
# # will timeout after 1 second and raise Channel::TimeoutError because
# # no channels are ready to receive
# value = Channel.send_first "ciao", c1, c2, timeout: 1.second
# rescue ex : Channel::TimeoutError
# Log.error(exception: ex)
# end
# ```
def self.send_first(value, *channels, timeout : Time::Span? = nil) : Nil
send_first value, channels, timeout: timeout
end

def self.send_first(value, channels : Enumerable(Channel)) : Nil
self.select(channels.map(&.send_select_action(value)))
# :ditto:
def self.send_first(value, channels : Enumerable(Channel), *, timeout : Time::Span? = nil) : Nil
actions = channels.map(&.send_select_action(value))
actions = actions.to_a + [TimeoutAction.new(timeout)] unless timeout.nil?
index, _ = self.select(actions)
raise TimeoutError.new unless timeout.nil? || index < (actions.size - 1)
nil
end

Expand Down
Loading