Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions src/lavinmq/bool_channel.cr
Original file line number Diff line number Diff line change
@@ -1,46 +1,69 @@
# A class which has two channels, one for which a state is true
# and the other for when the state if false
class BoolChannel
getter when_true = Channel(Nil).new
getter when_false = Channel(Nil).new
@value = Channel(Bool).new
class StateChannel < Channel(Nil)
class EndlessQueue < Deque(Nil)
property? empty : Bool = true

def shift(&)
yield if empty?
end

def push(value : T)
raise "Can't push to an EndlessQueue"
end
end

@queue = EndlessQueue.new

def activate
@lock.sync do
@queue.as(EndlessQueue).empty = false
while receiver_ptr = dequeue_receiver
receiver_ptr.value.data = nil
receiver_ptr.value.state = DeliveryState::Delivered
receiver_ptr.value.fiber.enqueue
end
end
end

def deactivate
@lock.sync do
@queue.as(EndlessQueue).empty = true
end
end
end

getter when_true = StateChannel.new
getter when_false = StateChannel.new
@value : Atomic(Bool)

def initialize(value : Bool)
spawn(name: "BoolChannel#send_loop") do
send_loop(value)
@value = Atomic.new(value)
if value
@when_true.activate
else
@when_false.activate
end
end

def set(value : Bool)
@value.send value
rescue ::Channel::ClosedError
Log.debug { "BoolChannel closed, could not set value" }
def value
@value.get
end

def value : Bool
select
when when_true.receive
true
when when_false.receive
false
def set(value : Bool)
return if @value.swap(value) == value
if value
@when_false.deactivate
@when_true.activate
else
@when_true.deactivate
@when_false.activate
end
end

def close
@when_true.close
@when_false.close
@value.close
end

private def send_loop(value)
loop do
channel = value ? @when_true : @when_false
select
when value = @value.receive
when channel.send nil
Fiber.yield # Improves fairness by allowing the receiving fiber to run instead of notifying all waiting fibers
end
end
rescue ::Channel::ClosedError
end
end
Loading