diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 0046823f5df..7860cb9b0f4 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -42,7 +42,6 @@ set(common_sources perf_timer.cpp pruning.cpp spawn.cpp - threadpool.cpp updates.cpp aligned.c timings.cc diff --git a/src/common/dns_utils.cpp b/src/common/dns_utils.cpp index 48dab6961e9..79a5e05e86f 100644 --- a/src/common/dns_utils.cpp +++ b/src/common/dns_utils.cpp @@ -35,12 +35,19 @@ #include #include #include "include_base_utils.h" -#include "common/threadpool.h" #include "crypto/crypto.h" #include #include #include #include +#include +#include +#include +#include +#include +#include +#include + using namespace epee; #undef MONERO_DEFAULT_LOG_CATEGORY @@ -487,17 +494,17 @@ bool load_txt_records_from_dns(std::vector &good_records, const std // send all requests in parallel std::deque avail(dns_urls.size(), false), valid(dns_urls.size(), false); - tools::threadpool& tpool = tools::threadpool::getInstanceForIO(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); for (size_t n = 0; n < dns_urls.size(); ++n) { - tpool.submit(&waiter,[n, dns_urls, &records, &avail, &valid](){ + boost::asio::post(thread_pool, [n, dns_urls, &records, &avail, &valid](){ const auto res = tools::DNSResolver::instance().get_txt_record(dns_urls[n], avail[n], valid[n]); for (const auto &s: res) records[n].insert(s); }); } - waiter.wait(); + thread_pool.wait(); size_t cur_index = first_index; do diff --git a/src/common/threadpool.cpp b/src/common/threadpool.cpp deleted file mode 100644 index faa45e8bdae..00000000000 --- a/src/common/threadpool.cpp +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright (c) 2017-2024, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#include "misc_log_ex.h" -#include "common/threadpool.h" - -#include "cryptonote_config.h" -#include "common/util.h" - -static __thread int depth = 0; -static __thread bool is_leaf = false; - -namespace tools -{ -threadpool::threadpool(unsigned int max_threads) : running(true), active(0) { - create(max_threads); -} - -threadpool::~threadpool() { - destroy(); -} - -void threadpool::destroy() { - try - { - const boost::unique_lock lock(mutex); - running = false; - has_work.notify_all(); - } - catch (...) - { - // if the lock throws, we're just do it without a lock and hope, - // since the alternative is terminate - running = false; - has_work.notify_all(); - } - for (size_t i = 0; i lock(mutex); - boost::thread::attributes attrs; - attrs.set_stack_size(THREAD_STACK_SIZE); - max = max_threads ? max_threads : tools::get_max_concurrency(); - size_t i = max ? max - 1 : 0; - running = true; - while(i--) { - threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this, false))); - } -} - -void threadpool::submit(waiter *obj, std::function f, bool leaf) { - CHECK_AND_ASSERT_THROW_MES(!is_leaf, "A leaf routine is using a thread pool"); - boost::unique_lock lock(mutex); - if (!leaf && ((active == max && !queue.empty()) || depth > 0)) { - // if all available threads are already running - // and there's work waiting, just run in current thread - lock.unlock(); - ++depth; - is_leaf = leaf; - f(); - --depth; - is_leaf = false; - } else { - if (obj) - obj->inc(); - if (leaf) - queue.push_front({obj, f, leaf}); - else - queue.push_back({obj, f, leaf}); - has_work.notify_one(); - } -} - -unsigned int threadpool::get_max_concurrency() const { - return max; -} - -threadpool::waiter::~waiter() -{ - try - { - boost::unique_lock lock(mt); - if (num) - MERROR("wait should have been called before waiter dtor - waiting now"); - } - catch (...) { /* ignore */ } - try - { - wait(); - } - catch (const std::exception &e) - { - /* ignored */ - } -} - -bool threadpool::waiter::wait() { - pool.run(true); - boost::unique_lock lock(mt); - while(num) - cv.wait(lock); - return !error(); -} - -void threadpool::waiter::inc() { - const boost::unique_lock lock(mt); - num++; -} - -void threadpool::waiter::dec() { - const boost::unique_lock lock(mt); - num--; - if (!num) - cv.notify_all(); -} - -void threadpool::run(bool flush) { - boost::unique_lock lock(mutex); - while (running) { - entry e; - while(queue.empty() && running) - { - if (flush) - return; - has_work.wait(lock); - } - if (!running) break; - - active++; - e = std::move(queue.front()); - queue.pop_front(); - lock.unlock(); - ++depth; - is_leaf = e.leaf; - try { e.f(); } - catch (const std::exception &ex) { e.wo->set_error(); try { MERROR("Exception in threadpool job: " << ex.what()); } catch (...) {} } - --depth; - is_leaf = false; - - if (e.wo) - e.wo->dec(); - lock.lock(); - active--; - } -} -} diff --git a/src/common/threadpool.h b/src/common/threadpool.h deleted file mode 100644 index 8d32f425228..00000000000 --- a/src/common/threadpool.h +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) 2017-2024, The Monero Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace tools -{ -//! A global thread pool -class threadpool -{ -public: - static threadpool& getInstanceForCompute() { - static threadpool instance; - return instance; - } - static threadpool& getInstanceForIO() { - static threadpool instance(8); - return instance; - } - static threadpool *getNewForUnitTests(unsigned max_threads = 0) { - return new threadpool(max_threads); - } - - // The waiter lets the caller know when all of its - // tasks are completed. - class waiter { - boost::mutex mt; - boost::condition_variable cv; - threadpool &pool; - int num; - bool error_flag; - public: - void inc(); - void dec(); - bool wait(); //! Wait for a set of tasks to finish, returns false iff any error - void set_error() noexcept { error_flag = true; } - bool error() const noexcept { return error_flag; } - waiter(threadpool &pool) : pool(pool), num(0), error_flag(false) {} - ~waiter(); - }; - - // Submit a task to the pool. The waiter pointer may be - // NULL if the caller doesn't care to wait for the - // task to finish. - void submit(waiter *waiter, std::function f, bool leaf = false); - - // destroy and recreate threads - void recycle(); - - unsigned int get_max_concurrency() const; - - ~threadpool(); - - private: - threadpool(unsigned int max_threads = 0); - void destroy(); - void create(unsigned int max_threads); - typedef struct entry { - waiter *wo; - std::function f; - bool leaf; - } entry; - std::deque queue; - boost::condition_variable has_work; - boost::mutex mutex; - std::vector threads; - unsigned int active; - unsigned int max; - bool running; - void run(bool flush = false); -}; - -} diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp index 72e779058df..f300076f0a3 100644 --- a/src/cryptonote_core/blockchain.cpp +++ b/src/cryptonote_core/blockchain.cpp @@ -29,11 +29,15 @@ // Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers #include +#include #include #include #include #include #include +#include +#include +#include #include "include_base_utils.h" #include "cryptonote_basic/cryptonote_basic_impl.h" @@ -48,7 +52,6 @@ #include "profile_tools.h" #include "file_io_utils.h" #include "int-util.h" -#include "common/threadpool.h" #include "warnings.h" #include "crypto/hash.h" #include "cryptonote_core.h" @@ -3384,9 +3387,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, std::vector < uint64_t > results; results.resize(tx.vin.size(), 0); - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); - int threads = tpool.get_max_concurrency(); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); uint64_t max_used_block_height = 0; if (!pmax_used_block_height) @@ -3431,9 +3433,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, { if (threads > 1) { - // ND: Speedup // 1. Thread ring signature verification if possible. - tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])), true); + boost::asio::post(thread_pool, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index]))); } else { @@ -3455,8 +3456,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc, sig_index++; } if (tx.version == 1 && threads > 1) - if (!waiter.wait()) - return false; + thread_pool.wait(); // enforce min output age if (hf_version >= HF_VERSION_ENFORCE_MIN_AGE) @@ -4925,8 +4925,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector(&blocks[thread_height - height], nblocks), std::ref(maps[i])), true); + boost::asio::post(thread_pool, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, epee::span(&blocks[thread_height - height], nblocks), std::ref(maps[i]))); thread_height += nblocks; } - if (!waiter.wait()) - return false; + thread_pool.wait(); m_prepare_height = 0; if (m_cancel) @@ -5132,21 +5130,18 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vectorcan_thread_bulk_indices()) threads = 1; if (threads > 1 && amounts.size() > 1) { - tools::threadpool::waiter waiter(tpool); for (size_t i = 0; i < amounts.size(); i++) { uint64_t amount = amounts[i]; - tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount])), true); + boost::asio::post(thread_pool, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]))); } - if (!waiter.wait()) - return false; + thread_pool.wait(); } else { diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index 954dc81e4c0..06e45929999 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -39,7 +39,6 @@ using namespace epee; #include "common/util.h" #include "common/updates.h" #include "common/download.h" -#include "common/threadpool.h" #include "common/command_line.h" #include "cryptonote_basic/events.h" #include "warnings.h" @@ -58,6 +57,10 @@ using namespace epee; #include "version.h" #include +#include +#include +#include +#include #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "cn" @@ -1009,11 +1012,11 @@ namespace cryptonote CRITICAL_REGION_LOCAL(m_incoming_tx_lock); - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); epee::span::const_iterator it = tx_blobs.begin(); for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { - tpool.submit(&waiter, [&, i, it] { + boost::asio::post(thread_pool, [&, i, it] { try { results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash); @@ -1026,8 +1029,7 @@ namespace cryptonote } }); } - if (!waiter.wait()) - return false; + thread_pool.wait(); it = tx_blobs.begin(); std::vector already_have(tx_blobs.size(), false); for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { @@ -1045,7 +1047,7 @@ namespace cryptonote } else { - tpool.submit(&waiter, [&, i, it] { + boost::asio::post(thread_pool, [&, i, it] { try { results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash); @@ -1059,8 +1061,8 @@ namespace cryptonote }); } } - if (!waiter.wait()) - return false; + thread_pool.wait(); + std::vector tx_info; tx_info.reserve(tx_blobs.size()); diff --git a/src/ringct/rctSigs.cpp b/src/ringct/rctSigs.cpp index 2d92ba05d4a..d625d539cbf 100644 --- a/src/ringct/rctSigs.cpp +++ b/src/ringct/rctSigs.cpp @@ -31,7 +31,6 @@ #include "misc_log_ex.h" #include "misc_language.h" #include "common/perf_timer.h" -#include "common/threadpool.h" #include "common/util.h" #include "rctSigs.h" #include "bulletproofs.h" @@ -39,6 +38,11 @@ #include "cryptonote_basic/cryptonote_format_utils.h" #include "cryptonote_config.h" +#include +#include +#include +#include + using namespace crypto; using namespace std; @@ -1333,14 +1337,13 @@ namespace rct { try { if (semantics) { - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); std::deque results(rv.outPk.size(), false); DP("range proofs verified?"); for (size_t i = 0; i < rv.outPk.size(); i++) - tpool.submit(&waiter, [&, i] { results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); }); - if (!waiter.wait()) - return false; + boost::asio::post(thread_pool, [&, i] { results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); }); + thread_pool.wait(); for (size_t i = 0; i < results.size(); ++i) { if (!results[i]) { @@ -1383,8 +1386,8 @@ namespace rct { { PERF_TIMER(verRctSemanticsSimple); - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); std::deque results; std::vector bp_proofs; std::vector bpp_proofs; @@ -1468,27 +1471,24 @@ namespace rct { else { for (size_t i = 0; i < rv.p.rangeSigs.size(); i++) - tpool.submit(&waiter, [&, i, offset] { results[i+offset] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); }); + boost::asio::post(thread_pool, [&, i, offset] { results[i+offset] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); }); offset += rv.p.rangeSigs.size(); } } if (!bpp_proofs.empty() && !verBulletproofPlus(bpp_proofs)) { LOG_PRINT_L1("Aggregate range proof verified failed"); - if (!waiter.wait()) - return false; + thread_pool.wait(); return false; } if (!bp_proofs.empty() && !verBulletproof(bp_proofs)) { LOG_PRINT_L1("Aggregate range proof verified failed"); - if (!waiter.wait()) - return false; + thread_pool.wait(); return false; } - if (!waiter.wait()) - return false; + thread_pool.wait(); for (size_t i = 0; i < results.size(); ++i) { if (!results[i]) { LOG_PRINT_L1("Range proof verified failed for proof " << i); @@ -1536,8 +1536,7 @@ namespace rct { const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size()); std::deque results(threads); - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + boost::asio::thread_pool thread_pool(threads); const keyV &pseudoOuts = bulletproof || bulletproof_plus ? rv.p.pseudoOuts : rv.pseudoOuts; @@ -1546,15 +1545,14 @@ namespace rct { results.clear(); results.resize(rv.mixRing.size()); for (size_t i = 0 ; i < rv.mixRing.size() ; i++) { - tpool.submit(&waiter, [&, i] { + boost::asio::post(thread_pool, [&, i] { if (is_rct_clsag(rv.type)) results[i] = verRctCLSAGSimple(message, rv.p.CLSAGs[i], rv.mixRing[i], pseudoOuts[i]); else results[i] = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], pseudoOuts[i]); }); } - if (!waiter.wait()) - return false; + thread_pool.wait(); for (size_t i = 0; i < results.size(); ++i) { if (!results[i]) { diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index 04be12c137e..05934ebd088 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +46,9 @@ #include #include #include +#include +#include +#include #include #include "include_base_utils.h" using namespace epee; @@ -65,7 +69,6 @@ using namespace epee; #include "multisig/multisig_kex_msg.h" #include "multisig/multisig_tx_builder_ringct.h" #include "common/command_line.h" -#include "common/threadpool.h" #include "int-util.h" #include "profile_tools.h" #include "crypto/crypto.h" @@ -3197,8 +3200,8 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect THROW_WALLET_EXCEPTION_IF(blocks.size() != parsed_blocks.size(), error::wallet_internal_error, "size mismatch"); THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(start_height), error::out_of_hashchain_bounds_error); - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); size_t num_txes = 0; std::vector tx_cache_data; @@ -3230,16 +3233,16 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect continue; } if (m_refresh_type != RefreshNoCoinbase) - tpool.submit(&waiter, [&, i, txidx](){ cache_tx_data(parsed_blocks[i].block.miner_tx, get_transaction_hash(parsed_blocks[i].block.miner_tx), tx_cache_data[txidx]); }); + boost::asio::post(thread_pool, [&, i, txidx](){ cache_tx_data(parsed_blocks[i].block.miner_tx, get_transaction_hash(parsed_blocks[i].block.miner_tx), tx_cache_data[txidx]); }); ++txidx; for (size_t idx = 0; idx < parsed_blocks[i].txes.size(); ++idx) { - tpool.submit(&waiter, [&, i, idx, txidx](){ cache_tx_data(parsed_blocks[i].txes[idx], parsed_blocks[i].block.tx_hashes[idx], tx_cache_data[txidx]); }); + boost::asio::post(thread_pool, [&, i, idx, txidx](){ cache_tx_data(parsed_blocks[i].txes[idx], parsed_blocks[i].block.tx_hashes[idx], tx_cache_data[txidx]); }); ++txidx; } } THROW_WALLET_EXCEPTION_IF(txidx != num_txes, error::wallet_internal_error, "txidx does not match tx_cache_data size"); - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); hw::device &hwdev = m_account.get_device(); hw::reset_mode rst(hwdev); @@ -3259,15 +3262,15 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect { if (tx_cache_data[i].empty()) continue; - tpool.submit(&waiter, [&gender, &tx_cache_data, i]() { + boost::asio::post(thread_pool, [&gender, &tx_cache_data, i]() { auto &slot = tx_cache_data[i]; for (auto &iod: slot.primary) gender(iod); for (auto &iod: slot.additional) gender(iod); - }, true); + }); } - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); auto geniod = [&](const cryptonote::transaction &tx, size_t n_vouts, size_t txidx) { for (size_t k = 0; k < n_vouts; ++k) @@ -3317,7 +3320,7 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect if (parsed_blocks[i].block.major_version >= hf_version_view_tags) geniods.push_back(geniod_params{ tx, n_vouts, txidx }); else - tpool.submit(&waiter, [&, n_vouts, txidx](){ geniod(tx, n_vouts, txidx); }, true); + boost::asio::post(thread_pool, [&, n_vouts, txidx](){ geniod(tx, n_vouts, txidx); }); } ++txidx; for (size_t j = 0; j < parsed_blocks[i].txes.size(); ++j) @@ -3326,7 +3329,7 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect if (parsed_blocks[i].block.major_version >= hf_version_view_tags) geniods.push_back(geniod_params{ parsed_blocks[i].txes[j], parsed_blocks[i].txes[j].vout.size(), txidx }); else - tpool.submit(&waiter, [&, i, j, txidx](){ geniod(parsed_blocks[i].txes[j], parsed_blocks[i].txes[j].vout.size(), txidx); }, true); + boost::asio::post(thread_pool, [&, i, j, txidx](){ geniod(parsed_blocks[i].txes[j], parsed_blocks[i].txes[j].vout.size(), txidx); }); ++txidx; } } @@ -3345,19 +3348,19 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect { size_t batch_end = std::min(batch_start + GENIOD_BATCH_SIZE, geniods.size()); THROW_WALLET_EXCEPTION_IF(batch_end < batch_start, error::wallet_internal_error, "Thread batch end overflow"); - tpool.submit(&waiter, [&geniods, &geniod, batch_start, batch_end]() { + boost::asio::post(thread_pool, [&geniods, &geniod, batch_start, batch_end]() { for (size_t i = batch_start; i < batch_end; ++i) { const geniod_params &gp = geniods[i]; geniod(gp.tx, gp.n_outs, gp.txidx); } - }, true); + }); num_batch_txes += batch_end - batch_start; batch_start = batch_end; } THROW_WALLET_EXCEPTION_IF(num_batch_txes != geniods.size(), error::wallet_internal_error, "txes batched for thread pool did not reach expected value"); } - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); hwdev.set_mode(hw::device::NONE); @@ -3460,15 +3463,15 @@ void wallet2::pull_and_parse_next_blocks(bool first, bool try_incremental, uint6 pull_blocks(first, try_incremental, start_height, blocks_start_height, short_chain_history, blocks, o_indices, current_height, process_pool_txs); THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "Mismatched sizes of blocks and o_indices"); - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); parsed_blocks.resize(blocks.size()); for (size_t i = 0; i < blocks.size(); ++i) { - tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block), - std::ref(parsed_blocks[i].block), std::ref(parsed_blocks[i].hash), std::ref(parsed_blocks[i].error)), true); + boost::asio::post(thread_pool, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block), + std::ref(parsed_blocks[i].block), std::ref(parsed_blocks[i].hash), std::ref(parsed_blocks[i].error))); } - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); for (size_t i = 0; i < blocks.size(); ++i) { if (parsed_blocks[i].error) @@ -3502,16 +3505,16 @@ void wallet2::pull_and_parse_next_blocks(bool first, bool try_incremental, uint6 parsed_blocks[i].txes.resize(blocks[i].txs.size()); for (size_t j = 0; j < blocks[i].txs.size(); ++j) { - tpool.submit(&waiter, [&, i, j](){ + boost::asio::post(thread_pool, [&, i, j](){ if (!parse_and_validate_tx_base_from_blob(blocks[i].txs[j].blob, parsed_blocks[i].txes[j])) { boost::unique_lock lock(error_lock); error = true; } - }, true); + }); } } - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); last = !blocks.empty() && cryptonote::get_block_height(parsed_blocks.back().block) + 1 == current_height; } catch(...) @@ -4019,8 +4022,8 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo size_t try_count = 0; crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash; std::list short_chain_history; - tools::threadpool& tpool = tools::threadpool::getInstanceForCompute(); - tools::threadpool::waiter waiter(tpool); + int threads = std::thread::hardware_concurrency(); + boost::asio::thread_pool thread_pool(threads); uint64_t blocks_start_height; std::vector blocks; std::vector parsed_blocks; @@ -4092,7 +4095,7 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo break; } if (!last) - tpool.submit(&waiter, [&]{pull_and_parse_next_blocks(first, try_incremental, start_height, next_blocks_start_height, short_chain_history, blocks, parsed_blocks, next_blocks, next_parsed_blocks, process_pool_txs, last, error, exception);}); + boost::asio::post(thread_pool, [&]{pull_and_parse_next_blocks(first, try_incremental, start_height, next_blocks_start_height, short_chain_history, blocks, parsed_blocks, next_blocks, next_parsed_blocks, process_pool_txs, last, error, exception);}); if (!first) { @@ -4131,7 +4134,7 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo } blocks_fetched += added_blocks; } - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); // handle error from async fetching thread if (error) @@ -4165,28 +4168,28 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo catch (const tools::error::password_needed&) { blocks_fetched += added_blocks; - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); throw; } catch (const error::deprecated_rpc_access&) { - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); throw; } catch (const error::reorg_depth_error&) { - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); throw; } catch (const error::incorrect_fork_version&) { - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); throw; } catch (const std::exception&) { blocks_fetched += added_blocks; - THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool"); + thread_pool.wait(); if(try_count < 3) { LOG_PRINT_L1("Another try pull_blocks (try_count=" << try_count << ")..."); diff --git a/tests/core_tests/chaingen.h b/tests/core_tests/chaingen.h index 4cd969e9a1c..8ba569ba994 100644 --- a/tests/core_tests/chaingen.h +++ b/tests/core_tests/chaingen.h @@ -46,7 +46,6 @@ #include "include_base_utils.h" #include "chaingen_serialization.h" #include "common/command_line.h" -#include "common/threadpool.h" #include "cryptonote_basic/account_boost_serialization.h" #include "cryptonote_basic/cryptonote_basic.h" @@ -808,7 +807,6 @@ inline bool do_replay_events_get_core(std::vector& events, cry t_test_class validator; bool ret = replay_events_through_core(c, events, validator); - tools::threadpool::getInstanceForCompute().recycle(); // c.deinit(); return ret; } diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index e329b7506fa..d88910d6263 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -84,7 +84,6 @@ set(unit_tests_sources test_tx_utils.cpp test_peerlist.cpp test_protocol_pack.cpp - threadpool.cpp tx_proof.cpp hardfork.cpp unbound.cpp diff --git a/tests/unit_tests/threadpool.cpp b/tests/unit_tests/threadpool.cpp deleted file mode 100644 index d89f161672b..00000000000 --- a/tests/unit_tests/threadpool.cpp +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (c) 2018-2024, The Monero Project - -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#include -#include "gtest/gtest.h" -#include "misc_language.h" -#include "common/threadpool.h" - -TEST(threadpool, wait_nothing) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests()); - tools::threadpool::waiter waiter(*tpool);; - waiter.wait(); -} - -TEST(threadpool, wait_waits) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests()); - tools::threadpool::waiter waiter(*tpool); - std::atomic b(false); - tpool->submit(&waiter, [&b](){ epee::misc_utils::sleep_no_w(1000); b = true; }); - ASSERT_FALSE(b); - waiter.wait(); - ASSERT_TRUE(b); -} - -TEST(threadpool, one_thread) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(1)); - tools::threadpool::waiter waiter(*tpool); - - std::atomic counter(0); - for (size_t n = 0; n < 4096; ++n) - { - tpool->submit(&waiter, [&counter](){++counter;}); - } - waiter.wait(); - ASSERT_EQ(counter, 4096); -} - -TEST(threadpool, many_threads) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(256)); - tools::threadpool::waiter waiter(*tpool); - - std::atomic counter(0); - for (size_t n = 0; n < 4096; ++n) - { - tpool->submit(&waiter, [&counter](){++counter;}); - } - waiter.wait(); - ASSERT_EQ(counter, 4096); -} - -static uint64_t fibonacci(std::shared_ptr tpool, uint64_t n) -{ - if (n <= 1) - return n; - uint64_t f1, f2; - tools::threadpool::waiter waiter(*tpool); - tpool->submit(&waiter, [&tpool, &f1, n](){ f1 = fibonacci(tpool, n-1); }); - tpool->submit(&waiter, [&tpool, &f2, n](){ f2 = fibonacci(tpool, n-2); }); - waiter.wait(); - return f1 + f2; -} - -TEST(threadpool, reentrency) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(4)); - tools::threadpool::waiter waiter(*tpool); - - uint64_t f = fibonacci(tpool, 13); - waiter.wait(); - ASSERT_EQ(f, 233); -} - -TEST(threadpool, reentrancy) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(4)); - tools::threadpool::waiter waiter(*tpool); - - uint64_t f = fibonacci(tpool, 13); - waiter.wait(); - ASSERT_EQ(f, 233); -} - -TEST(threadpool, leaf_throws) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests()); - tools::threadpool::waiter waiter(*tpool); - - bool thrown = false, executed = false; - tpool->submit(&waiter, [&](){ - try { tpool->submit(&waiter, [&](){ executed = true; }); } - catch(const std::exception &e) { thrown = true; } - }, true); - waiter.wait(); - ASSERT_TRUE(thrown); - ASSERT_FALSE(executed); -} - -TEST(threadpool, leaf_reentrancy) -{ - std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(4)); - tools::threadpool::waiter waiter(*tpool); - - std::atomic counter(0); - for (int i = 0; i < 1000; ++i) - { - tpool->submit(&waiter, [&](){ - tools::threadpool::waiter waiter(*tpool); - for (int j = 0; j < 500; ++j) - { - tpool->submit(&waiter, [&](){ ++counter; }, true); - } - waiter.wait(); - }); - } - waiter.wait(); - ASSERT_EQ(counter, 500000); -}