6
6
#include < boost/thread.hpp>
7
7
#include < boost/noncopyable.hpp>
8
8
#include < boost/coroutine/all.hpp>
9
+ #include < boost/smart_ptr/make_local_shared.hpp>
9
10
#include < algorithm>
10
11
#include < atomic>
11
12
#include < deque>
@@ -82,6 +83,9 @@ typedef boost::coroutines::coroutine<void>::push_type push_coro_t;
82
83
#define AWAIT_HINT (HINT, EXPR ) AWAIT_HINT_MX(MUX, HINT, EXPR)
83
84
84
85
// coroutine async sleep()
86
+ #define MX_SLEEP (TIME ) Multiplexer::sleep(TIME);
87
+
88
+ // deprecated
85
89
#define SLEEP (TIME ) Multiplexer::sleep(TIME);
86
90
87
91
#define MX_EPSILON 0.00001
@@ -157,7 +161,7 @@ class Multiplexer:
157
161
if (!m_Buffered || m_Units.size () < m_Buffered) {
158
162
auto cbt = Task<T ()>(std::move (cb));
159
163
auto fut = cbt.get_future ();
160
- auto cbc = std::make_shared <Task<T ()>>(std::move (cbt));
164
+ auto cbc = boost::make_local_shared <Task<T ()>>(std::move (cbt));
161
165
m_Units.emplace_back (cond, [cbc]() {
162
166
(*cbc)();
163
167
});
@@ -178,7 +182,7 @@ class Multiplexer:
178
182
if (!m_Buffered || m_Units.size () < m_Buffered) {
179
183
auto cbt = Task<T ()>(std::move (cb));
180
184
auto fut = cbt.get_future ();
181
- auto cbc = std::make_shared <Task<T ()>>(std::move (cbt));
185
+ auto cbc = boost::make_local_shared <Task<T ()>>(std::move (cbt));
182
186
m_Units.emplace_back (
183
187
std::function<bool ()>(),
184
188
std::function<void ()>()
@@ -222,15 +226,15 @@ class Multiplexer:
222
226
// std::shared_ptr<Channel<T>> channel(
223
227
// std::function<void(std::shared_ptr<Channel<T>>)> worker
224
228
// ) {
225
- // auto chan = std::make_shared <Channel<>>();
229
+ // auto chan = boost::make_local_shared <Channel<>>();
226
230
// // ... inside lambda if(chan->closed()) remove?
227
231
// }
228
232
229
233
// TODO: handle multi-direction channels that may block
230
234
231
235
template <class R , class T >
232
236
std::future<R> when (std::future<T>& fut, std::function<R(std::future<T>&)> cb) {
233
- auto futc = std::make_shared <std::future<T>>(std::move (fut));
237
+ auto futc = boost::make_local_shared <std::future<T>>(std::move (fut));
234
238
235
239
return task<void >([cb, futc]() {
236
240
if (futc->wait_for (std::chrono::seconds (0 )) ==
@@ -439,6 +443,7 @@ class Multiplexer:
439
443
m_Circuits.emplace_back (make_tuple (
440
444
kit::make_unique<Circuit>(this , i), CacheLinePadding ()
441
445
));
446
+ // m_MultiCircuit = kit::make_unique<Circuit>(this, i);
442
447
}
443
448
// void join() {
444
449
// for(auto& s: m_Circuits)
@@ -501,8 +506,24 @@ class Multiplexer:
501
506
std::get<0 >(s)->finish_nojoin ();
502
507
for (auto & s: m_Circuits)
503
508
std::get<0 >(s)->join ();
509
+ // for(auto& s: m_MultiCircuit)
510
+ // std::get<0>(s)->finish_nojoin();
511
+ // for(auto& s: m_MultiCircuit)
512
+ // std::get<0>(s)->join();
504
513
}
505
514
515
+ // template <class Time>
516
+ // static bool retry(int count, Time delay, std::function<bool()> func)
517
+ // {
518
+ // for(int i=0; i<count || count<0; count<0 || ++i)
519
+ // {
520
+ // if(func())
521
+ // return true;
522
+ // SLEEP(delay);
523
+ // }
524
+ // return false;
525
+ // }
526
+
506
527
private:
507
528
508
529
struct CacheLinePadding
@@ -512,6 +533,7 @@ class Multiplexer:
512
533
513
534
const unsigned m_Concurrency;
514
535
std::vector<std::tuple<std::unique_ptr<Circuit>, CacheLinePadding>> m_Circuits;
536
+ // std::unique_ptr<Circuit> m_MultiCircuit;
515
537
516
538
// read-write mutex might be more optimal here
517
539
kit::mutex_wrap<std::map<boost::thread::id, unsigned >> m_ThreadToCircuit;
0 commit comments