Skip to content

Commit 3eb9cdd

Browse files
committed
Fix task queue
1 parent 6e6a09c commit 3eb9cdd

File tree

3 files changed

+28
-16
lines changed

3 files changed

+28
-16
lines changed

cpp/RNMultithreadingInstaller.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,30 @@
33
#include <RNReanimated/Scheduler.h>
44
#include <RNReanimated/ShareableValue.h>
55
#include <RNReanimated/RuntimeManager.h>
6+
#include <RNReanimated/RuntimeDecorator.h>
7+
#include <RNReanimated/ErrorHandler.h>
8+
#include "MakeJSIRuntime.h"
69

710
#define MAX_THREAD_COUNT 2
811

912
namespace mrousavy {
1013
namespace multithreading {
1114

12-
static ThreadPool pool(MAX_THREAD_COUNT);
15+
static ThreadPool pool(1);
16+
static std::unique_ptr<reanimated::RuntimeManager> manager;
1317

1418
//reanimated::RuntimeManager manager;
1519

1620
void install(jsi::Runtime& runtime) {
21+
// Quickly setup the runtime - this is executed in parallel, and _might_ introduce race conditions if spawnThread is called before this finishes.
22+
pool.enqueue([]() {
23+
auto runtime = makeJSIRuntime();
24+
reanimated::RuntimeDecorator::decorateRuntime(*runtime, "CUSTOM_THREAD");
25+
manager = std::make_unique<reanimated::RuntimeManager>(std::move(runtime),
26+
std::shared_ptr<reanimated::ErrorHandler>(),
27+
std::shared_ptr<reanimated::Scheduler>());
28+
});
29+
1730
// spawnThread(run: () => T): Promise<T>
1831
auto spawnThread = jsi::Function::createFromHostFunction(runtime,
1932
jsi::PropNameID::forAscii(runtime, "spawnThread"),
@@ -37,10 +50,11 @@ void install(jsi::Runtime& runtime) {
3750
.call(runtime, jsi::JSError(runtime, message).value());
3851
};
3952
// TODO: Get correct RuntimeManager instance
40-
auto run = reanimated::ShareableValue::adapt(runtime, arguments[0], nullptr);
53+
auto run = reanimated::ShareableValue::adapt(runtime, arguments[0], manager.get());
4154

42-
pool.enqueue([&resolver, &rejecter, run](jsi::Runtime& runtime) {
55+
pool.enqueue([&resolver, &rejecter, run]() {
4356
try {
57+
auto& runtime = *manager->runtime;
4458
auto func = run->getValue(runtime).asObject(runtime).asFunction(runtime);
4559
auto result = func.callWithThis(runtime, func);
4660

cpp/ThreadPool.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,8 @@ namespace multithreading {
1818
ThreadPool::ThreadPool(size_t threads): stop(false) {
1919
for (size_t i = 0; i < threads; ++i) {
2020
workers.emplace_back([this, i] {
21-
auto runtime = makeJSIRuntime();
22-
std::stringstream stringstream;
23-
stringstream << "THREAD #" << i;
24-
reanimated::RuntimeDecorator::decorateRuntime(*runtime, stringstream.str());
25-
2621
while (true) {
27-
task_t task;
22+
std::function<void()> task;
2823

2924
{
3025
std::unique_lock<std::mutex> lock(this->queue_mutex);
@@ -36,24 +31,29 @@ ThreadPool::ThreadPool(size_t threads): stop(false) {
3631
this->tasks.pop();
3732
}
3833

39-
task(*runtime);
34+
task();
4035
}
4136
});
4237
}
4338
}
4439

4540
// add new work item to the pool
46-
void ThreadPool::enqueue(task_t task) {
41+
std::future<void> ThreadPool::enqueue(std::function<void()> func) {
42+
auto task = std::make_shared<std::packaged_task<void()>>(std::bind(std::forward<std::function<void()>>(func)));
43+
std::future<void> res = task->get_future();
44+
4745
{
4846
std::unique_lock<std::mutex> lock(queue_mutex);
4947

5048
// don't allow enqueueing after stopping the pool
5149
if (stop)
5250
throw std::runtime_error("enqueue on stopped ThreadPool");
5351

54-
tasks.emplace(task);
52+
tasks.emplace([task](){ (*task)(); });
5553
}
54+
5655
condition.notify_one();
56+
return res;
5757
}
5858

5959
// the destructor joins all threads

cpp/ThreadPool.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,19 @@
1515
#include <unordered_map>
1616
#include <jsi/jsi.h>
1717

18-
typedef std::function<void (facebook::jsi::Runtime&)> task_t;
19-
2018
namespace mrousavy {
2119
namespace multithreading {
2220

2321
class ThreadPool {
2422
public:
2523
ThreadPool(size_t threadCount);
26-
void enqueue(task_t task);
24+
std::future<void> enqueue(std::function<void()> task);
2725
~ThreadPool();
2826
private:
2927
// need to keep track of threads so we can join them
3028
std::vector<std::thread> workers;
3129
// the task queue
32-
std::queue<task_t> tasks;
30+
std::queue<std::function<void()>> tasks;
3331

3432
// synchronization
3533
std::mutex queue_mutex;

0 commit comments

Comments
 (0)