-
Notifications
You must be signed in to change notification settings - Fork 1
/
promise_stream.tcc
392 lines (353 loc) · 12 KB
/
promise_stream.tcc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
#define promise_stream_tcc
#include <exception>
#include <stdexcept>
#include <thread>
#include "shared_functor.h"
#include "promise_stream.h"
namespace kaiu {
/*** PromiseStreamState ***/
/* Resolve / reject the stream */
/*
 * Resolve the stream with the given result.
 *
 * Takes the lock via get_lock() and hands the result to do_resolve.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::resolve(Result result)
{
	auto guard = get_lock();
	do_resolve(guard, std::move(result));
}
/*
 * Reject the stream with an exception pointer.
 *
 * Takes the lock via get_lock() and calls do_reject with the
 * "consumer failed" flag cleared.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::reject(std::exception_ptr error)
{
	const bool consumer_failed = false;
	auto guard = get_lock();
	do_reject(guard, error, consumer_failed);
}
/*
 * Reject the stream with an error message.
 *
 * The message is wrapped in a std::runtime_error, converted to an exception
 * pointer, and passed to do_reject with the "consumer failed" flag cleared.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::reject(const std::string& error)
{
	auto guard = get_lock();
	auto wrapped = make_exception_ptr(std::runtime_error(error));
	do_reject(guard, wrapped, false);
}
/*
 * Write a datum to the stream, constructing it in the buffer from args.
 *
 * The datum is only buffered while the current action is
 * StreamAction::Continue; otherwise the arguments are ignored.  process_data
 * is called in either case.
 */
template <typename Result, typename Datum>
template <typename... Args>
void PromiseStreamState<Result, Datum>::write(Args&&... args)
{
	auto guard = get_lock();
	const bool accepting = get_action(guard) == StreamAction::Continue;
	if (accepting) {
		emplace_data(guard, std::forward<Args>(args)...);
		set_stream_has_been_written_to(guard);
	}
	process_data(guard);
}
/*
 * Assign the data callback and start processing any buffered data.
 *
 * When SAFE_PROMISE_STREAMS is defined, throws std::logic_error if a callback
 * has already been assigned or if the supplied callback is null.
 *
 * data_callback is taken by value, so it is moved (not copied) into on_data
 * once the checks have passed.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::set_data_callback(DataFunc data_callback)
{
	auto lock = get_lock();
#if defined(SAFE_PROMISE_STREAMS)
	if (on_data != nullptr) {
		throw std::logic_error("Callbacks are already assigned");
	}
	if (data_callback == nullptr) {
		throw std::logic_error("Attempted to bind null callback");
	}
#endif
	/* The parameter is our own copy and is not used again: move it */
	on_data = std::move(data_callback);
	set_data_callback_assigned(lock);
	process_data(lock);
}
/*
 * Bind a promise-returning consumer to the stream.
 *
 * The consumer is wrapped so that an exception thrown synchronously from it
 * is converted into a rejected StreamAction promise.  The wrapper is
 * installed via set_data_callback and proxy_promise is returned.
 */
template <typename Result, typename Datum>
Promise<Result> PromiseStreamState<Result, Datum>::
	do_stream(stream_consumer consumer)
{
	auto guarded_consumer = [consumer] (ensure_locked, Datum datum) {
		try {
			return consumer(std::move(datum));
		} catch (...) {
			return promise::rejected<StreamAction>(std::current_exception());
		}
	};
	set_data_callback(guarded_consumer);
	return proxy_promise;
}
/*
 * Bind a stateful, promise-returning consumer to the stream.
 *
 * A State object is constructed from args and held in a shared_ptr.  Each
 * datum is passed to the consumer together with a reference to that state.
 * When the stream promise resolves, the state is moved out and paired with
 * the result.
 *
 * The lambdas capture only what they use (consumer and state); they do not
 * touch this object, so `this` is not captured.
 */
template <typename Result, typename Datum>
template <typename State, typename... Args>
Promise<std::pair<State, Result>> PromiseStreamState<Result, Datum>::
	do_stateful_stream(stateful_stream_consumer<State> consumer, Args&&... args)
{
	auto state = std::make_shared<State>(std::forward<Args>(args)...);
	auto consumer_proxy = [consumer, state] (Datum datum) {
		return consumer(*state, std::move(datum));
	};
	auto next_proxy = [state] (Result result) {
		return std::make_pair(std::move(*state), std::move(result));
	};
	return do_stream(consumer_proxy)
		->then(next_proxy);
}
/*
 * Bind a consumer that ignores every datum and always answers with the given
 * action.
 */
template <typename Result, typename Datum>
Promise<Result> PromiseStreamState<Result, Datum>::always(StreamAction action)
{
	return do_stream([action] (Datum) {
		return promise::resolved(action);
	});
}
/* Bind a consumer that always answers StreamAction::Discard */
template <typename Result, typename Datum>
Promise<Result> PromiseStreamState<Result, Datum>::discard()
{
	const auto action = StreamAction::Discard;
	return always(action);
}
/* Bind a consumer that always answers StreamAction::Stop */
template <typename Result, typename Datum>
Promise<Result> PromiseStreamState<Result, Datum>::stop()
{
	const auto action = StreamAction::Stop;
	return always(action);
}
/*
 * Forward this stream to another stream.
 *
 * Every datum is written to `next`, and the action reported by
 * next->data_action() is returned to this stream.  The promise returned by
 * stream(...) is then forwarded to `next` as well.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::forward_to(PromiseStream<Result, Datum> next)
{
	auto relay = [next] (Datum datum) {
		next->write(std::move(datum));
		return next->data_action();
	};
	this->stream<void>(relay)->forward_to(next);
}
/*
 * Forward this stream to a plain promise.
 *
 * A discarding consumer is bound via discard(), and the promise it returns
 * is forwarded to `next`.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::forward_to(Promise<Result> next)
{
	auto drained = this->discard();
	drained->forward_to(next);
}
/*
 * Run the on_data callback for one datum and handle its outcome.
 *
 * The consumer-is-running flag is set, on_data is invoked with the caller's
 * lock and the datum, and two handlers are attached to the returned promise:
 *  * on success: store the returned action, clear the consumer-is-running
 *    flag and call process_data again.
 *  * on failure: call do_reject with consumer_failed = true and clear the
 *    consumer-is-running flag.
 *
 * Each handler decides via is_async() whether it must take a fresh lock with
 * get_lock() or may reuse the caller's lock (through the captured pointer).
 *
 * If anything in the function body throws, the consumer-is-running flag is
 * cleared and the exception is rethrown to the caller.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::call_data_callback(ensure_locked lock, Datum datum) try {
/*
 * Asynchronous on_data callbacks will obviously not trigger the catch block
 * if they throw. Synchronous on_data callbacks also will not, as the
 * promise's "handler" eats the exception.
 *
 * The callback passed via stream(...) methods is wrapped in a try/catch
 * block so in theory, the catch block in this function should never be
 * triggered by exceptions from user code, with the possible exception of
 * the move constructor for type Datum.
 */
/* Set that consumer is running */
set_consumer_is_running(lock, true);
/*
 * We need to be able to detect whether the callbacks are run synchronously
 * or not. If run asynchronously, they could be in the same thread as the
 * caller, or a different one. Hence we need to combine two variables:
 * * "current thread id"
 * * the async_check boolean
 *
 * async_check detects asynchronous calling when the callbacks run in the
 * calling thread, but may not always work if the callbacks are called in
 * another thread. "current thread id" fixes that case.
 */
auto async_check = std::make_shared<std::atomic<bool>>(false);
const auto caller_id = std::this_thread::get_id();
const auto is_async = [=] {
return *async_check || std::this_thread::get_id() != caller_id;
};
/* Type of the lock object itself, with any reference/cv-qualification removed */
using lock_type = typename std::decay<decltype(lock)>::type;
/* Run consumer */
on_data(lock, std::move_if_noexcept(datum))
->then(
[this, is_async, lck=&lock] (const StreamAction action) {
/* Get lock if we're running asynchronously */
const bool async = is_async();
lock_type new_lock;
if (async) {
new_lock = get_lock();
}
/* The captured pointer is only dereferenced on the synchronous path */
lock_type& lock = async ? new_lock : *lck;
/* Lock acquired */
set_action(lock, action);
set_consumer_is_running(lock, false);
/* Will release the lock */
process_data(lock);
},
[this, is_async, lck=&lock] (std::exception_ptr error) {
/* Get lock if we're running asynchronously */
const bool async = is_async();
lock_type new_lock;
if (async) {
new_lock = get_lock();
}
/* The captured pointer is only dereferenced on the synchronous path */
lock_type& lock = async ? new_lock : *lck;
/* Lock acquired */
do_reject(lock, error, true);
set_consumer_is_running(lock, false);
});
/*
 * then(...) has returned: any handler that runs from now on sees
 * async_check == true and is therefore treated as asynchronous
 */
*async_check = true;
} catch (...) {
set_consumer_is_running(lock, false);
/*
 * The explicit rethrow is required: in a function-try-block of an ordinary
 * function (unlike a constructor or destructor), reaching the end of the
 * handler returns from the function instead of rethrowing.
 */
throw;
}
/*
 * Take the next datum (if take_data provides one) and pass it to the data
 * callback.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::process_data(ensure_locked lock)
{
	Datum next;
	if (!take_data(lock, next)) {
		return;
	}
	call_data_callback(lock, std::move(next));
}
/*
 * Try to remove the front datum from the buffer into `out`.
 *
 * Returns true if a datum was moved into `out`.  Returns false (leaving
 * `out` untouched) when the stream is not in a streaming state, no consumer
 * is assigned, a consumer is already running, or the buffer is empty.
 */
template <typename Result, typename Datum>
bool PromiseStreamState<Result, Datum>::take_data(ensure_locked lock, Datum& out)
{
	const auto current = get_state(lock);
	const bool not_streaming =
		current != stream_state::streaming1 &&
		current != stream_state::streaming2;
	/* Only the streaming states may hand out data */
	if (not_streaming) {
		return false;
	}
	/* Nothing to deliver to until a consumer has been assigned */
	if (on_data == nullptr) {
		return false;
	}
	/* Never run the consumer multiple times simultaneously */
	if (get_consumer_is_running(lock)) {
		return false;
	}
	/*
	 * Once the consumer has finished, drop whatever is buffered (the "write"
	 * method ignores subsequent writes)
	 */
	if (get_action(lock) != StreamAction::Continue) {
		decltype(buffer)().swap(buffer);
	}
	if (buffer.empty()) {
		/*
		 * When the last datum is removed from the buffer, this call has to
		 * happen AFTER set_consumer_is_running(lock, true) for the state
		 * transitions to work correctly.  We do not call
		 * set_buffer_is_empty when removing the last datum; we call it
		 * after the last datum has been processed, so this is not a
		 * problem here.
		 */
		set_buffer_is_empty(lock);
		return false;
	}
	/* Hand the front datum to the caller */
	out = std::move(buffer.front());
	buffer.pop();
	return true;
}
/*
 * Construct a new datum in the buffer from the forwarded arguments.
 * The lock parameter is not used in the body.
 */
template <typename Result, typename Datum>
template <typename... Args>
void PromiseStreamState<Result, Datum>::emplace_data(ensure_locked /* lock */, Args&&... args)
{
	buffer.emplace(std::forward<Args>(args)...);
}
/* Returns true if the buffer holds at least one datum */
template <typename Result, typename Datum>
bool PromiseStreamState<Result, Datum>::has_data(ensure_locked /* lock */) const
{
	return !buffer.empty();
}
/*
 * Record a "resolved" stream result, with a completer that carries the
 * result value.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::do_resolve(ensure_locked lock, Result result)
{
	auto completer = resolve_completer(std::move(result));
	set_stream_result(lock, stream_result::resolved, std::move(completer));
}
/*
 * Set the action to Stop and record a failure result.
 *
 * The recorded result is stream_result::consumer_failed when consumer_failed
 * is true, otherwise stream_result::rejected.  The completer carries the
 * error.
 */
template <typename Result, typename Datum>
void PromiseStreamState<Result, Datum>::do_reject(ensure_locked lock, std::exception_ptr error, bool consumer_failed)
{
	set_action(lock, StreamAction::Stop);
	const auto outcome = consumer_failed
		? stream_result::consumer_failed
		: stream_result::rejected;
	set_stream_result(lock, outcome, reject_completer(error));
}
/*
 * Build a completer which, when invoked, resolves proxy_promise with the
 * given result.  The result is moved into the closure and moved out again on
 * invocation; the closure is wrapped with detail::make_shared_functor.
 */
template <typename Result, typename Datum>
auto PromiseStreamState<Result, Datum>::resolve_completer(Result result)
	-> completer_func
{
	auto deliver = [this, result = std::move(result)] (ensure_locked) mutable {
		proxy_promise->resolve(std::move(result));
	};
	return detail::make_shared_functor(std::move(deliver));
}
/*
 * Build a completer which, when invoked, rejects proxy_promise with the
 * given error.
 */
template <typename Result, typename Datum>
auto PromiseStreamState<Result, Datum>::reject_completer(std::exception_ptr error)
	-> completer_func
{
	auto fail = [this, error] (ensure_locked) {
		proxy_promise->reject(error);
	};
	return fail;
}
/*
 * Stateless consumer returning a promise: passed straight to do_stream
 * without any adaptation.
 */
template <typename Result, typename Datum>
template <typename, typename Consumer>
auto PromiseStreamState<Result, Datum>::stream(Consumer consumer)
	-> stream_sel<Consumer, result_of_promise_is, StreamAction, Result, Datum>
{
	return do_stream(consumer);
}
/*
 * Stateless consumer returning an action: the returned action is wrapped in
 * a resolved promise before being given to do_stream.
 */
template <typename Result, typename Datum>
template <typename, typename Consumer>
auto PromiseStreamState<Result, Datum>::stream(Consumer consumer)
	-> stream_sel<Consumer, result_of_not_promise_is, StreamAction, Result, Datum>
{
	return do_stream([consumer] (Datum datum) {
		return promise::resolved<StreamAction>(consumer(std::move(datum)));
	});
}
/*
 * Stateless consumer returning void: after each call, the adapter answers
 * with a resolved StreamAction::Continue.
 */
template <typename Result, typename Datum>
template <typename, typename Consumer>
auto PromiseStreamState<Result, Datum>::stream(Consumer consumer)
	-> stream_sel<Consumer, result_of_not_promise_is, void, Result, Datum>
{
	return do_stream([consumer] (Datum datum) {
		consumer(std::move(datum));
		return promise::resolved(StreamAction::Continue);
	});
}
/*
 * Stateful consumer returning a promise: passed straight to
 * do_stateful_stream, with args forwarded for constructing the State.
 */
template <typename Result, typename Datum>
template <typename State, typename Consumer, typename... Args>
auto PromiseStreamState<Result, Datum>::stream(Consumer consumer, Args&&... args)
	-> stream_sel<Consumer, result_of_promise_is, StreamAction, std::pair<State, Result>, State&, Datum>
{
	return do_stateful_stream<State, Args...>(
		consumer,
		std::forward<Args>(args)...);
}
/*
 * Stateful consumer returning an action: the returned action is wrapped in
 * a resolved promise before being given to do_stateful_stream.
 */
template <typename Result, typename Datum>
template <typename State, typename Consumer, typename... Args>
auto PromiseStreamState<Result, Datum>::stream(Consumer consumer, Args&&... args)
	-> stream_sel<Consumer, result_of_not_promise_is, StreamAction, std::pair<State, Result>, State&, Datum>
{
	auto adapter = [consumer] (State& state, Datum datum) {
		return promise::resolved<StreamAction>(consumer(state, std::move(datum)));
	};
	return do_stateful_stream<State, Args...>(
		adapter,
		std::forward<Args>(args)...);
}
/*
 * Stateful consumer returning void: after each call, the adapter answers
 * with a resolved StreamAction::Continue.
 */
template <typename Result, typename Datum>
template <typename State, typename Consumer, typename... Args>
auto PromiseStreamState<Result, Datum>::stream(Consumer consumer, Args&&... args)
	-> stream_sel<Consumer, result_of_not_promise_is, void, std::pair<State, Result>, State&, Datum>
{
	auto adapter = [consumer] (State& state, Datum datum) {
		consumer(state, std::move(datum));
		return promise::resolved(StreamAction::Continue);
	};
	return do_stateful_stream<State, Args...>(
		adapter,
		std::forward<Args>(args)...);
}
/*** PromiseStream ***/
/* Access the stream state through the raw pointer held by `stream` */
template <typename Result, typename Datum>
auto PromiseStream<Result, Datum>::operator ->() const -> PromiseStreamState<Result, Datum> *
{
	PromiseStreamState<Result, Datum> *raw = stream.get();
	return raw;
}
/*
 * Constructor for sharing state: the new PromiseStream stores a copy of the
 * given shared_ptr.
 */
template <typename Result, typename Datum>
PromiseStream<Result, Datum>::PromiseStream(std::shared_ptr<PromiseStreamState<Result, Datum>> const state)
	: stream(state)
{
}
/*
 * Default constructor: allocates a fresh PromiseStreamState with
 * std::make_shared.
 */
template <typename Result, typename Datum>
PromiseStream<Result, Datum>::PromiseStream()
	: stream(std::make_shared<PromiseStreamState<Result, Datum>>())
{
}
}