diff --git a/stlab/concurrency/actor.hpp b/stlab/concurrency/actor.hpp new file mode 100644 index 00000000..69ffc836 --- /dev/null +++ b/stlab/concurrency/actor.hpp @@ -0,0 +1,386 @@ +/* + Copyright 2023 Adobe + Distributed under the Boost Software License, Version 1.0. + (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +*/ + +/**************************************************************************************************/ + +#pragma once +#ifndef STLAB_CONCURRENCY_ACTOR_HPP +#define STLAB_CONCURRENCY_ACTOR_HPP + +// stdc++ +#include +#include + +// stlab +#include +#include +#include +#include + +// lambda pack captures are a C++20 feature, but both clang and msvc support it, so we should be +// okay to use it even under C++17. +#if __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wc++20-extensions" +#endif // __clang__ + +//------------------------------------------------------------------------------------------------------------------------------------------ + +namespace stlab { +inline namespace v1 { + +//------------------------------------------------------------------------------------------------------------------------------------------ + +/// @brief Opaque type representing an identifier unique to an actor. +/// @hyde-owner fbrereto +enum class actor_id : std::intptr_t; + +/// @brief Get the `actor_id` of the currently running actor. +/// @hyde-owner fbrereto +actor_id this_actor_id(); + +//------------------------------------------------------------------------------------------------------------------------------------------ + +/// @brief Support class to get details about the currently running actor. +/// @hyde-owner fbrereto +struct this_actor { + /// @brief Get the `actor_id` of the currently running actor. + /// @return The appropriate `actor_id`, or `0` if this is called outside the context of an actor. + static actor_id get_id(); + + /// @brief Get a reference to the currently running actor. + /// @throw `std::runtime_error` if this is called outside the context of an actor. + template + static decltype(auto) get(); +}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +namespace detail { + +//------------------------------------------------------------------------------------------------------------------------------------------ + +struct temp_thread_name { + explicit temp_thread_name(const char* name) { +#if STLAB_THREADS(WIN32) + // Nothing +#elif STLAB_THREADS(PTHREAD_EMSCRIPTEN) + // Nothing +#elif STLAB_THREADS(PTHREAD_APPLE) || STLAB_THREADS(PTHREAD) + pthread_getname_np(pthread_self(), _old_name, _old_name_size_k); +#elif STLAB_THREADS(NONE) + // Nothing +#else +#error "Unspecified or unknown thread mode set." +#endif + + stlab::set_current_thread_name(name); + } + + ~temp_thread_name() { stlab::set_current_thread_name(_old_name); } + +private: + constexpr static std::size_t _old_name_size_k = 64; + char _old_name[_old_name_size_k] = {0}; +}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +template +struct value_instance { + std::optional _x; +}; + +template <> +struct value_instance {}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +struct actor_instance_base { + virtual ~actor_instance_base() = default; + + actor_id id() const { + return static_cast(reinterpret_cast>(this)); + } +}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +inline actor_instance_base*& this_actor_accessor() { + thread_local actor_instance_base* this_actor{nullptr}; // `thread_local` implies `static` + return this_actor; +} + +struct temp_this_actor { + // Actors can "nest" if the inner one is running on an immediate executor. + temp_this_actor(actor_instance_base* actor) : _old_actor(this_actor_accessor()) { + assert(actor); + this_actor_accessor() = actor; + } + + ~temp_this_actor() { this_actor_accessor() = _old_actor; } + +private: + actor_instance_base* _old_actor{nullptr}; +}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +template +struct actor_instance : public actor_instance_base, + std::enable_shared_from_this> { + /// Value which is owned by the instance. If `T` is `void`, nothing will be instantiated. + using value_type = T; + + template + actor_instance(Executor e, std::string name) : _q(std::move(e)), _name(std::move(name)) {} + + template + void initialize(Args&&... args) { + if constexpr (std::is_same_v) { + _hold = stlab::make_ready_future(executor()); + } else { + // We want to construct the object instance in the executor where it will be running. We + // cannot initialize in the constructor because `shared_from_this` will throw + // `bad_weak_ptr`. + // + // Unfortunately we cannot use schedule() here as it would dereference the `std::optional` + // when it doesn't contain a value, and that's UB... + _hold = stlab::async( + executor(), + [_this = this->shared_from_this()](auto&&... args) { + _this->_instance._x = T(std::forward(args)...); + }, std::forward(args)...); + } + } + + auto set_name(std::string&& name) { _name = std::move(name); } + + template + auto entask(F&& f) { + return [_f = std::forward(f), _this = this->shared_from_this() +#ifndef NDEBUG + , _id = this->id() +#endif // NDEBUG + ](auto&&... args) { + // tasks that are "entasked" for an actor must be executed on that same actor, or + // Bad Things could happen, namely, a data race between this task and any other + // task(s) the original actor may run simultaneously. + // + // If you find yourself here, you have created a task intending it for one actor, + // but have accidentally tried to execute it on another (including no actor). + assert(_id == stlab::this_actor::get_id()); + + if constexpr (std::is_same_v) { + return _f(std::forward(args)...); + } else { + return _f(*(_this->_instance._x), + std::forward(args)...); + } + }; + } + + template + auto operator()(F&& f) { + auto future = _hold.then(executor(), entask(std::forward(f))); + using result_type = typename decltype(future)::result_type; + + // ERROR: These assignments to `_hold` are not threadsafe, are they? + if constexpr (std::is_same_v) { + _hold = future; + } else { + _hold = future.then([](auto){}); + } + + return future; + } + + template + auto enqueue(F&& f) { + (void)(*this)(std::forward(f)); + } + + void complete() { + stlab::await(_hold); + } + + auto executor() { + return [_this = this->shared_from_this()](auto&& task) { + _this->_q.executor()([_t = std::forward(task), _this = _this]() mutable { + temp_this_actor tta(_this.get()); + temp_thread_name ttn(_this->_name.c_str()); + std::move(_t)(); + }); + }; + } + + value_instance _instance; + stlab::serial_queue_t _q; + std::string _name; + stlab::future _hold; +}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +} // namespace detail + +//------------------------------------------------------------------------------------------------------------------------------------------ + +/// @hyde-owner fbrereto +/// @brief Serialized, asynchronous access to a resource +template +class actor { + using instance = detail::actor_instance; + std::shared_ptr _impl; + + /// friend declaration to the free function. + template + friend actor this_actor_instance(); + +public: + /// Value type for the class. `actor` will own this instance. If the `T` is `void`, nothing + /// will be instantiated. + using value_type = typename instance::value_type; + + actor() = default; + + /// @param e The executor where actor lambdas will be scheduled. + /// @param name The name of the executor. While the actor is running, the thread it is executing + /// on will be temporarily renamed to this value (if the OS supports it.) + /// @param args Additional arguments to be passed to the `value_type` of this instance during + /// its construction. + template + actor(Executor e, std::string name, Args&&... args) : + _impl(std::make_shared>(std::move(e), std::move(name))) { + _impl->initialize(std::forward(args)...); + } + + /// @brief Sets the name of the actor to something else. + /// @param name The incoming name to use from here on out. + auto set_name(std::string name) { _impl->set_name(std::move(name)); } + + /// @brief Schedule a task for the actor to execute. + /// @note This routine has identical semantics to `operator()`. + /// @param f The function to execute. Note that the first parameter to this function must be + /// `T&`, and will reference the instance owned by the actor. + template + auto schedule(F&& f) { + return (*_impl)(std::forward(f)); + } + + /// @brief Schedule a task for the actor to execute. + /// @note This routine has identical semantics to `schedule`. + /// @param f The function to execute. Note that the first parameter to this function must be + /// `T&`, and will reference the instance owned by the actor. + template + auto operator()(F&& f) { + return (*_impl)(std::forward(f)); + } + + /// @brief "Fire and forget" a task on the actor. + /// @param f The task to run. Note that the first parameter to this task must be `T&`, and + /// will reference the instance owned by the actor. + template + void enqueue(F&& f) { + _impl->enqueue(std::forward(f)); + } + + /// @brief Block until all scheduled tasks have completed. + void complete() { + _impl->complete(); + } + + /// @brief Get the unique `actor_id` of this actor. + auto get_id() const { return _impl->id(); } + + /// This is a nullary task executor, namely, you will not get a reference to the task + /// local data this actor owns. If you want access that data using this executor, wrap the task + /// via `actor::entask`. For example: + /// + /// ``` + /// auto f = async(actor.executor(), actor.entask([](auto& data){ /*...*/ })); + /// ``` + /// @brief Get a nullary task executor for this actor. + auto executor() { return _impl->executor(); } + + /// @brief Obtain a nullary lambda that can access the `actor`'s task local data. + /// @return a nullary lambda that, when invoked, will receive the `actor`'s task local data as + /// its first argument. + + template + auto entask(F&& f) { + return _impl->entask(std::forward(f)); + } + + friend bool operator==(const actor& x, const actor& y) { return x._impl == y._impl; } + friend bool operator!=(const actor& x, const actor& y) { return !(x == y); } +}; + +//------------------------------------------------------------------------------------------------------------------------------------------ + +/// In the event the routine is called outside the context of a running actor, this routine will +/// return the `actor_id` equivalent of `0`. +/// @brief Get the `actor_id` of the currently running actor. +/// @hyde-owner fbrereto +inline actor_id this_actor_id() { + detail::actor_instance_base* base = detail::this_actor_accessor(); + return base ? base->id() : actor_id{0}; +} + +/// Get the currently running actor. +/// @hyde-owner fbrereto +/// @throw `std::runtime_error` in the event the routine is called outside the context of a running actor. +template +actor this_actor_instance() { + detail::actor_instance_base* base = detail::this_actor_accessor(); + if (!base) { + throw std::runtime_error("Not in an actor"); + } + detail::actor_instance& instance = dynamic_cast&>(*base); + actor result; + result._impl = instance.shared_from_this(); + return result; +} + +/// @brief Determine if the caller is currently in an `actor` execution context. +/// @hyde-owner fbrereto +template +bool in_actor() { + // I could make this a little faster by imitating `this_actor` right up to the instance cast. + return this_actor_instance() != actor(); +} + +//------------------------------------------------------------------------------------------------------------------------------------------ + +/// @brief Get the `actor_id` of the currently running actor. +/// @hyde-owner fbrereto +/// @note This routine has identical semantics to `this_actor_id`. +inline actor_id this_actor::get_id() { return this_actor_id(); } + +/// @brief Get the currently running actor. +/// @hyde-owner fbrereto +/// @note This routine has identical semantics to `stlab::this_actor()`. +template +decltype(auto) this_actor::get() { + return stlab::this_actor_instance(); +} + +//------------------------------------------------------------------------------------------------------------------------------------------ + +} // namespace v1 +} // namespace stlab + +//------------------------------------------------------------------------------------------------------------------------------------------ + +#if __clang__ +#pragma clang diagnostic pop +#endif // __clang__ + +//------------------------------------------------------------------------------------------------------------------------------------------ + +#endif // STLAB_CONCURRENCY_ACTOR_HPP + +//------------------------------------------------------------------------------------------------------------------------------------------ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 704fd998..941230c9 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -172,12 +172,26 @@ add_test( COMMAND stlab.test.utility ) +################################################################################ + +add_executable( stlab.test.actor + actor_tests.cpp + main.cpp ) + +target_link_libraries( stlab.test.actor PUBLIC stlab::testing ) + +add_test( + NAME stlab.test.actor + COMMAND stlab.test.actor +) + ################################################################################ # # tests are compiled without compiler extensions to ensure the stlab headers # are not dependent upon any such extension. # set_target_properties( + stlab.test.actor stlab.test.channel stlab.test.future stlab.test.serial_queue @@ -202,6 +216,7 @@ ProcessorCount(nProcessors) if(nProcessors) set_tests_properties( + stlab.test.actor stlab.test.channel stlab.test.executor stlab.test.future diff --git a/test/actor_tests.cpp b/test/actor_tests.cpp new file mode 100644 index 00000000..7352dc42 --- /dev/null +++ b/test/actor_tests.cpp @@ -0,0 +1,277 @@ +/* + Copyright 2023 Adobe + Distributed under the Boost Software License, Version 1.0. + (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +*/ + +/**************************************************************************************************/ + +// identity +#include + +// boost +#include + +// stlab +#include +#include + +/**************************************************************************************************/ + +void increment_by(int& i, int amount) { i += amount; } + +void increment(int& i) { + increment_by(i, 1); +} + +template +T get_actor_value(stlab::actor& a) { + return stlab::await(a([](auto x) { + return x; + })); +} + +std::string current_test_name() { + return boost::unit_test::framework::current_test_case().full_name(); +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_construct_with_arguments) { + stlab::actor a(stlab::default_executor, current_test_name(), 42); + bool sent{false}; + stlab::future f = a([&](auto i) { + sent = true; + BOOST_REQUIRE(i == 42); + }); + + stlab::await(f); + + BOOST_REQUIRE(get_actor_value(a) == 42); + BOOST_REQUIRE(sent); +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_construct_void) { + stlab::actor a(stlab::default_executor, current_test_name()); + bool sent{false}; + + a.enqueue([&]() { sent = true; }); + + a.complete(); + + BOOST_REQUIRE(sent); +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_regularity) { + stlab::actor empty_ctor; + + stlab::actor default_ctor(stlab::default_executor, current_test_name()); // default construction + default_ctor.enqueue(increment); + BOOST_REQUIRE(get_actor_value(default_ctor) == 1); + + stlab::actor copy_ctor(default_ctor); // copy construction + copy_ctor.enqueue(increment); + BOOST_REQUIRE(get_actor_value(copy_ctor) == 2); + + stlab::actor move_ctor(std::move(default_ctor)); // move construction + move_ctor.enqueue(increment); + BOOST_REQUIRE(get_actor_value(move_ctor) == 3); + + stlab::actor copy_assign = copy_ctor; // copy assignment + copy_assign.enqueue(increment); + BOOST_REQUIRE(get_actor_value(copy_assign) == 4); + + stlab::actor move_assign = std::move(move_ctor); // move assignment + move_assign.enqueue(increment); + BOOST_REQUIRE(get_actor_value(move_assign) == 5); + + // equality comparable + stlab::actor a(stlab::default_executor, "a"); + stlab::actor b(stlab::default_executor, "b"); + BOOST_REQUIRE(a != b); // tests operator!= + BOOST_REQUIRE(!(a == b)); // tests operator== + + BOOST_REQUIRE(a == a); // tests operator== +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_regularity_void) { + std::size_t count{0}; + stlab::actor empty_ctor; + + stlab::actor default_ctor(stlab::default_executor, current_test_name()); // default construction + stlab::await(default_ctor([&] { ++count; })); + BOOST_REQUIRE(count == 1); + + stlab::actor copy_ctor(default_ctor); // copy construction + stlab::await(copy_ctor([&] { ++count; })); + BOOST_REQUIRE(count == 2); + + stlab::actor move_ctor(std::move(default_ctor)); // move construction + stlab::await(move_ctor([&] { ++count; })); + BOOST_REQUIRE(count == 3); + + stlab::actor copy_assign = move_ctor; // copy assignment + stlab::await(copy_assign([&] { ++count; })); + BOOST_REQUIRE(count == 4); + + stlab::actor move_assign = std::move(move_ctor); // move assignment + stlab::await(move_assign([&] { ++count; })); + BOOST_REQUIRE(count == 5); + + // equality comparable + stlab::actor a(stlab::default_executor, "a"); + stlab::actor b(stlab::default_executor, "b"); + BOOST_REQUIRE(a != b); // tests operator!= + BOOST_REQUIRE(!(a == b)); // tests operator== + + BOOST_REQUIRE(a == a); // tests operator== +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_schedule_to_void) { + { + stlab::actor a(stlab::default_executor, current_test_name()); + stlab::future f = a(increment); + + stlab::await(f); + + BOOST_REQUIRE(f.get_try()); + BOOST_REQUIRE(get_actor_value(a) == 1); + } + + { + stlab::actor a(stlab::default_executor, current_test_name()); + std::atomic_bool sent{false}; + stlab::future f = a([&] { sent = true; }); + + stlab::await(f); + + BOOST_REQUIRE(sent); + } +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_schedule_to_value) { + { + stlab::actor a(stlab::default_executor, current_test_name(), 42); + stlab::future f = a([](auto x) { return x; }); + + BOOST_REQUIRE(stlab::await(f) == 42); + } + + { + stlab::actor a(stlab::default_executor, current_test_name()); + stlab::future f = a([]() { return 42; }); + + BOOST_REQUIRE(stlab::await(f) == 42); + } +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_then_from_void) { + { + stlab::actor a(stlab::default_executor, current_test_name()); + stlab::future f = + a([](int& x) { x += 42; }).then(a.executor(), a.entask([](int x) { return x; })); + + BOOST_REQUIRE(stlab::await(f) == 42); + } + + { + stlab::actor a(stlab::default_executor, current_test_name()); + stlab::future f = a([]() { return 42; }) + .then(a.executor(), a.entask([](auto x) { return 4200 + x; })) + .then(a.executor(), a.entask([](auto x) { return x + 420000; })); + + BOOST_REQUIRE(stlab::await(f) == 424242); + } +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_then_from_value) { + stlab::actor a(stlab::default_executor, current_test_name(), 42); + stlab::future f = + a([](auto x) { return x; }).then(a.executor(), a.entask([](auto x, auto y) { + BOOST_REQUIRE(x == 42); + BOOST_REQUIRE(y == 42); + return x + y; + })); + + BOOST_REQUIRE(stlab::await(f) == 84); +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_enqueue) { + std::size_t count{0}; + stlab::actor a(stlab::default_executor, current_test_name()); + + a.enqueue([&](int){ + ++count; + }); + + a.enqueue([&](int){ + ++count; + return 42; // does nothing, really. + }); + + a.complete(); + + BOOST_REQUIRE(count == 2); +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_enqueue_void) { + std::size_t count{0}; + stlab::actor a(stlab::default_executor, current_test_name()); + + a.enqueue([&](){ + ++count; + }); + + a.enqueue([&](){ + ++count; + return 42; // does nothing, really. + }); + + a.complete(); + + BOOST_REQUIRE(count == 2); +} + +/**************************************************************************************************/ + +BOOST_AUTO_TEST_CASE(actor_this_actor) { + stlab::actor a(stlab::default_executor, current_test_name()); + + try { + stlab::this_actor::get(); + BOOST_REQUIRE(false); // "Expected a throw - not run in an actor"; + } catch (...) { } + + BOOST_REQUIRE(stlab::this_actor::get_id() == stlab::actor_id{0}); + + auto f = a([_a = a](){ + auto this_instance = stlab::this_actor::get(); + BOOST_REQUIRE(this_instance == _a); + + auto this_actor_id = stlab::this_actor::get_id(); + BOOST_REQUIRE(this_actor_id == _a.get_id()); + BOOST_REQUIRE(this_actor_id != stlab::actor_id{0}); + }); + + stlab::await(f); +} + +/**************************************************************************************************/