Skip to content

Commit

Permalink
Almost working
Browse files Browse the repository at this point in the history
  • Loading branch information
cor3ntin committed Oct 28, 2019
1 parent 0a79e31 commit 78d1fbb
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 126 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[submodule "cmcstl2"]
path = cmcstl2
url = https://github.com/CaseyCarter/cmcstl2
[submodule "cppcoro"]
path = cppcoro
url = https://github.com/lewissbaker/cppcoro.git
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ file(GLOB_RECURSE FILES
)

add_executable(corio ${FILES})
target_include_directories(corio PUBLIC ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/cmcstl2/include)
target_include_directories(corio PUBLIC ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/cmcstl2/include ${PROJECT_SOURCE_DIR}/cppcoro/include)
target_compile_options(corio PUBLIC -std=c++2a -Xclang -fconcepts-ts -stdlib=libc++ -lc++experimental)
target_link_options(corio PUBLIC -stdlib=libc++ -lc++experimental -pthread)
1 change: 1 addition & 0 deletions cppcoro
Submodule cppcoro added at 1d5a4d
184 changes: 80 additions & 104 deletions include/corio/await_sender.hpp
Original file line number Diff line number Diff line change
@@ -1,136 +1,112 @@
#pragma once
#include <corio/concepts.hpp>
#include <cppcoro/task.hpp>
#include <experimental/coroutine>
#include <variant>
#include <iostream>

namespace cor3ntin::corio {

struct operation_cancelled : std::exception {
virtual const char* what() const noexcept {
return "operation cancelled";
}
};

template <typename Sender>
struct sender_awaiter {
private:
using value_type = int; //sender_values_t<Sender, detail::identity_or_void_t>;
using coro_handle = std::experimental::coroutine_handle<>;
// using is_always_blocking = property_query<From, is_always_blocking<>>;
struct internal_receiver {
sender_awaiter* this_;

std::add_pointer_t<Sender> sender_{};
enum class state { empty, value, exception };
// using receiver_category = receiver_tag;

coro_handle continuation_{};
state state_ = state::empty;

union {
value_type value_{};
std::exception_ptr exception_;
};
template <class U>
void set_value(U&& value) noexcept(std::is_nothrow_constructible_v<value_type, U>) {
this_->m_data.template emplace<1>(std::forward<U>(value));
this_->m_continuation.resume();
}

//using is_always_blocking = property_query<From, is_always_blocking<>>;
void set_value() noexcept {
this_->m_data.template emplace<1>();
this_->m_continuation.resume();
}

struct internal_receiver {
sender_awaiter* this_;
template <typename Error>
void set_error(Error&& error) noexcept {
if constexpr(std::is_same<Error, std::exception_ptr>::value) {
this_->m_data.template emplace<2>(std::move(error));
} else {
this_->m_data.template emplace<2>(std::make_exception_ptr(std::move(error)));
}
// if (!is_always_blocking::value)
this_->m_continuation.resume();
}

//using receiver_category = receiver_tag;
void set_done() noexcept {
this_->m_data.template emplace<0>(std::monostate{});
// if (!is_always_blocking::value)
this_->m_continuation.resume();
}
};

template <class U>
requires cor3ntin::corio::concepts::convertible_to<U, value_type>
void set_value(U&& value)
noexcept(std::is_nothrow_constructible<value_type, U>::value) {
this_->value_.construct(static_cast<U&&>(value));
this_->state_ = state::value;
}

template <class V = value_type>
requires std::is_void_v<V>
void set_value() noexcept {
this_->value_.construct();
this_->state_ = state::value;
}
using value_type = int;
using coro_handle = std::experimental::coroutine_handle<>;

void set_done() noexcept {
//if (!is_always_blocking::value)
this_->continuation_.resume();
}
coro_handle m_continuation{};
using operation_type = decltype(
corio::execution::connect(std::declval<Sender>(), std::declval<internal_receiver>()));
operation_type m_op;
std::variant<std::monostate, value_type, std::exception_ptr> m_data;

template<typename Error>
void set_error(Error error) noexcept {
assert(this_->state_ != state::value);
if constexpr(std::is_same<Error, std::exception_ptr>::value){
this_->exception_.construct(std::move(error));
} else {
this_->exception_.construct(std::make_exception_ptr(std::move(error)));
}
this_->state_ = state::exception;
//if (!is_always_blocking::value)
this_->continuation_.resume();
}
};

public:
sender_awaiter() {}
sender_awaiter(Sender&& sender) noexcept
: sender_(std::addressof(sender))
{}
sender_awaiter(sender_awaiter &&that)
noexcept(std::is_nothrow_move_constructible<value_type>::value ||
std::is_void<value_type>::value)
: sender_(std::exchange(that.sender_, nullptr))
, continuation_{std::exchange(that.continuation_, {})}
, state_(that.state_) {
if (that.state_ == state::value) {
if constexpr(!std::is_void<value_type>::value) {
id(value_).construct(std::move(that.value_).get());
}
else {
that.value_.destruct();
that.state_ = state::empty;
}
} else if (that.state_ == state::exception) {
exception_.construct(std::move(that.exception_).get());
that.exception_.destruct();
that.state_ = state::empty;
sender_awaiter(Sender sender) noexcept
: m_op(corio::execution::connect(std::move(sender), internal_receiver{this})) {
printf("CTR\n");
}
sender_awaiter(sender_awaiter&& that) = default;
~sender_awaiter() {
printf("DTR\n");
}
}

~sender_awaiter() {
if (state_ == state::value) {
value_.destruct();
} else if (state_ == state::exception) {
exception_.destruct();

static constexpr bool await_ready() noexcept {
return true;
}
}

static constexpr bool await_ready() noexcept {
return false;
}
// TODO HANDLE BLOCKING
void await_suspend(coro_handle continuation) noexcept {
printf("await_suspend\n");
m_continuation = continuation;
m_op.start();

// Add detection and handling of blocking completion here, and
// return 'false' from await_suspend() in that case rather than
// potentially recursively resuming the awaiting coroutine which
// could eventually lead to a stack-overflow.
using await_suspend_result_t =
std::conditional_t<true, bool, void>;

await_suspend_result_t await_suspend(coro_handle continuation) noexcept {
continuation_ = continuation;
//pushmi::submit(static_cast<From&&>(*sender_), internal_receiver{this});
return await_suspend_result_t(); // return false or void
}
return;
}

decltype(auto) await_resume() {
if (state_ == state::exception) {
std::rethrow_exception(std::move(exception_).get());
} else if (state_ == state::empty) {
//throw operation_cancelled{};
} else {
return std::move(value_).get();
decltype(auto) await_resume() {
printf("await_resume\n");
switch(m_data.index()) {
case 0: throw operation_cancelled{}; break;
case 1: return std::get<1>(m_data);
case 2: std::rethrow_exception(std::move(std::get<2>(m_data))); break;
}
return std::get<1>(m_data);
}
}
};

template<typename From>
sender_awaiter(From&&) -> sender_awaiter<From>;
template <typename From>
sender_awaiter(From)->sender_awaiter<From>;

/*
template <sender S>
sender_awaiter<S> operator co_await(S&& sender) {
return static_cast<S&&>(from);
template <cor3ntin::corio::execution::sender S>
auto operator co_await(S&& sender) {
return cor3ntin::corio::sender_awaiter(std::forward<S>(sender));
}
*/

} // namespace cor3ntin::corio


/*namespace awaitable_senders {
// Any TypedSender that inherits from `sender` or `sender_traits` is
Expand Down
62 changes: 47 additions & 15 deletions include/corio/concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ namespace execution {
} set_error;


namespace __connect_ns {
struct __connect_base {};
} // namespace __connect_ns

inline constexpr struct __connect_fn : __connect_ns::__connect_base {
template <typename Sender, typename Receiver>
requires cor3ntin::corio::tag_invocable<__connect_fn, Sender, Receiver> auto
operator()(Sender&& s, Receiver&& r) const {
return cor3ntin::corio::tag_invoke(*this, std::forward<Sender>(s), (Receiver &&) r);
}

template <typename Sender, typename Receiver>
requires requires(Sender&& s, Receiver&& r) {
std::forward<Sender>(s).connect((Receiver &&) r);
}
friend auto tag_invoke(__connect_fn, Sender&& s, Receiver&& r) {
return std::forward<Sender>(s).connect((Receiver &&) r);
}
} connect;


template <typename T, typename E = std::exception_ptr>
concept receiver = concepts::move_constructible<std::remove_cvref_t<T>>&&
details::nothrow_move_or_copy_constructible<std::remove_cvref_t<T>>&& requires(T&& t,
Expand All @@ -102,13 +123,35 @@ namespace execution {
execution::set_value((T &&) t, (Val &&) val...);
};

namespace details {
template <typename T>
concept sealed_object = !concepts::move_constructible<std::remove_cvref_t<T>> &&
!concepts::copy_constructible<std::remove_cvref_t<T>>;
}


/*template <typename T>
concept operation = requires(T& t) {
{ t.start(); }
noexcept;
};
template <typename Op>
concept operation = details::sealed_object<Op> && requires(Op& op) {
//execution::start(op)->void;
}
*/


namespace details {
template <class S, class R>
concept sender_to_impl = true; /*requires(S&& s, R&& r) {
execution::submit((S &&) s, (R &&) r);
};*/
} // namespace details
concept sender_to_impl = requires(S&& s, R&& r) {
execution::connect(std::forward<S>(s), (R &&) r);
/*{ execution::spawn((S &&) s, (R &&) r) }
->void;*/
};
} // namespace details
template <typename S>
concept sender = concepts::move_constructible<std::remove_cvref_t<S>>&&
details::sender_to_impl<S, sink_receiver>;
Expand All @@ -122,17 +165,6 @@ namespace execution {

} // namespace cor3ntin::corio

// test

/*struct stupid_executor {
template <typename Receiver>
requires cor3ntin::corio::execution::receiver<Receiver>
void submit(Receiver&& r) const {
return r.set_value();
}
};*/


namespace {

using namespace cor3ntin::corio;
Expand Down
6 changes: 3 additions & 3 deletions include/corio/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ class static_thread_pool {
static_thread_pool& m_pool;

public:
template <execution::receiver R>
auto connect(R&& r) && {
template <typename R>
auto connect(R&& r) && noexcept {
return depleted_operation{std::move(*this), std::forward<R>(r)};
}

Expand Down Expand Up @@ -245,8 +245,8 @@ class static_thread_pool {
while(true) {
if(m_stopped)
return;
m_condition.wait(lock);

m_condition.wait(lock);

while(m_head) {
operation_base& op = *m_head;
Expand Down
24 changes: 21 additions & 3 deletions src/corio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct stupid_receiver {
void set_done() noexcept {}
};


/*
bool operator==(const stupid_executor&, const stupid_executor&) {
return true;
Expand All @@ -22,15 +23,32 @@ bool operator!=(const stupid_executor&, const stupid_executor&) {
};
**/


template <typename scheduler>
cppcoro::task<> run_in_pool(scheduler my_scheduler) {
printf("Coro: %ul\n", std::this_thread::get_id());
cor3ntin::corio::sender_awaiter a{my_scheduler.schedule()};
auto x = co_await a;
printf("Coro: %ul\n", std::this_thread::get_id());
}

int main() {
printf("Main: %ul\n", std::this_thread::get_id());
using namespace cor3ntin::corio::execution;

std::mutex m;
// std::mutex m;

static_thread_pool p(20);
for(auto i = 0; i < 1000000; i++) {
static_assert(cor3ntin::corio::execution::sender<decltype(p.scheduler().schedule())>);


run_in_pool(p.scheduler());


for(auto i = 0; i < 10; i++) {
auto sender = p.scheduler().schedule();
std::move(sender).spawn(as_receiver{[&m] { printf("%ul\n", std::this_thread::get_id()); }});
std::move(sender).spawn(
as_receiver{[] { printf("Receiver: %ul\n", std::this_thread::get_id()); }});
}

wait(p.depleted());
Expand Down

0 comments on commit 78d1fbb

Please sign in to comment.