Skip to content

Commit 9d92450

Browse files
Rob Lyerlymeta-codesync[bot]
authored andcommitted
Coro <-> Fiber adapter for NavyWorkers
Summary: Create a helper utility that exports a coroutine interface that can be used to wait for NavyWorker's fiber-based async processing. The utility provides a couple of niceties above a bare-bones async framework conversion: - Supports cancellation and provides a cleanup callback that will be run if the coroutine is cancelled when the fiber completes the async work (i.e., no consumer of that async work) - Understands Navy's retry status code and will automatically retry if specified by lower layers - Propagates exceptions from fibers to coros Reviewed By: pbhandar2 Differential Revision: D88489667 fbshipit-source-id: f66cbcf52f6c9fe96cf38db3f561ac20105ff608
1 parent 654eb34 commit 9d92450

File tree

2 files changed

+250
-0
lines changed

2 files changed

+250
-0
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <folly/coro/Promise.h>
20+
#include <folly/coro/Task.h>
21+
22+
#include "cachelib/navy/common/NavyThread.h"
23+
#include "cachelib/navy/common/Types.h"
24+
25+
namespace facebook::cachelib::interface::utils {
26+
namespace detail {
27+
using DefaultCleanupT = decltype([](auto&&) {});
28+
} // namespace detail
29+
30+
/**
31+
* Run a (potentially async) function on a fiber on a Navy worker thread and
32+
* block until the operation completes. Returns the output from the function.
33+
*
34+
* Your function should return a folly::Expected<ValueT, navy::Status> --
35+
* onWorkerThread() will continue retrying if it returns a navy::Status::Retry
36+
* error. Note that the return value from the fiber *must* be movable. If your
37+
* type isn't movable, wrap it in a unique_ptr.
38+
*
39+
* onWorkerThread() runs cleanup() on the fiber if the awaiting coroutine got
40+
* cancelled while the fiber was still executing. This allows cleaning up in
41+
* order to prevent stranded resources (e.g., open RegionDescriptors).
42+
*
43+
* @param func function to run on a worker fiber
44+
* @param cleanup function to run if the coroutine got cancelled while the
45+
* fiber was executing func()
46+
* @return output from func()
47+
*/
48+
template <typename FuncT,
49+
typename CleanupFuncT = detail::DefaultCleanupT,
50+
typename ReturnT = std::invoke_result_t<FuncT>>
51+
folly::coro::Task<ReturnT> onWorkerThread(
52+
navy::NavyThread& thread,
53+
FuncT&& func,
54+
CleanupFuncT&& cleanup = detail::DefaultCleanupT{}) {
55+
// Wrap the fiber output in an RAII struct; this allows the fiber to run
56+
// cleanup() if the coroutine doesn't consume the output.
57+
struct CleanupHelper {
58+
explicit CleanupHelper(CleanupFuncT&& cleanup)
59+
: cleanup_(std::forward<CleanupFuncT>(cleanup)), consumed_(false) {}
60+
CleanupHelper(CleanupHelper&& other) noexcept
61+
: cleanup_(std::move(other.cleanup_)),
62+
result_(std::move(other.result_)),
63+
consumed_(other.consumed_) {
64+
other.consumed_ = true;
65+
}
66+
67+
CleanupHelper(const CleanupHelper&) = delete;
68+
CleanupHelper& operator=(const CleanupHelper&) = delete;
69+
CleanupHelper& operator=(CleanupHelper&& other) noexcept = delete;
70+
71+
~CleanupHelper() {
72+
if (!consumed_) {
73+
cleanup_(std::move(result_));
74+
}
75+
}
76+
77+
CleanupFuncT cleanup_;
78+
ReturnT result_;
79+
bool consumed_;
80+
};
81+
82+
auto promiseFuturePair = folly::coro::makePromiseContract<CleanupHelper>();
83+
auto cancellationToken = co_await folly::coro::co_current_cancellation_token;
84+
thread.addTaskRemote(
85+
[func = std::forward<FuncT>(func),
86+
promise = std::move(promiseFuturePair.first),
87+
cleanupHelper = CleanupHelper(std::forward<CleanupFuncT>(cleanup)),
88+
token = cancellationToken]() mutable {
89+
// setting error is required to kick off the first loop
90+
cleanupHelper.result_ = folly::makeUnexpected(navy::Status::Retry);
91+
while (cleanupHelper.result_.hasError() &&
92+
cleanupHelper.result_.error() == navy::Status::Retry &&
93+
!token.isCancellationRequested()) {
94+
try {
95+
cleanupHelper.result_ = func();
96+
} catch (...) {
97+
// Not going to produce a value so don't run the cleanup
98+
cleanupHelper.consumed_ = true;
99+
promise.setException(std::current_exception());
100+
return;
101+
}
102+
}
103+
promise.setValue(std::move(cleanupHelper));
104+
});
105+
106+
auto retVal = co_await std::move(promiseFuturePair.second);
107+
retVal.consumed_ = true;
108+
co_return std::move(retVal.result_);
109+
}
110+
111+
} // namespace facebook::cachelib::interface::utils
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <folly/Expected.h>
18+
#include <folly/coro/Collect.h>
19+
#include <folly/coro/GtestHelpers.h>
20+
#include <gtest/gtest.h>
21+
22+
#include "cachelib/interface/utils/CoroFiberAdapter.h"
23+
24+
using namespace facebook::cachelib;
25+
using namespace facebook::cachelib::interface::utils;
26+
27+
class CoroFiberAdapterTest : public testing::Test {
28+
public:
29+
template <typename T>
30+
using Result = folly::Expected<T, navy::Status>;
31+
32+
navy::NavyThread thread_{"test"};
33+
};
34+
35+
CO_TEST_F(CoroFiberAdapterTest, success) {
36+
auto result = co_await onWorkerThread(
37+
thread_,
38+
[]() -> Result<int> { return 42; },
39+
[](auto&&) { FAIL() << "should not have called cleanup function"; });
40+
CO_ASSERT_TRUE(result.hasValue());
41+
EXPECT_EQ(result.value(), 42);
42+
}
43+
44+
CO_TEST_F(CoroFiberAdapterTest, defaultCleanup) {
45+
auto result = co_await onWorkerThread(thread_, []() -> Result<int> {
46+
return folly::makeUnexpected(navy::Status::DeviceError);
47+
});
48+
CO_ASSERT_TRUE(result.hasError());
49+
EXPECT_EQ(result.error(), navy::Status::DeviceError);
50+
}
51+
52+
CO_TEST_F(CoroFiberAdapterTest, retryThenSuccess) {
53+
size_t callCount = 0;
54+
auto result = co_await onWorkerThread(
55+
thread_,
56+
[&callCount]() -> Result<int> {
57+
if (callCount++ < 3) {
58+
return folly::makeUnexpected(navy::Status::Retry);
59+
}
60+
return 100;
61+
},
62+
[](auto&&) { FAIL() << "should not have called cleanup function"; });
63+
CO_ASSERT_TRUE(result.hasValue());
64+
EXPECT_EQ(result.value(), 100);
65+
EXPECT_EQ(callCount, 4);
66+
}
67+
68+
CO_TEST_F(CoroFiberAdapterTest, returnError) {
69+
auto result = co_await onWorkerThread(
70+
thread_,
71+
[]() -> Result<int> {
72+
return folly::makeUnexpected(navy::Status::DeviceError);
73+
},
74+
[](auto&&) { FAIL() << "should not have called cleanup function"; });
75+
CO_ASSERT_TRUE(result.hasError());
76+
EXPECT_EQ(result.error(), navy::Status::DeviceError);
77+
}
78+
79+
CO_TEST_F(CoroFiberAdapterTest, throws) {
80+
auto result = co_await folly::coro::co_awaitTry(onWorkerThread(
81+
thread_,
82+
[]() -> Result<int> { throw std::runtime_error("error"); },
83+
[](auto&&) { FAIL() << "should not have called cleanup function"; }));
84+
CO_ASSERT_TRUE(result.hasException<std::runtime_error>());
85+
}
86+
87+
CO_TEST_F(CoroFiberAdapterTest, cancellation) {
88+
folly::CancellationSource cs;
89+
folly::fibers::Baton fiberReady, cancelReady;
90+
bool ranCleanup = false;
91+
92+
// Ensure the fiber runs the cleanup function in the following situation to
93+
// avoid leaking resources:
94+
//
95+
// 1. Fiber is running an async operation
96+
// 2. Awaiting coro is cancelled (e.g., times out)
97+
// 3. Fiber finishes and sets the promise
98+
99+
auto fiberFunc = [&]() -> Result<int> {
100+
fiberReady.post();
101+
cancelReady.wait();
102+
return 42;
103+
};
104+
auto cleanupFunc = [&ranCleanup](auto&& result) {
105+
ranCleanup = true;
106+
ASSERT_TRUE(result.hasValue());
107+
EXPECT_EQ(result.value(), 42);
108+
};
109+
auto cancelTask = [&]() -> folly::coro::Task<void> {
110+
// Ensure the fiber is inside fiberFunc() before cancellation, otherwise
111+
// onWorkerThread() will skip calling it
112+
fiberReady.wait();
113+
cs.requestCancellation();
114+
cancelReady.post();
115+
co_return;
116+
};
117+
118+
auto result = co_await folly::coro::collectAllTry(
119+
folly::coro::co_withCancellation(cs.getToken(),
120+
onWorkerThread(thread_,
121+
std::move(fiberFunc),
122+
std::move(cleanupFunc))),
123+
cancelTask());
124+
EXPECT_TRUE(std::get<0>(result).hasException<folly::OperationCancelled>());
125+
thread_.drain(); // ensure fiberFunc() finishes running & sets the promise
126+
EXPECT_TRUE(ranCleanup);
127+
}
128+
129+
CO_TEST_F(CoroFiberAdapterTest, concurrent) {
130+
auto task = [&](int value) -> folly::coro::Task<void> {
131+
auto result = co_await onWorkerThread(
132+
thread_,
133+
[value]() -> Result<int> { return value * 2; },
134+
[](auto&&) { FAIL() << "should not have called cleanup function"; });
135+
CO_ASSERT_TRUE(result.hasValue());
136+
EXPECT_EQ(result.value(), value * 2);
137+
};
138+
co_await folly::coro::collectAll(task(1), task(2), task(3), task(4), task(5));
139+
}

0 commit comments

Comments
 (0)