99import Dispatch
1010import CAtomics
1111
12- private let hptrOffset = 0
13- private let tptrOffset = MemoryLayout< AtomicMutableRawPointer> . stride
14- private let fptrOffset = MemoryLayout< AtomicMutableRawPointer> . stride*2
15-
1612open class PostBox < Value> : EventStream < Value >
1713{
1814 private typealias Node = BufferNode < Event < Value > >
1915
20- private let s = UnsafeMutableRawPointer . allocate ( byteCount: MemoryLayout< AtomicMutableRawPointer> . stride*3,
21- alignment: MemoryLayout< AtomicMutableRawPointer> . alignment)
22- private var hptr : UnsafeMutablePointer < AtomicMutableRawPointer > {
23- return ( s+ hptrOffset) . assumingMemoryBound ( to: AtomicMutableRawPointer . self)
24- }
25- private var head : Node {
26- get { return Node ( storage: CAtomicsLoad ( hptr, . relaxed) ) }
27- set { CAtomicsStore ( hptr, newValue. storage, . relaxed) }
16+ private let s = UnsafeMutableRawPointer . allocate ( byteCount: MemoryLayout< PostBoxState> . size,
17+ alignment: MemoryLayout< PostBoxState> . alignment)
18+ private var head : UnsafeMutablePointer < AtomicMutableRawPointer > {
19+ return ( s+ headOffset) . assumingMemoryBound ( to: AtomicMutableRawPointer . self)
2820 }
29- private var tptr : UnsafeMutablePointer < AtomicMutableRawPointer > {
30- return ( s+ tptrOffset ) . assumingMemoryBound ( to: AtomicMutableRawPointer . self)
21+ private var tail : UnsafeMutablePointer < AtomicMutableRawPointer > {
22+ return ( s+ tailOffset ) . assumingMemoryBound ( to: AtomicMutableRawPointer . self)
3123 }
32- private var fptr : UnsafeMutablePointer < AtomicOptionalMutableRawPointer > {
33- return ( s+ fptrOffset ) . assumingMemoryBound ( to: AtomicOptionalMutableRawPointer . self)
24+ private var last : UnsafeMutablePointer < AtomicOptionalMutableRawPointer > {
25+ return ( s+ lastOffset ) . assumingMemoryBound ( to: AtomicOptionalMutableRawPointer . self)
3426 }
3527
3628 override init ( validated: ValidatedQueue )
@@ -39,47 +31,46 @@ open class PostBox<Value>: EventStream<Value>
3931
4032 // set up an initial dummy node
4133 let node = Node . dummy
42- ( s+ hptrOffset ) . bindMemory ( to: AtomicMutableRawPointer . self, capacity: 2 )
43- CAtomicsInitialize ( hptr , node. storage)
44- CAtomicsInitialize ( tptr , node. storage)
45- ( s+ fptrOffset ) . bindMemory ( to: AtomicOptionalMutableRawPointer . self, capacity: 1 )
46- CAtomicsInitialize ( fptr , nil )
34+ ( s+ headOffset ) . bindMemory ( to: AtomicMutableRawPointer . self, capacity: 2 )
35+ CAtomicsInitialize ( head , node. storage)
36+ CAtomicsInitialize ( tail , node. storage)
37+ ( s+ lastOffset ) . bindMemory ( to: AtomicOptionalMutableRawPointer . self, capacity: 1 )
38+ CAtomicsInitialize ( last , nil )
4739 }
4840
4941 deinit {
5042 // empty the queue
51- let head = self . head
52- var next = head. next
43+ let head = Node ( storage : CAtomicsLoad ( self . head, . relaxed ) )
44+ var next = Node ( storage : CAtomicsLoad ( head. next, . relaxed ) )
5345 while let node = next
5446 {
55- next = node. next
47+ next = Node ( storage : CAtomicsLoad ( node. next, . relaxed ) )
5648 node. deinitialize ( )
5749 node. deallocate ( )
5850 }
5951
6052 s. deallocate ( )
6153 }
6254
63- final public var isEmpty : Bool { return CAtomicsLoad ( hptr , . relaxed) == CAtomicsLoad ( tptr , . relaxed) }
55+ final public var isEmpty : Bool { return CAtomicsLoad ( head , . relaxed) == CAtomicsLoad ( tail , . relaxed) }
6456
6557 final public func post( _ event: Event < Value > )
6658 {
67- guard completed == false , CAtomicsLoad ( fptr , . relaxed) == nil else { return }
59+ guard completed == false , CAtomicsLoad ( last , . relaxed) == nil else { return }
6860
6961 let node = Node ( initializedWith: event)
7062 if event. isError
7163 {
72- guard CAtomicsCompareAndExchange ( fptr , nil , node. storage, . strong, . relaxed) else { return }
64+ guard CAtomicsCompareAndExchange ( last , nil , node. storage, . strong, . relaxed) else { return }
7365 }
7466
7567 // events posted "simultaneously" synchronize with each other here
76- let previousTailPointer = CAtomicsExchange ( tptr, node. storage, . acqrel)
77- let previousTail = Node ( storage: previousTailPointer)
68+ let previousTail = Node ( storage: CAtomicsExchange ( tail, node. storage, . acqrel) )
7869
7970 // publish the new node to processing loop here
80- CAtomicsStore ( previousTail. nptr , node. storage, . release)
71+ CAtomicsStore ( previousTail. next , node. storage, . release)
8172
82- if previousTailPointer == CAtomicsLoad ( hptr , . relaxed)
73+ if previousTail . storage == CAtomicsLoad ( head , . relaxed)
8374 { // the queue had been empty or blocked
8475 // resume processing enqueued events
8576 queue. async ( execute: self . processNext)
@@ -110,17 +101,21 @@ open class PostBox<Value>: EventStream<Value>
110101 }
111102#endif
112103
104+ let requested = self . requested
105+ if requested <= 0 && CAtomicsLoad ( last, . relaxed) == nil { return }
106+
113107 // try to dequeue the next event
114- let oldHead = head
115- let next = CAtomicsLoad ( oldHead . nptr , . acquire)
108+ let head = Node ( storage : CAtomicsLoad ( self . head, . acquire ) )
109+ let next = CAtomicsLoad ( head . next , . acquire)
116110
117- if requested <= 0 && CAtomicsLoad ( fptr , . relaxed) != next { return }
111+ if requested <= 0 && CAtomicsLoad ( last , . relaxed) != next { return }
118112
119- if let next = Node ( storage : next)
113+ if let next = next
120114 {
121- let event = next. move ( )
122- head = next
123- oldHead. deallocate ( )
115+ let node = Node ( storage: next)
116+ let event = node. move ( )
117+ CAtomicsStore ( self . head, next, . release)
118+ head. deallocate ( )
124119
125120 dispatch ( event)
126121 queue. async ( execute: self . processNext)
@@ -140,6 +135,16 @@ open class PostBox<Value>: EventStream<Value>
140135 }
141136}
142137
138+ private struct PostBoxState
139+ {
140+ var head : AtomicMutableRawPointer
141+ var tail : AtomicMutableRawPointer
142+ var last : AtomicOptionalMutableRawPointer
143+ }
144+ private let headOffset = MemoryLayout . offset ( of: \PostBoxState . head) !
145+ private let tailOffset = MemoryLayout . offset ( of: \PostBoxState . tail) !
146+ private let lastOffset = MemoryLayout . offset ( of: \PostBoxState . last) !
147+
143148private let nextOffset = 0
144149private let dataOffset = ( MemoryLayout < AtomicOptionalMutableRawPointer > . stride + 15 ) & ~ 15
145150
@@ -160,10 +165,10 @@ private struct BufferNode<Element>: Equatable
160165
161166 private init ( )
162167 {
163- let size = dataOffset + MemoryLayout< Element> . stride
168+ let size = dataOffset + MemoryLayout< Element> . size
164169 storage = UnsafeMutableRawPointer . allocate ( byteCount: size, alignment: 16 )
165170 ( storage+ nextOffset) . bindMemory ( to: AtomicOptionalMutableRawPointer . self, capacity: 1 )
166- CAtomicsInitialize ( nptr , nil )
171+ CAtomicsInitialize ( next , nil )
167172 ( storage+ dataOffset) . bindMemory ( to: Element . self, capacity: 1 )
168173 }
169174
@@ -180,14 +185,8 @@ private struct BufferNode<Element>: Equatable
180185 storage. deallocate ( )
181186 }
182187
183- var nptr : UnsafeMutablePointer < AtomicOptionalMutableRawPointer > {
184- get {
185- return ( storage+ nextOffset) . assumingMemoryBound ( to: AtomicOptionalMutableRawPointer . self)
186- }
187- }
188-
189- var next : BufferNode ? {
190- get { return BufferNode ( storage: CAtomicsLoad ( nptr, . acquire) ) }
188+ var next : UnsafeMutablePointer < AtomicOptionalMutableRawPointer > {
189+ return ( storage+ nextOffset) . assumingMemoryBound ( to: AtomicOptionalMutableRawPointer . self)
191190 }
192191
193192 private var data : UnsafeMutablePointer < Element > {
0 commit comments