diff --git a/src/lavinmq/bool_channel.cr b/src/lavinmq/bool_channel.cr index 7796a48fe8..dda203f698 100644 --- a/src/lavinmq/bool_channel.cr +++ b/src/lavinmq/bool_channel.cr @@ -1,46 +1,69 @@ # A class which has two channels, one for which a state is true # and the other for when the state if false class BoolChannel - getter when_true = Channel(Nil).new - getter when_false = Channel(Nil).new - @value = Channel(Bool).new + class StateChannel < Channel(Nil) + class EndlessQueue < Deque(Nil) + property? empty : Bool = true + + def shift(&) + yield if empty? + end + + def push(value : T) + raise "Can't push to an EndlessQueue" + end + end + + @queue = EndlessQueue.new + + def activate + @lock.sync do + @queue.as(EndlessQueue).empty = false + while receiver_ptr = dequeue_receiver + receiver_ptr.value.data = nil + receiver_ptr.value.state = DeliveryState::Delivered + receiver_ptr.value.fiber.enqueue + end + end + end + + def deactivate + @lock.sync do + @queue.as(EndlessQueue).empty = true + end + end + end + + getter when_true = StateChannel.new + getter when_false = StateChannel.new + @value : Atomic(Bool) def initialize(value : Bool) - spawn(name: "BoolChannel#send_loop") do - send_loop(value) + @value = Atomic.new(value) + if value + @when_true.activate + else + @when_false.activate end end - def set(value : Bool) - @value.send value - rescue ::Channel::ClosedError - Log.debug { "BoolChannel closed, could not set value" } + def value + @value.get end - def value : Bool - select - when when_true.receive - true - when when_false.receive - false + def set(value : Bool) + return if @value.swap(value) == value + if value + @when_false.deactivate + @when_true.activate + else + @when_true.deactivate + @when_false.activate end end def close @when_true.close @when_false.close - @value.close - end - - private def send_loop(value) - loop do - channel = value ? @when_true : @when_false - select - when value = @value.receive - when channel.send nil - Fiber.yield # Improves fairness by allowing the receiving fiber to run instead of notifying all waiting fibers - end - end - rescue ::Channel::ClosedError end end