diff --git a/src/liblelantus/threadpool.h b/src/liblelantus/threadpool.h index 2b3b873a29..1fa04369d4 100644 --- a/src/liblelantus/threadpool.h +++ b/src/liblelantus/threadpool.h @@ -49,7 +49,7 @@ class ParallelOpThreadPool { // In case of shutdown thread list will be empty and destructor will wait for this thread completion boost::thread::id currentId = boost::this_thread::get_id(); auto pThread = std::find_if(threads.begin(), threads.end(), - [=](const boost::thread &t) { return t.get_id() == currentId; }); + [currentId](const boost::thread &t) { return t.get_id() == currentId; }); if (pThread != threads.end()) { pThread->detach(); threads.erase(pThread); @@ -79,7 +79,7 @@ class ParallelOpThreadPool { // Post a task to the thread pool and return a future to wait for its completion boost::future PostTask(std::function task) { - boost::packaged_task packagedTask(std::move(task)); + boost::packaged_task packagedTask(task); boost::future ret = packagedTask.get_future(); boost::mutex::scoped_lock lock(task_queue_mutex); @@ -91,7 +91,7 @@ class ParallelOpThreadPool { task_queue.emplace(std::move(packagedTask)); task_queue_condition.notify_one(); - return std::move(ret); + return ret; } int GetNumberOfThreads() const { @@ -119,6 +119,11 @@ class ParallelOpThreadPool { boost::mutex::scoped_lock lock(task_queue_mutex); return shutdown; } + + std::size_t GetPendingTaskCount() { + boost::mutex::scoped_lock lock(task_queue_mutex); + return task_queue.size(); + } }; diff --git a/src/spark/state.cpp b/src/spark/state.cpp index 63907d4252..3b8f537377 100644 --- a/src/spark/state.cpp +++ b/src/spark/state.cpp @@ -24,7 +24,7 @@ struct ProofCheckState { static std::map gCheckedSparkSpendTransactions; static CCriticalSection cs_checkedSparkSpendTransactions; -static ParallelOpThreadPool gCheckProofThreadPool(boost::thread::hardware_concurrency()); +static ParallelOpThreadPool gCheckProofThreadPool(std::min(boost::thread::hardware_concurrency(), 4u)); static CSparkState sparkState; @@ -804,17 +804,23 @@ bool CheckSparkSpendTransaction( } else if (!fStatefulSigmaCheck && !gCheckProofThreadPool.IsPoolShutdown()) { // not an urgent check, put the proof into the thread pool for verification - auto future = gCheckProofThreadPool.PostTask([spend, cover_sets]() { - try { - return spark::SpendTransaction::verify(*spend, cover_sets); - } catch (const std::exception &) { - return false; - } - }); - auto &checkState = gCheckedSparkSpendTransactions[hashTx]; - checkState.fChecked = false; - checkState.fResult = false; - checkState.checkInProgress = std::make_shared>(std::move(future)); + // don't post a request if there are too many tasks already + if (gCheckProofThreadPool.GetPendingTaskCount() < (std::size_t)gCheckProofThreadPool.GetNumberOfThreads()/2) { + auto future = gCheckProofThreadPool.PostTask([spend, cover_sets]() mutable { + try { + bool result = spark::SpendTransaction::verify(*spend, cover_sets); + spend.reset(); + cover_sets.clear(); + return result; + } catch (const std::exception &) { + return false; + } + }); + auto &checkState = gCheckedSparkSpendTransactions[hashTx]; + checkState.fChecked = false; + checkState.fResult = false; + checkState.checkInProgress = std::make_shared>(std::move(future)); + } } } while (fRecheckNeeded);