From b547444a72cf70dbb74820bd1993d9e6f93c465d Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Tue, 28 Oct 2025 21:08:57 -0500 Subject: [PATCH] Update the LCI parcelport to LCI v2 Signed-off-by: Jiakun Yan --- .gitignore | 3 +- .jenkins/lsu-test-coverage/batch.sh | 1 + .jenkins/lsu/env-clang-16.sh | 1 + .jenkins/lsu/env-clang-17.sh | 1 + .jenkins/lsu/env-clang-18.sh | 1 + .jenkins/lsu/env-clang-19.sh | 1 + .jenkins/lsu/env-clang-20.sh | 1 + .jenkins/lsu/env-gcc-13.sh | 1 + .jenkins/lsu/env-gcc-14.sh | 1 + .jenkins/lsu/env-gcc-15.sh | 1 + CMakeLists.txt | 14 +- cmake/HPX_SetupLCI.cmake | 29 +- libs/core/lci_base/CMakeLists.txt | 4 +- .../lci_base/include/hpx/lci_base/lci.hpp | 28 -- .../include/hpx/lci_base/lci_environment.hpp | 16 +- libs/core/lci_base/src/lci_environment.cpp | 67 ++-- libs/core/version/src/version.cpp | 2 +- libs/full/parcelport_lci/CMakeLists.txt | 7 - .../hpx/parcelport_lci/backlog_queue.hpp | 38 -- .../completion_manager_queue.hpp | 24 +- .../completion_manager_sync.hpp | 22 +- .../completion_manager_sync_single.hpp | 22 +- .../completion_manager_sync_single_nolock.hpp | 22 +- .../completion_manager_base.hpp | 12 +- .../include/hpx/parcelport_lci/config.hpp | 17 +- .../include/hpx/parcelport_lci/header.hpp | 14 +- .../hpx/parcelport_lci/parcelport_lci.hpp | 11 +- .../parcelport_lci/putva/receiver_putva.hpp | 200 ----------- .../putva/sender_connection_putva.hpp | 63 ---- .../hpx/parcelport_lci/putva/sender_putva.hpp | 68 ---- .../hpx/parcelport_lci/receiver_base.hpp | 33 -- .../parcelport_lci/sender_connection_base.hpp | 4 +- .../sendrecv/receiver_connection_sendrecv.hpp | 10 +- .../sendrecv/receiver_sendrecv.hpp | 29 +- .../sendrecv/sender_connection_sendrecv.hpp | 15 +- .../sendrecv/sender_sendrecv.hpp | 1 - .../full/parcelport_lci/src/backlog_queue.cpp | 93 ----- .../completion_manager_queue.cpp | 10 +- .../completion_manager_sync.cpp | 18 +- .../completion_manager_sync_single.cpp | 11 +- .../completion_manager_sync_single_nolock.cpp | 11 +- libs/full/parcelport_lci/src/config.cpp | 51 ++- .../parcelport_lci/src/parcelport_lci.cpp | 154 ++++----- .../src/putva/sender_connection_putva.cpp | 324 ------------------ .../parcelport_lci/src/putva/sender_putva.cpp | 31 -- libs/full/parcelport_lci/src/sender_base.cpp | 8 +- .../src/sender_connection_base.cpp | 69 ++-- .../sendrecv/receiver_connection_sendrecv.cpp | 128 +++---- .../src/sendrecv/receiver_sendrecv.cpp | 55 ++- .../sendrecv/sender_connection_sendrecv.cpp | 218 ++++-------- 50 files changed, 480 insertions(+), 1485 deletions(-) delete mode 100644 libs/core/lci_base/include/hpx/lci_base/lci.hpp delete mode 100644 libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp delete mode 100644 libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp delete mode 100644 libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp delete mode 100644 libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp delete mode 100644 libs/full/parcelport_lci/src/backlog_queue.cpp delete mode 100644 libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp delete mode 100644 libs/full/parcelport_lci/src/putva/sender_putva.cpp diff --git a/.gitignore b/.gitignore index 3ee66370b0e5..ec5bff9f3eb8 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ packages .vs .idea cmake-build* -/.cache \ No newline at end of file +/.cache +vscode-build \ No newline at end of file diff --git a/.jenkins/lsu-test-coverage/batch.sh b/.jenkins/lsu-test-coverage/batch.sh index 07537741a0f7..600045659abd 100755 --- a/.jenkins/lsu-test-coverage/batch.sh +++ b/.jenkins/lsu-test-coverage/batch.sh @@ -31,6 +31,7 @@ cmake \ -DHPX_WITH_PARCELPORT_MPI=ON \ -DHPX_WITH_PARCELPORT_LCI=ON \ -DHPX_WITH_FETCH_LCI=ON \ + -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON \ -DCMAKE_CXX_FLAGS="-O0 --coverage" \ -DCMAKE_EXE_LINKER_FLAGS=--coverage diff --git a/.jenkins/lsu/env-clang-16.sh b/.jenkins/lsu/env-clang-16.sh index c52a6f3ead0b..2a811ec1b480 100644 --- a/.jenkins/lsu/env-clang-16.sh +++ b/.jenkins/lsu/env-clang-16.sh @@ -25,6 +25,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=clang" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" diff --git a/.jenkins/lsu/env-clang-17.sh b/.jenkins/lsu/env-clang-17.sh index 07db8d953f99..0eaf6554f21f 100644 --- a/.jenkins/lsu/env-clang-17.sh +++ b/.jenkins/lsu/env-clang-17.sh @@ -26,6 +26,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=clang" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" diff --git a/.jenkins/lsu/env-clang-18.sh b/.jenkins/lsu/env-clang-18.sh index bc7573328fcb..31515425b71a 100644 --- a/.jenkins/lsu/env-clang-18.sh +++ b/.jenkins/lsu/env-clang-18.sh @@ -25,6 +25,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=clang" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" diff --git a/.jenkins/lsu/env-clang-19.sh b/.jenkins/lsu/env-clang-19.sh index ec7844be6bd2..3a4c8466befc 100644 --- a/.jenkins/lsu/env-clang-19.sh +++ b/.jenkins/lsu/env-clang-19.sh @@ -25,6 +25,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=clang" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" diff --git a/.jenkins/lsu/env-clang-20.sh b/.jenkins/lsu/env-clang-20.sh index 27a8629fb41b..541bbf5384b6 100644 --- a/.jenkins/lsu/env-clang-20.sh +++ b/.jenkins/lsu/env-clang-20.sh @@ -25,6 +25,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=clang" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" diff --git a/.jenkins/lsu/env-gcc-13.sh b/.jenkins/lsu/env-gcc-13.sh index b2a2a1d4082b..8fd73478dea8 100644 --- a/.jenkins/lsu/env-gcc-13.sh +++ b/.jenkins/lsu/env-gcc-13.sh @@ -25,6 +25,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=gcc" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=EVE" diff --git a/.jenkins/lsu/env-gcc-14.sh b/.jenkins/lsu/env-gcc-14.sh index 713d9f29c05f..811861806823 100644 --- a/.jenkins/lsu/env-gcc-14.sh +++ b/.jenkins/lsu/env-gcc-14.sh @@ -24,6 +24,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=gcc" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=STD_EXPERIMENTAL_SIMD" diff --git a/.jenkins/lsu/env-gcc-15.sh b/.jenkins/lsu/env-gcc-15.sh index 8237b1618898..25aa6517d41b 100644 --- a/.jenkins/lsu/env-gcc-15.sh +++ b/.jenkins/lsu/env-gcc-15.sh @@ -24,6 +24,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_LCI_BOOTSTRAP_MPI=ON" configure_extra_options+=" -DCMAKE_C_COMPILER=gcc" configure_extra_options+=" -DCMAKE_C_FLAGS=-fPIC" configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=STD_EXPERIMENTAL_SIMD" diff --git a/CMakeLists.txt b/CMakeLists.txt index 44383142582c..25963a1f2672 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1270,7 +1270,15 @@ if(HPX_WITH_NETWORKING) ADVANCED ) hpx_option( - HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.9" + HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" + "2c4d2c59d2b781f54d6c4e64421adcca16502df9" # latest commit as of 2025-10-29 + CATEGORY "Build Targets" + ADVANCED + ) + hpx_option( + HPX_WITH_LCI_BOOTSTRAP_MPI BOOL + "Configure the autofetched LCI with mpi bootstrap support (default: OFF)" + OFF CATEGORY "Build Targets" ADVANCED ) @@ -1405,8 +1413,8 @@ if(HPX_WITH_NETWORKING AND HPX_WITH_PARCELPORT_LCI) hpx_option( HPX_WITH_PARCELPORT_LCI_ENV STRING - "List of environment variables checked to detect LCI (default: MV2_COMM_WORLD_RANK;PMI_RANK;OMPI_COMM_WORLD_SIZE;ALPS_APP_PE;PMIX_RANK;PALS_NODEID)." - "MV2_COMM_WORLD_RANK;PMI_RANK;OMPI_COMM_WORLD_SIZE;ALPS_APP_PE;PMIX_RANK;PALS_NODEID" + "List of environment variables checked to detect LCI (default: MV2_COMM_WORLD_RANK;PMI_RANK;OMPI_COMM_WORLD_SIZE;ALPS_APP_PE;PMIX_RANK;PALS_NODEID;LCT_PMI_FILE_NRANKS)." + "MV2_COMM_WORLD_RANK;PMI_RANK;OMPI_COMM_WORLD_SIZE;ALPS_APP_PE;PMIX_RANK;PALS_NODEID;LCT_PMI_FILE_NRANKS" CATEGORY "Parcelport" ADVANCED ) diff --git a/cmake/HPX_SetupLCI.cmake b/cmake/HPX_SetupLCI.cmake index 99babdc2bf44..077b49e06929 100644 --- a/cmake/HPX_SetupLCI.cmake +++ b/cmake/HPX_SetupLCI.cmake @@ -51,20 +51,31 @@ macro(hpx_setup_lci) fetchcontent_getproperties(lci) if(NOT lci_POPULATED) fetchcontent_populate(lci) + if(NOT LCT_PMI_BACKEND_ENABLE_MPI AND HPX_WITH_LCI_BOOTSTRAP_MPI) + # Configure LCI with MPI bootstrap support + set(LCT_PMI_BACKEND_ENABLE_MPI + ON + CACHE INTERNAL "" + ) + # Set MPI as the first PMI backend to try + set(LCI_PMI_BACKEND_DEFAULT + "mpi;pmix;pmi2;pmi1;file;local" + CACHE INTERNAL "" + ) + endif() set(LCI_FETCHCONTENT_INSTALL OFF CACHE INTERNAL "" ) - set(LCI_COMPILE_DREG - OFF - CACHE INTERNAL "" - ) - set(LCI_USE_DREG_DEFAULT - OFF - CACHE INTERNAL "" - ) enable_language(C) add_subdirectory(${lci_SOURCE_DIR} ${lci_BINARY_DIR}) + + if(NOT LCI_VERSION OR LCI_VERSION VERSION_LESS 2.0.0) + message( + FATAL_ERROR + "LCI version ${LCI_VERSION} found, but version 2.0.0 or higher is required" + ) + endif() # Move LCI target into its own FOLDER set_target_properties(LCI PROPERTIES FOLDER "Core/Dependencies") set(HPX_CMAKE_ADDITIONAL_MODULE_PATH_BUILD @@ -82,7 +93,7 @@ macro(hpx_setup_lci) ) install( - DIRECTORY ${lci_SOURCE_DIR}/lci/api/ ${lci_BINARY_DIR}/lci/api/ + DIRECTORY ${lci_SOURCE_DIR}/lci/src/api/ ${lci_BINARY_DIR}/lci/src/api/ ${lci_SOURCE_DIR}/lct/api/ ${lci_BINARY_DIR}/lct/api/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} COMPONENT core diff --git a/libs/core/lci_base/CMakeLists.txt b/libs/core/lci_base/CMakeLists.txt index 1130d3baf7f7..de53f6a3c61b 100644 --- a/libs/core/lci_base/CMakeLists.txt +++ b/libs/core/lci_base/CMakeLists.txt @@ -15,7 +15,7 @@ include(HPX_SetupLCI) hpx_setup_lci() # Default location is $HPX_ROOT/libs/lci_base/include -set(lci_base_headers hpx/lci_base/lci.hpp hpx/lci_base/lci_environment.hpp) +set(lci_base_headers hpx/lci_base/lci_environment.hpp) set(lci_base_sources lci_environment.cpp) @@ -27,6 +27,6 @@ add_hpx_module( HEADERS ${lci_base_headers} MODULE_DEPENDENCIES hpx_logging hpx_runtime_configuration hpx_string_util hpx_util - DEPENDENCIES LCI::LCI + DEPENDENCIES LCI::LCI LCI::LCT CMAKE_SUBDIRS examples tests ) diff --git a/libs/core/lci_base/include/hpx/lci_base/lci.hpp b/libs/core/lci_base/include/hpx/lci_base/lci.hpp deleted file mode 100644 index 0eb18e4d0a08..000000000000 --- a/libs/core/lci_base/include/hpx/lci_base/lci.hpp +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2017 Mikael Simberg -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#pragma once - -#if (defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI)) || \ - defined(HPX_HAVE_MODULE_LCI_BASE) - -#if defined(__clang__) -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wcast-qual" -#elif defined(__GNUC__) -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wcast-qual" -#endif - -#include "lci.h" - -#if defined(__clang__) -#pragma clang diagnostic pop -#elif defined(__GNUC__) -#pragma GCC diagnostic pop -#endif - -#endif diff --git a/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp b/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp index 25bba48818b8..0084d2ee564c 100644 --- a/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp +++ b/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2013-2015 Thomas Heller +// Copyright (c) 2025 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -11,7 +11,6 @@ #if (defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI)) || \ defined(HPX_HAVE_MODULE_LCI_BASE) -#include #include #include @@ -23,18 +22,17 @@ #include +#include "lci.hpp" +#include "lct.h" + namespace hpx { namespace util { struct HPX_EXPORT lci_environment { static bool check_lci_environment(runtime_configuration& cfg); - static void init_config(runtime_configuration& cfg); static void init(int* argc, char*** argv, runtime_configuration& cfg); static void finalize(); - static bool do_progress(LCI_device_t device); - static bool do_progress(); - static bool enabled(); static int rank(); @@ -42,6 +40,11 @@ namespace hpx { namespace util { static std::string get_processor_name(); + static int get_max_tag(); + + // progress + static bool do_progress(::lci::device_t device); + // log enum class log_level_t { @@ -55,6 +58,7 @@ namespace hpx { namespace util { #endif static void log( log_level_t level, const char* tag, const char* format, ...); + // performance counter // clang-format off #define HPX_LCI_PCOUNTER_NONE_FOR_EACH(_macro) diff --git a/libs/core/lci_base/src/lci_environment.cpp b/libs/core/lci_base/src/lci_environment.cpp index f3e51a29eb30..05d82d105a87 100644 --- a/libs/core/lci_base/src/lci_environment.cpp +++ b/libs/core/lci_base/src/lci_environment.cpp @@ -1,5 +1,4 @@ -// Copyright (c) 2013-2015 Thomas Heller -// Copyright (c) 2020 Google +// Copyright (c) 2025 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -104,6 +103,7 @@ namespace hpx { namespace util { HPX_LCI_PCOUNTER_NONE_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DEF) HPX_LCI_PCOUNTER_TREND_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DEF) HPX_LCI_PCOUNTER_TIMER_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DEF) + /////////////////////////////////////////////////////////////////////////// void lci_environment::init( int*, char***, util::runtime_configuration& rtcfg) @@ -111,20 +111,8 @@ namespace hpx { namespace util { if (enabled_) return; // don't call twice - LCI_error_t retval; - int lci_initialized = 0; - LCI_initialized(&lci_initialized); - if (!lci_initialized) - { - retval = LCI_initialize(); - if (LCI_OK != retval) - { - rtcfg.add_entry("hpx.parcel.lci.enable", "0"); - enabled_ = false; - throw std::runtime_error( - "lci_environment::init: LCI_initialize failed"); - } - } + LCT_init(); + ::lci::global_initialize(); int this_rank = rank(); @@ -145,8 +133,7 @@ namespace hpx { namespace util { rtcfg.add_entry("hpx.parcel.bootstrap", "lci"); rtcfg.add_entry("hpx.parcel.lci.rank", std::to_string(this_rank)); - LCT_init(); - // initialize the log context + #ifdef HPX_HAVE_PARCELPORT_LCI_LOG const char* const log_levels[] = {"none", "profile", "debug"}; log_ctx = LCT_log_ctx_alloc(log_levels, @@ -158,7 +145,6 @@ namespace hpx { namespace util { log_level = log_level_t::none; #endif #ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER - // initialize the performance counters pcounter_ctx = LCT_pcounter_ctx_alloc("hpx-lci"); #define HPX_LCI_PCOUNTER_NONE_REGISTER(name) \ @@ -192,25 +178,11 @@ namespace hpx { namespace util { #ifdef HPX_HAVE_PARCELPORT_LCI_LOG LCT_log_ctx_free(&log_ctx); #endif + ::lci::global_finalize(); LCT_fina(); - int lci_init = 0; - LCI_initialized(&lci_init); - if (lci_init) - { - LCI_finalize(); - } } } - bool lci_environment::do_progress(LCI_device_t device) - { - if (!device) - return false; - LCI_error_t ret = LCI_progress(device); - HPX_ASSERT(ret == LCI_OK || ret == LCI_ERR_RETRY); - return ret == LCI_OK; - } - bool lci_environment::enabled() { return enabled_; @@ -220,7 +192,7 @@ namespace hpx { namespace util { { int res(-1); if (enabled()) - res = LCI_NUM_PROCESSES; + res = lci::get_rank_n(); return res; } @@ -228,10 +200,25 @@ namespace hpx { namespace util { { int res(-1); if (enabled()) - res = LCI_RANK; + res = lci::get_rank_me(); return res; } + int lci_environment::get_max_tag() + { + if (enabled()) + return ::lci::get_g_runtime().get_attr_max_tag(); + return -1; + } + + bool lci_environment::do_progress(::lci::device_t device) + { + if (device.is_empty()) + return false; + auto ret = ::lci::progress_x().device(device)(); + return ret.is_done(); + } + void lci_environment::log([[maybe_unused]] log_level_t level, [[maybe_unused]] const char* tag, [[maybe_unused]] const char* format, ...) @@ -241,9 +228,7 @@ namespace hpx { namespace util { return; va_list args; va_start(args, format); - LCT_Logv(log_ctx, static_cast(level), tag, format, args); - va_end(args); #endif } @@ -252,16 +237,18 @@ namespace hpx { namespace util { { #ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER return static_cast(LCT_now()); -#endif +#else return 0; +#endif } int64_t lci_environment::pcounter_since([[maybe_unused]] int64_t then) { #ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER return static_cast(LCT_now()) - then; -#endif +#else return 0; +#endif } void lci_environment::pcounter_add( diff --git a/libs/core/version/src/version.cpp b/libs/core/version/src/version.cpp index 9f15b43790e6..47772e7561c7 100644 --- a/libs/core/version/src/version.cpp +++ b/libs/core/version/src/version.cpp @@ -43,7 +43,7 @@ #if (defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI)) || \ defined(HPX_HAVE_MODULE_LCI_BASE) -#include +#include #endif #if (defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_GASNET)) || \ diff --git a/libs/full/parcelport_lci/CMakeLists.txt b/libs/full/parcelport_lci/CMakeLists.txt index 698994666759..c2bc7bdc0570 100644 --- a/libs/full/parcelport_lci/CMakeLists.txt +++ b/libs/full/parcelport_lci/CMakeLists.txt @@ -19,11 +19,7 @@ set(parcelport_lci_headers hpx/parcelport_lci/receiver_base.hpp hpx/parcelport_lci/sender_base.hpp hpx/parcelport_lci/sender_connection_base.hpp - hpx/parcelport_lci/backlog_queue.hpp hpx/parcelport_lci/parcelport_lci.hpp - hpx/parcelport_lci/putva/sender_putva.hpp - hpx/parcelport_lci/putva/sender_connection_putva.hpp - hpx/parcelport_lci/putva/receiver_putva.hpp hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp @@ -43,11 +39,8 @@ set(parcelport_lci_sources config.cpp locality.cpp parcelport_lci.cpp - backlog_queue.cpp sender_connection_base.cpp sender_base.cpp - putva/sender_putva.cpp - putva/sender_connection_putva.cpp sendrecv/sender_sendrecv.cpp sendrecv/sender_connection_sendrecv.cpp sendrecv/receiver_sendrecv.cpp diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp deleted file mode 100644 index 3cb583ce7a88..000000000000 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2014-2023 Thomas Heller -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#pragma once - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include -#include -#include - -namespace hpx::parcelset::policies::lci { - struct completion_manager_base; - struct sender_connection_base; - namespace backlog_queue { - using message_type = sender_connection_base; - using message_ptr = std::shared_ptr; - struct backlog_queue_t - { - // pending messages per destination - std::vector> messages; - }; - - void push(message_ptr message); - bool empty(int dst_rank); - bool background_work(completion_manager_base* completion_manager, - size_t num_thread) noexcept; - void free(); - } // namespace backlog_queue -} // namespace hpx::parcelset::policies::lci - -#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp index 1a8c15a63404..8da38f5532b8 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp @@ -17,39 +17,41 @@ namespace hpx::parcelset::policies::lci { struct completion_manager_queue : public completion_manager_base { - completion_manager_queue(parcelport* pp) + completion_manager_queue(parcelport* pp, bool zero_copy_am_ = false) : completion_manager_base(pp) { - // LCI_queue_create(LCI_UR_DEVICE, &queue); - // Hack for now - LCI_queue_createx(LCI_UR_DEVICE, - LCI_SERVER_NUM_PKTS * (size_t) config_t::ndevices, &queue); + queue = ::lci::alloc_cq_x().zero_copy_am(zero_copy_am_)(); } ~completion_manager_queue() { - LCI_queue_free(&queue); + ::lci::free_comp(&queue); } - LCI_comp_t alloc_completion() + ::lci::comp_t alloc_completion() { return queue; } - void enqueue_completion(LCI_comp_t comp) + void free_completion(::lci::comp_t comp) { HPX_UNUSED(comp); } - LCI_request_t poll(); + void enqueue_completion(::lci::comp_t comp) + { + HPX_UNUSED(comp); + } + + ::lci::status_t poll(); - LCI_comp_t get_completion_object() + ::lci::comp_t get_completion_object() { return queue; } private: - LCI_comp_t queue; + ::lci::comp_t queue; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp index ab2c10cca190..4aa4b1f3bcbc 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp @@ -25,24 +25,34 @@ namespace hpx::parcelset::policies::lci { ~completion_manager_sync() {} - LCI_comp_t alloc_completion() + ::lci::comp_t alloc_completion() { - LCI_comp_t sync; - LCI_sync_create(LCI_UR_DEVICE, 1, &sync); + ::lci::comp_t sync = ::lci::alloc_sync(); return sync; } - void enqueue_completion(LCI_comp_t comp) + void free_completion(::lci::comp_t comp) + { + ::lci::free_comp(&comp); + } + + void enqueue_completion(::lci::comp_t comp) { std::unique_lock l(lock); sync_list.push_back(comp); } - LCI_request_t poll(); + ::lci::status_t poll(); + + ::lci::comp_t get_completion_object() + { + throw std::runtime_error("completion_manager_sync does not have a " + "single completion object"); + } private: hpx::spinlock lock; - std::deque sync_list; + std::deque<::lci::comp_t> sync_list; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp index 8e6bdb7ab05d..daea8fc60f9b 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp @@ -19,30 +19,40 @@ namespace hpx::parcelset::policies::lci { completion_manager_sync_single(parcelport* pp) : completion_manager_base(pp) { - LCI_sync_create(LCI_UR_DEVICE, 1, &sync); + sync = ::lci::alloc_sync(); } ~completion_manager_sync_single() { - LCI_sync_free(&sync); + ::lci::free_comp(&sync); } - LCI_comp_t alloc_completion() + ::lci::comp_t alloc_completion() { return sync; } - void enqueue_completion(LCI_comp_t comp) + void free_completion(::lci::comp_t comp) + { + HPX_UNUSED(comp); + } + + void enqueue_completion(::lci::comp_t comp) { HPX_UNUSED(comp); lock.unlock(); } - LCI_request_t poll(); + ::lci::status_t poll(); + + ::lci::comp_t get_completion_object() + { + return sync; + } private: hpx::spinlock lock; - LCI_comp_t sync; + ::lci::comp_t sync; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp index 2eef58f28964..ec85820ce83d 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp @@ -20,28 +20,38 @@ namespace hpx::parcelset::policies::lci { completion_manager_sync_single_nolock(parcelport* pp) : completion_manager_base(pp) { - LCI_sync_create(LCI_UR_DEVICE, 1, &sync); + sync = ::lci::alloc_sync(); } ~completion_manager_sync_single_nolock() { - LCI_sync_free(&sync); + ::lci::free_comp(&sync); } - LCI_comp_t alloc_completion() + ::lci::comp_t alloc_completion() { return sync; } - void enqueue_completion(LCI_comp_t comp) + void free_completion(::lci::comp_t comp) { HPX_UNUSED(comp); } - LCI_request_t poll(); + void enqueue_completion(::lci::comp_t comp) + { + HPX_UNUSED(comp); + } + + ::lci::status_t poll(); + + ::lci::comp_t get_completion_object() + { + return sync; + } private: - LCI_comp_t sync; + ::lci::comp_t sync; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp index fe005e3058a1..cb25deaf2234 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp @@ -20,13 +20,11 @@ namespace hpx::parcelset::policies::lci { completion_manager_base(parcelport* pp) noexcept : pp_(pp) {}; virtual ~completion_manager_base() {} - virtual LCI_comp_t alloc_completion() = 0; - virtual void enqueue_completion(LCI_comp_t comp) = 0; - virtual LCI_request_t poll() = 0; - virtual LCI_comp_t get_completion_object() - { - return nullptr; - } + virtual ::lci::comp_t alloc_completion() = 0; + virtual void free_completion(::lci::comp_t comp) = 0; + virtual void enqueue_completion(::lci::comp_t comp) = 0; + virtual ::lci::status_t poll() = 0; + virtual ::lci::comp_t get_completion_object() = 0; parcelport* pp_; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp index 6859a7c76127..8ab7eabd6bfb 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp @@ -19,19 +19,13 @@ namespace hpx::parcelset::policies::lci { static bool is_initialized; // whether to bypass the parcel queue and connection cache. static bool enable_send_immediate; - // whether to enable the backlog queue and eager message aggregation - static bool enable_lci_backlog_queue; // which protocol to use enum class protocol_t { - putva, sendrecv, putsendrecv, }; static protocol_t protocol; - // Whether sending header requires completion - static bool enable_sendmc; - // which completion mechanism to use for header messages enum class comp_type_t { queue, @@ -39,6 +33,7 @@ namespace hpx::parcelset::policies::lci { sync_single, sync_single_nolock, }; + // which completion mechanism to use for header messages static comp_type_t completion_type_header; // which completion mechanism to use for followup messages static comp_type_t completion_type_followup; @@ -52,13 +47,19 @@ namespace hpx::parcelset::policies::lci { poll, // progress when polling completion }; static progress_type_t progress_type; + // which device to make progress when a worker thread calls progress + enum class progress_strategy_t + { + local, // HPX resource partitioner + global, // Normal progress pthread + random, // HPX worker thread + }; + static progress_strategy_t progress_strategy; // How many progress threads to create static int progress_thread_num; // How many pre-posted receives for new messages // (can only be applied to `sendrecv` protocol). static int prepost_recv_num; - // Whether to register the buffer in HPX (or rely on LCI to register it) - static bool reg_mem; // How many devices to use static int ndevices; // How many completion managers to use diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp index 213a72e6d5ae..117888aa0b67 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp @@ -28,8 +28,6 @@ namespace hpx::parcelset::policies::lci { { // signature for assert_valid int signature; - // device idx - int device_idx; // tag int tag; // non-zero-copy chunk size @@ -166,17 +164,7 @@ namespace hpx::parcelset::policies::lci { return reinterpret_cast(data_)->signature; } - void set_device_idx(int device_idx) noexcept - { - reinterpret_cast(data_)->device_idx = device_idx; - } - - [[nodiscard]] int get_device_idx() const noexcept - { - return reinterpret_cast(data_)->device_idx; - } - - void set_tag(LCI_tag_t tag) noexcept + void set_tag(::lci::tag_t tag) noexcept { reinterpret_cast(data_)->tag = tag; } diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp index 8c83a0423112..d93e061dbc27 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp @@ -15,7 +15,6 @@ #include #include -#include #include #include #include @@ -130,9 +129,7 @@ namespace hpx::parcelset { // by LCI. They would not be modified once initialized. // So we should not have false sharing here. int idx; - LCI_device_t device; - LCI_endpoint_t endpoint_new; - LCI_endpoint_t endpoint_followup; + ::lci::device_t device; completion_manager_t* completion_manager_p; }; std::vector devices; @@ -145,10 +142,12 @@ namespace hpx::parcelset { std::shared_ptr send; std::shared_ptr recv_new; std::shared_ptr recv_followup; + ::lci::rcomp_t recv_new_rcomp; }; std::vector completion_managers; bool do_progress_local(); + std::size_t get_tls_device_idx(); device_t& get_tls_device(); private: @@ -259,14 +258,13 @@ namespace hpx::traits { "log_level = none\n" "log_outfile = stderr\n" "sendimm = 1\n" - "backlog_queue = 0\n" "prg_thread_num = 1\n" "protocol = putsendrecv\n" "comp_type_header = queue\n" "comp_type_followup = queue\n" "progress_type = worker\n" + "progress_strategy = local\n" "prepost_recv_num = 1\n" - "reg_mem = 1\n" "ndevices = 2\n" "ncomps = 1\n" "enable_in_buffer_assembly = 1\n" @@ -274,7 +272,6 @@ namespace hpx::traits { "mbuffer_alloc_max_retry = 32\n" "bg_work_max_count = 32\n" "bg_work_when_send = 0\n" - "enable_sendmc = 0\n" "comp_type = deprecated\n"; } }; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp deleted file mode 100644 index cdfdd1d01f82..000000000000 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2007-2023 Hartmut Kaiser -// Copyright (c) 2014-2015 Thomas Heller -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#pragma once - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace hpx::parcelset::policies::lci { - struct receiver_putva : public receiver_base - { - explicit receiver_putva(parcelport* pp) noexcept - : receiver_base(pp) - { - } - - bool background_work() noexcept - { - bool did_some_work = false; - - auto poll_comp_start = util::lci_environment::pcounter_now(); - auto completion_manager_p = - pp_->get_tls_device().completion_manager_p; - request_wrapper_t request; - request.request = completion_manager_p->recv_new->poll(); - util::lci_environment::pcounter_add( - util::lci_environment::poll_comp, - util::lci_environment::pcounter_since(poll_comp_start)); - - if (request.request.flag == LCI_OK) - { - auto useful_bg_start = util::lci_environment::pcounter_now(); - HPX_ASSERT(request.request.flag == LCI_OK); - process_request(request.request); - util::lci_environment::pcounter_add( - util::lci_environment::useful_bg_work, - util::lci_environment::pcounter_since(useful_bg_start)); - did_some_work = true; - } - return did_some_work; - } - - private: - void process_request(LCI_request_t request) - { - if (request.type == LCI_MEDIUM) - { - size_t consumed = 0; - while (consumed < request.data.mbuffer.length) - { - buffer_type buffer; - consumed += decode_eager( - (char*) request.data.mbuffer.address + consumed, - buffer); - handle_received_parcels( - decode_parcels(*pp_, HPX_MOVE(buffer))); - } - HPX_ASSERT(consumed == request.data.mbuffer.length); - } - else - { - // iovec - HPX_ASSERT(request.type == LCI_IOVEC); - buffer_type buffer; - decode_iovec(request.data.iovec, buffer); - handle_received_parcels(decode_parcels(*pp_, HPX_MOVE(buffer))); - } - } - - size_t decode_eager(void* address, buffer_type& buffer) - { -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - hpx::chrono::high_resolution_timer timer_; - parcelset::data_point& data = buffer.data_point_; -#endif - util::lci_environment::pcounter_add( - util::lci_environment::recv_conn_start, 1); - // decode header - header header_ = header((char*) address); - header_.assert_valid(); - HPX_ASSERT( - header_.piggy_back_data() && !header_.piggy_back_tchunk()); - HPX_ASSERT(header_.num_zero_copy_chunks() == 0); // decode data - // decode data - buffer.data_.length = header_.numbytes_nonzero_copy(); - buffer.data_.ptr = header_.piggy_back_data(); - // decode transmission chunk - int num_zero_copy_chunks = header_.num_zero_copy_chunks(); - int num_non_zero_copy_chunks = header_.num_non_zero_copy_chunks(); - buffer.num_chunks_.first = num_zero_copy_chunks; - buffer.num_chunks_.second = num_non_zero_copy_chunks; - util::lci_environment::pcounter_add( - util::lci_environment::recv_conn_end, 1); -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - data.bytes_ = static_cast(header_.numbytes()); - data.time_ = timer_.elapsed_nanoseconds() - data.time_; -#endif - return header_.size(); - } - - void decode_iovec(LCI_iovec_t iovec, buffer_type& buffer) - { -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - hpx::chrono::high_resolution_timer timer_; - parcelset::data_point& data = buffer.data_point_; - data.time_ = timer_.elapsed_nanoseconds(); -#endif - util::lci_environment::pcounter_add( - util::lci_environment::recv_conn_start, 1); - // decode header - header header_ = header((char*) iovec.piggy_back.address); - header_.assert_valid(); - int i = 0; - // decode data - char* piggy_back_data = header_.piggy_back_data(); - if (piggy_back_data) - { - buffer.data_.length = header_.numbytes_nonzero_copy(); - buffer.data_.ptr = piggy_back_data; - } - else - { - HPX_ASSERT((size_t) header_.numbytes_nonzero_copy() == - iovec.lbuffers[i].length); - buffer.data_.length = header_.numbytes_nonzero_copy(); - buffer.data_.ptr = iovec.lbuffers[i].address; - ++i; - } - if (header_.num_zero_copy_chunks() != 0) - { - // decode transmission chunk - int num_zero_copy_chunks = header_.num_zero_copy_chunks(); - int num_non_zero_copy_chunks = - header_.num_non_zero_copy_chunks(); - buffer.num_chunks_.first = num_zero_copy_chunks; - buffer.num_chunks_.second = num_non_zero_copy_chunks; - auto& tchunks = buffer.transmission_chunks_; - tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); - size_t tchunks_length = tchunks.size() * - sizeof(buffer_type::transmission_chunk_type); - char* piggy_back_tchunk = header_.piggy_back_tchunk(); - if (piggy_back_tchunk) - { - std::memcpy((void*) tchunks.data(), piggy_back_tchunk, - tchunks_length); - } - else - { - HPX_ASSERT( - (size_t) tchunks_length == iovec.lbuffers[i].length); - std::memcpy((void*) tchunks.data(), - iovec.lbuffers[i].address, tchunks_length); - ++i; - } - // zero-copy chunks - buffer.chunks_.resize(num_zero_copy_chunks); - for (int j = 0; j < num_zero_copy_chunks; ++j) - { - size_t chunk_size = buffer.transmission_chunks_[j].second; - HPX_ASSERT(iovec.lbuffers[i].length == chunk_size); - buffer.chunks_[j] = serialization::create_pointer_chunk( - iovec.lbuffers[i].address, chunk_size); - ++i; - } - } - HPX_ASSERT(i == iovec.count); - util::lci_environment::pcounter_add( - util::lci_environment::recv_conn_end, 1); -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - data.bytes_ = static_cast(header_.numbytes()); - data.time_ = timer_.elapsed_nanoseconds(); -#endif - } - }; - -} // namespace hpx::parcelset::policies::lci - -#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp deleted file mode 100644 index 951bb91a0091..000000000000 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2014-2015 Thomas Heller -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#pragma once - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace hpx::parcelset::policies::lci { - struct sender_connection_putva : public sender_connection_base - { - public: - sender_connection_putva(int dst, parcelset::parcelport* pp) - : sender_connection_base(dst, pp) - { - } - ~sender_connection_putva() {} - void load(handler_type&& handler, - postprocess_handler_type&& parcel_postprocess); - return_t send_nb(); - void done(); - bool tryMerge( - const std::shared_ptr& other_base); - - private: - enum class connection_state - { - initialized, - sent, - locked, - }; - bool can_be_eager_message(size_t max_header_size); - bool isEager(); - void cleanup(); - return_t send_msg(); - - std::atomic state; - bool is_eager; - LCI_mbuffer_t mbuffer; - LCI_iovec_t iovec; - std::shared_ptr* - sharedPtr_p; // for LCI_putva - // for profiling - LCT_time_t conn_start_time; - }; -} // namespace hpx::parcelset::policies::lci - -#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp deleted file mode 100644 index 0ec39fcfdc05..000000000000 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2007-2013 Hartmut Kaiser -// Copyright (c) 2014-2015 Thomas Heller -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#pragma once - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace hpx::parcelset::policies::lci { - struct sender_connection_putva; - struct sender_putva : public sender_base - { - explicit sender_putva(parcelport* pp) noexcept - : sender_base(pp) - { - } - - connection_ptr create_connection(int dest, parcelset::parcelport* pp); - }; - -} // namespace hpx::parcelset::policies::lci - -#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp index bb7cd27ff0c7..bbd999245208 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp @@ -83,39 +83,6 @@ namespace hpx::parcelset::policies::lci { } }; - struct request_wrapper_t - { - LCI_request_t request; - request_wrapper_t() - { - request.flag = LCI_ERR_RETRY; - } - ~request_wrapper_t() - { - if (request.flag == LCI_OK) - { - if (request.type == LCI_IOVEC) - { - for (int j = 0; j < request.data.iovec.count; ++j) - { - LCI_lbuffer_free(request.data.iovec.lbuffers[j]); - } - free(request.data.iovec.lbuffers); - free(request.data.iovec.piggy_back.address); - } - else - { - HPX_ASSERT(request.type = LCI_MEDIUM); - LCI_mbuffer_free(request.data.mbuffer); - } - } - else - { - HPX_ASSERT(request.flag == LCI_ERR_RETRY); - } - } - }; - struct receiver_base { using buffer_type = parcel_buffer; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp index ecc51aff7a3c..d0f3dfad110a 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp @@ -46,7 +46,7 @@ namespace hpx::parcelset::policies::lci { struct return_t { return_status_t status; - LCI_comp_t completion; + ::lci::comp_t completion; }; sender_connection_base(int dst, parcelset::parcelport* pp) : dst_rank(dst) @@ -75,8 +75,6 @@ namespace hpx::parcelset::policies::lci { return_t send(bool in_bg_work); virtual return_t send_nb() = 0; virtual void done() = 0; - virtual bool tryMerge( - const std::shared_ptr& other_base) = 0; void profile_start_hook(const header& header_); void profile_end_hook(); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp index 67b1cf4fc50a..cb7b1857959a 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp @@ -29,7 +29,7 @@ namespace hpx::parcelset::policies::lci { struct return_t { bool isDone; - LCI_comp_t completion; + ::lci::comp_t completion; }; public: @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci { rcvd_chunks, locked }; - LCI_comp_t unified_recv(void* address, size_t length); + return_t unified_recv(void* address, size_t length); return_t receive_transmission_chunks(); return_t receive_data(); return_t receive_chunks(); @@ -64,16 +64,14 @@ namespace hpx::parcelset::policies::lci { int dst_rank; bool need_recv_data; bool need_recv_tchunks; - LCI_tag_t tag; - LCI_tag_t original_tag; + ::lci::tag_t tag; + ::lci::tag_t original_tag; receiver_base::buffer_type buffer; std::vector parcels_; std::vector> chunk_buffers_; parcelport* pp_; parcelport::device_t* device_p; std::shared_ptr* sharedPtr_p; - // temporary data - LCI_segment_t segment_used; // for profiling LCT_time_t conn_start_time; }; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp index 6322048371bc..d1229f621e0b 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp @@ -42,20 +42,21 @@ namespace hpx::parcelset::policies::lci { { if (config_t::protocol == config_t::protocol_t::sendrecv) { - for (std::size_t i = 0; i < pp_->devices.size(); ++i) - { - auto& device = pp->devices[i]; - for (int j = 0; j < config_t::prepost_recv_num; ++j) - { - LCI_comp_t completion = - device.completion_manager_p->recv_new - ->alloc_completion(); - LCI_recvmn(device.endpoint_new, LCI_RANK_ANY, 0, - completion, reinterpret_cast(i)); - device.completion_manager_p->recv_new - ->enqueue_completion(completion); - } - } + HPX_ASSERT(false); + // for (std::size_t i = 0; i < pp_->devices.size(); ++i) + // { + // auto& device = pp->devices[i]; + // for (int j = 0; j < config_t::prepost_recv_num; ++j) + // { + // ::lci::comp_t completion = + // device.completion_manager_p->recv_new + // ->alloc_completion(); + // ::lci::post_recv(device.endpoint_new, LCI_RANK_ANY, 0, + // completion, reinterpret_cast(i)); + // device.completion_manager_p->recv_new + // ->enqueue_completion(completion); + // } + // } } } diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp index e18b1419b4d6..13555e806acf 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp @@ -34,8 +34,6 @@ namespace hpx::parcelset::policies::lci { postprocess_handler_type&& parcel_postprocess); return_t send_nb(); void done(); - bool tryMerge( - const std::shared_ptr& other_base); private: enum class connection_state @@ -58,20 +56,19 @@ namespace hpx::parcelset::policies::lci { // related information about this connection hpx::chrono::high_resolution_timer timer_; header header_; - LCI_mbuffer_t header_buffer; - std::vector header_buffer_vector; + void* header_buffer; + size_t header_buffer_size; bool need_send_data; bool need_send_tchunks; - LCI_tag_t tag; - LCI_tag_t original_tag; + ::lci::tag_t tag; + ::lci::tag_t original_tag; std::shared_ptr* sharedPtr_p; // temporary data - LCI_comp_t completion; - LCI_segment_t segment_to_use, segment_used; + ::lci::comp_t completion; // for profiling LCT_time_t conn_start_time; - static std::atomic next_tag; + static std::atomic<::lci::tag_t> next_tag; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp index e0a6ec156fb7..891c6bb63ca0 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp @@ -37,7 +37,6 @@ #include #include -#include #include #include #include diff --git a/libs/full/parcelport_lci/src/backlog_queue.cpp b/libs/full/parcelport_lci/src/backlog_queue.cpp deleted file mode 100644 index 0533d08ff148..000000000000 --- a/libs/full/parcelport_lci/src/backlog_queue.cpp +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2007-2013 Hartmut Kaiser -// Copyright (c) 2014-2015 Thomas Heller -// Copyright (c) 2020 Google -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace hpx::parcelset::policies::lci::backlog_queue { - thread_local backlog_queue_t tls_backlog_queue; - - void push(message_ptr message) - { - if (tls_backlog_queue.messages.size() <= (size_t) message->dst_rank) - { - tls_backlog_queue.messages.resize(message->dst_rank + 1); - } - auto& message_queue = tls_backlog_queue.messages[message->dst_rank]; - if (!message_queue.empty()) - { - bool succeed = message_queue.back()->tryMerge(message); - if (succeed) - { - message->done(); - return; - } - } - tls_backlog_queue.messages[message->dst_rank].push_back( - HPX_MOVE(message)); - } - - bool empty(int dst_rank) - { - if (tls_backlog_queue.messages.size() <= (size_t) dst_rank) - { - tls_backlog_queue.messages.resize(dst_rank + 1); - } - bool ret = tls_backlog_queue.messages[dst_rank].empty(); - return ret; - } - - bool background_work( - completion_manager_base* completion_manager, size_t num_thread) noexcept - { - bool did_some_work = false; - for (size_t i = 0; i < tls_backlog_queue.messages.size(); ++i) - { - size_t idx = (num_thread + i) % tls_backlog_queue.messages.size(); - while (idx < tls_backlog_queue.messages.size() && - !tls_backlog_queue.messages[idx].empty()) - { - message_ptr message = tls_backlog_queue.messages[idx].front(); - auto ret = message->send_nb(); - if (ret.status == sender_connection_base::return_status_t::done) - { - tls_backlog_queue.messages[idx].pop_front(); - message->done(); - did_some_work = true; - } - else if (ret.status == - sender_connection_base::return_status_t::wait) - { - tls_backlog_queue.messages[idx].pop_front(); - did_some_work = true; - completion_manager->enqueue_completion(ret.completion); - } - else - { - HPX_ASSERT(ret.status == - sender_connection_base::return_status_t::retry); - break; - } - } - } - return did_some_work; - } -} // namespace hpx::parcelset::policies::lci::backlog_queue - -#endif diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp index 76342f230b1d..4aa86a13a982 100644 --- a/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp @@ -8,14 +8,12 @@ #include namespace hpx::parcelset::policies::lci { - LCI_request_t completion_manager_queue::poll() + ::lci::status_t completion_manager_queue::poll() { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; - LCI_queue_pop(queue, &request); - if (request.flag == LCI_ERR_RETRY) + ::lci::status_t status = ::lci::cq_pop(queue); + if (status.is_retry()) if (config_t::progress_type == config_t::progress_type_t::poll) pp_->do_progress_local(); - return request; + return status; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp index cf13e0acdcc0..978fe8f1470c 100644 --- a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp @@ -9,12 +9,11 @@ #include namespace hpx::parcelset::policies::lci { - LCI_request_t completion_manager_sync::poll() + ::lci::status_t completion_manager_sync::poll() { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; + ::lci::status_t status; - LCI_comp_t sync = nullptr; + ::lci::comp_t sync; { std::unique_lock l(lock, std::try_to_lock); if (l.owns_lock() && !sync_list.empty()) @@ -23,13 +22,12 @@ namespace hpx::parcelset::policies::lci { sync_list.pop_front(); } } - if (sync) + if (!sync.is_empty()) { - LCI_error_t ret = LCI_sync_test(sync, &request); - if (ret == LCI_OK) + ::lci::sync_test(sync, &status); + if (status.is_done()) { - HPX_ASSERT(request.flag == LCI_OK); - LCI_sync_free(&sync); + ::lci::free_comp(&sync); } else { @@ -39,6 +37,6 @@ namespace hpx::parcelset::policies::lci { sync_list.push_back(sync); } } - return request; + return status; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp index 7a88b9b9e82e..a709c49dc3aa 100644 --- a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp @@ -8,22 +8,21 @@ #include namespace hpx::parcelset::policies::lci { - LCI_request_t completion_manager_sync_single::poll() + ::lci::status_t completion_manager_sync_single::poll() { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; + ::lci::status_t status; bool succeed = lock.try_lock(); if (succeed) { - LCI_error_t ret = LCI_sync_test(sync, &request); - if (ret == LCI_ERR_RETRY) + ::lci::sync_test(sync, &status); + if (status.is_retry()) { if (config_t::progress_type == config_t::progress_type_t::poll) pp_->do_progress_local(); lock.unlock(); } } - return request; + return status; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp index 697340240ab9..b89f25e1b509 100644 --- a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp @@ -8,15 +8,14 @@ #include namespace hpx::parcelset::policies::lci { - LCI_request_t completion_manager_sync_single_nolock::poll() + ::lci::status_t completion_manager_sync_single_nolock::poll() { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; + ::lci::status_t status; - LCI_sync_test(sync, &request); - if (request.flag == LCI_ERR_RETRY) + ::lci::sync_test(sync, &status); + if (status.is_retry()) if (config_t::progress_type == config_t::progress_type_t::poll) pp_->do_progress_local(); - return request; + return status; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/config.cpp b/libs/full/parcelport_lci/src/config.cpp index d9164aa8eb4f..b3ca0cb458d8 100644 --- a/libs/full/parcelport_lci/src/config.cpp +++ b/libs/full/parcelport_lci/src/config.cpp @@ -19,14 +19,13 @@ namespace hpx::parcelset::policies::lci { bool config_t::is_initialized = false; bool config_t::enable_send_immediate; - bool config_t::enable_lci_backlog_queue; config_t::protocol_t config_t::protocol; config_t::comp_type_t config_t::completion_type_header; config_t::comp_type_t config_t::completion_type_followup; config_t::progress_type_t config_t::progress_type; + config_t::progress_strategy_t config_t::progress_strategy; int config_t::progress_thread_num; int config_t::prepost_recv_num; - bool config_t::reg_mem; int config_t::ndevices; int config_t::ncomps; bool config_t::enable_in_buffer_assembly; @@ -34,7 +33,6 @@ namespace hpx::parcelset::policies::lci { int config_t::mbuffer_alloc_max_retry; int config_t::bg_work_max_count; bool config_t::bg_work_when_send; - bool config_t::enable_sendmc; void config_t::init_config(util::runtime_configuration const& rtcfg) { @@ -44,16 +42,10 @@ namespace hpx::parcelset::policies::lci { // The default value here does not matter here enable_send_immediate = util::get_entry_as( rtcfg, "hpx.parcel.lci.sendimm", false /* Does not matter*/); - enable_lci_backlog_queue = util::get_entry_as( - rtcfg, "hpx.parcel.lci.backlog_queue", false /* Does not matter*/); // set protocol to use std::string protocol_str = util::get_entry_as( rtcfg, "hpx.parcel.lci.protocol", ""); - if (protocol_str == "putva") - { - protocol = protocol_t::putva; - } - else if (protocol_str == "putsendrecv") + if (protocol_str == "putsendrecv") { protocol = protocol_t::putsendrecv; } @@ -142,12 +134,30 @@ namespace hpx::parcelset::policies::lci { throw std::runtime_error( "Unknown progress type " + progress_type_str); } + // set the progress strategy + std::string progress_strategy_str = util::get_entry_as( + rtcfg, "hpx.parcel.lci.progress_strategy", ""); + if (progress_strategy_str == "local") + { + progress_strategy = progress_strategy_t::local; + } + else if (progress_strategy_str == "global") + { + progress_strategy = progress_strategy_t::global; + } + else if (progress_strategy_str == "random") + { + progress_strategy = progress_strategy_t::random; + } + else + { + throw std::runtime_error( + "Unknown progress strategy " + progress_strategy_str); + } progress_thread_num = util::get_entry_as( rtcfg, "hpx.parcel.lci.prg_thread_num", -1 /* Does not matter*/); prepost_recv_num = util::get_entry_as( rtcfg, "hpx.parcel.lci.prepost_recv_num", 1 /* Does not matter*/); - reg_mem = util::get_entry_as( - rtcfg, "hpx.parcel.lci.reg_mem", 1 /* Does not matter*/); ndevices = util::get_entry_as( rtcfg, "hpx.parcel.lci.ndevices", 1 /* Does not matter*/); ncomps = util::get_entry_as( @@ -162,15 +172,7 @@ namespace hpx::parcelset::policies::lci { rtcfg, "hpx.parcel.lci.bg_work_max_count", 0 /* Does not matter*/); bg_work_when_send = util::get_entry_as( rtcfg, "hpx.parcel.lci.bg_work_when_send", 0 /* Does not matter*/); - enable_sendmc = util::get_entry_as( - rtcfg, "hpx.parcel.lci.enable_sendmc", 0 /* Does not matter*/); - if (!enable_send_immediate && enable_lci_backlog_queue) - { - enable_lci_backlog_queue = false; - fprintf( - stderr, "WARNING: set enable_lci_backlog_queue to false!\n"); - } std::size_t num_threads = util::get_entry_as(rtcfg, "hpx.os_threads", 1); if (progress_type == progress_type_t::rp && num_threads <= 1) @@ -178,15 +180,6 @@ namespace hpx::parcelset::policies::lci { progress_type = progress_type_t::pthread; fprintf(stderr, "WARNING: set progress_type to pthread!\n"); } -#ifndef LCI_ENABLE_MULTITHREAD_PROGRESS - if (progress_type == progress_type_t::worker || - progress_thread_num > ndevices) - { - fprintf(stderr, - "WARNING: Thread-safe LCI_progress is needed " - "but not enabled during compilation!\n"); - } -#endif if (ncomps > ndevices) { int old_ncomps = ncomps; diff --git a/libs/full/parcelport_lci/src/parcelport_lci.cpp b/libs/full/parcelport_lci/src/parcelport_lci.cpp index b9d1bcf293e9..87d6891175ba 100644 --- a/libs/full/parcelport_lci/src/parcelport_lci.cpp +++ b/libs/full/parcelport_lci/src/parcelport_lci.cpp @@ -15,15 +15,12 @@ #include #include -#include #include #include #include #include #include #include -#include -#include #include #include #include @@ -94,6 +91,7 @@ namespace hpx::parcelset::policies::lci { // Start the handling of connections. bool parcelport::do_run() { + HPX_ASSERT(util::lci_environment::enabled()); receiver_p->run(); sender_p->run(); for (std::size_t i = 0; i != io_service_pool_.size(); ++i) @@ -237,13 +235,6 @@ namespace hpx::parcelset::policies::lci { if (mode & parcelport_background_mode::send) { has_work = sender_p->background_work(num_thread) || has_work; - if (config_t::enable_lci_backlog_queue) - // try to send pending messages - has_work = - backlog_queue::background_work( - get_tls_device().completion_manager_p->send.get(), - num_thread) || - has_work; } return has_work; } @@ -321,7 +312,26 @@ namespace hpx::parcelset::policies::lci { { HPX_UNUSED(rtcfg); + auto attr = ::lci::get_g_default_attr(); + // We will make sure the total packet number is always + // at least twice as large as the total preposted receives. + // We also set a minimum of 1024 preposted receives per device + // and increase the total number of packets if needed. + if (attr.net_max_recvs * config_t::ndevices > attr.npackets / 2) + { + attr.net_max_recvs = attr.npackets / 2 / config_t::ndevices; + if (attr.net_max_recvs < 1024) + { + attr.net_max_recvs = 1024; + attr.npackets = 1024 * config_t::ndevices * 2; + } + } + ::lci::set_g_default_attr(attr); + ::lci::g_runtime_init(); + // Create completion managers + bool zero_copy_am = + config_t::protocol == config_t::protocol_t::putsendrecv; completion_managers.resize(config_t::ncomps); for (auto& completion_manager : completion_managers) { @@ -329,7 +339,8 @@ namespace hpx::parcelset::policies::lci { { case config_t::comp_type_t::queue: completion_manager.recv_new = - std::make_shared(this); + std::make_shared( + this, zero_copy_am); break; case config_t::comp_type_t::sync: completion_manager.recv_new = @@ -366,6 +377,18 @@ namespace hpx::parcelset::policies::lci { } } + // Register rcomp if needed + if (config_t::protocol == config_t::protocol_t::putsendrecv) + { + for (auto& completion_manager : completion_managers) + { + ::lci::comp_t comp = + completion_manager.recv_new->get_completion_object(); + ::lci::rcomp_t rcomp = ::lci::register_rcomp(comp); + completion_manager.recv_new_rcomp = rcomp; + } + } + // Create device devices.resize(config_t::ndevices); for (int i = 0; i < config_t::ndevices; ++i) @@ -373,58 +396,9 @@ namespace hpx::parcelset::policies::lci { auto& device = devices[i]; // Create the LCI device device.idx = i; - if (i == 0) - { - device.device = LCI_UR_DEVICE; - } - else - { - LCI_device_init(&device.device); - } + device.device = ::lci::alloc_device(); int comp_idx = i * config_t::ncomps / config_t::ndevices; device.completion_manager_p = &completion_managers[comp_idx]; - // Create the LCI endpoint - LCI_plist_t plist_; - LCI_plist_create(&plist_); - switch (config_t::completion_type_followup) - { - case config_t::comp_type_t::queue: - LCI_plist_set_comp_type( - plist_, LCI_PORT_COMMAND, LCI_COMPLETION_QUEUE); - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); - break; - case config_t::comp_type_t::sync: - LCI_plist_set_comp_type( - plist_, LCI_PORT_COMMAND, LCI_COMPLETION_SYNC); - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); - break; - default: - throw std::runtime_error("Unknown completion type!"); - } - LCI_endpoint_init(&device.endpoint_followup, device.device, plist_); - LCI_plist_set_default_comp(plist_, - device.completion_manager_p->recv_new->get_completion_object()); - switch (config_t::completion_type_header) - { - case config_t::comp_type_t::queue: - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); - break; - case config_t::comp_type_t::sync: - case config_t::comp_type_t::sync_single: - case config_t::comp_type_t::sync_single_nolock: - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); - break; - default: - throw std::runtime_error("Unknown completion type!"); - } - if (config_t::protocol == config_t::protocol_t::sendrecv) - LCI_plist_set_match_type(plist_, LCI_MATCH_TAG); - LCI_endpoint_init(&device.endpoint_new, device.device, plist_); - LCI_plist_free(&plist_); } // Create progress threads @@ -437,10 +411,6 @@ namespace hpx::parcelset::policies::lci { // Create the sender and receiver switch (config_t::protocol) { - case config_t::protocol_t::putva: - sender_p = std::make_shared(this); - receiver_p = std::make_shared(this); - break; case config_t::protocol_t::sendrecv: case config_t::protocol_t::putsendrecv: sender_p = std::make_shared(this); @@ -457,13 +427,9 @@ namespace hpx::parcelset::policies::lci { // Free devices for (auto& device : devices) { - LCI_endpoint_free(&device.endpoint_followup); - LCI_endpoint_free(&device.endpoint_new); - if (device.device != LCI_UR_DEVICE) - { - LCI_device_free(&device.device); - } + ::lci::free_device(&device.device); } + ::lci::g_runtime_fina(); } void parcelport::join_prg_thread_if_running() @@ -482,12 +448,38 @@ namespace hpx::parcelset::policies::lci { bool parcelport::do_progress_local() { bool ret = false; - auto device = get_tls_device(); - ret = util::lci_environment::do_progress(device.device) || ret; + switch (config_t::progress_strategy) + { + case config_t::progress_strategy_t::local: + { + auto device = get_tls_device(); + ret = util::lci_environment::do_progress(device.device) || ret; + break; + } + case config_t::progress_strategy_t::global: + { + std::size_t start_idx = get_tls_device_idx(); + for (std::size_t i = 0; i < devices.size(); ++i) + { + auto& device = devices[(start_idx + i) % devices.size()]; + ret = util::lci_environment::do_progress(device.device) || ret; + } + break; + } + case config_t::progress_strategy_t::random: + { + static thread_local unsigned int tls_rand_seed = rand(); + auto device = devices[rand_r(&tls_rand_seed) % devices.size()]; + ret = util::lci_environment::do_progress(device.device) || ret; + break; + } + default: + throw std::runtime_error("Unknown progress strategy"); + } return ret; } - parcelport::device_t& parcelport::get_tls_device() + std::size_t parcelport::get_tls_device_idx() { static thread_local std::size_t tls_device_idx = -1; @@ -495,7 +487,7 @@ namespace hpx::parcelset::policies::lci { hpx::threads::get_self_id() == hpx::threads::invalid_thread_id)) { static thread_local unsigned int tls_rand_seed = rand(); - return devices[rand_r(&tls_rand_seed) % devices.size()]; + return rand_r(&tls_rand_seed) % devices.size(); } if (tls_device_idx == std::size_t(-1)) { @@ -514,10 +506,16 @@ namespace hpx::parcelset::policies::lci { tls_device_idx = num_thread / nthreads_per_device; util::lci_environment::log( util::lci_environment::log_level_t::debug, "device", - "Rank %d thread %lu/%lu gets device %lu\n", LCI_RANK, - num_thread, total_thread_num, tls_device_idx); + "Rank %d thread %lu/%lu gets device %lu\n", + util::lci_environment::rank(), num_thread, total_thread_num, + tls_device_idx); } - return devices[tls_device_idx]; + return tls_device_idx; + } + + parcelport::device_t& parcelport::get_tls_device() + { + return devices[get_tls_device_idx()]; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp deleted file mode 100644 index 8f05aed10550..000000000000 --- a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2007-2013 Hartmut Kaiser -// Copyright (c) 2014-2015 Thomas Heller -// Copyright (c) 2020 Google -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include - -#include -#include -#include -#include -#include -#include -#include "hpx/parcelport_lci/putva/sender_connection_putva.hpp" - -#include -#include -#include - -namespace hpx::parcelset::policies::lci { - - bool sender_connection_putva::can_be_eager_message(size_t eager_threshold) - { - int num_zero_copy_chunks = static_cast(buffer_.num_chunks_.first); - if (num_zero_copy_chunks > 0) - // if there are non-zero-copy chunks, we have to use iovec - return false; - size_t header_size = sizeof(header::header_format_t); - size_t data_size = buffer_.data_.size(); - size_t tchunk_size = buffer_.transmission_chunks_.size() * - sizeof(parcel_buffer_type::transmission_chunk_type); - if (header_size + data_size + tchunk_size <= eager_threshold) - return true; - else - return false; - } - - void sender_connection_putva::load( - sender_connection_putva::handler_type&& handler, - sender_connection_putva::postprocess_handler_type&& parcel_postprocess) - { -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - data_point_ = buffer_.data_point_; - data_point_.time_ = hpx::chrono::high_resolution_clock::now(); -#endif - conn_start_time = util::lci_environment::pcounter_now(); - HPX_ASSERT(!handler_); - HPX_ASSERT(!postprocess_handler_); - HPX_ASSERT(!buffer_.data_.empty()); - handler_ = HPX_MOVE(handler); - postprocess_handler_ = HPX_MOVE(parcel_postprocess); - - // build header - header header_; - is_eager = can_be_eager_message(LCI_MEDIUM_SIZE); - int num_zero_copy_chunks = static_cast(buffer_.num_chunks_.first); - if (is_eager) - { - int retry_count = 0; - while (LCI_mbuffer_alloc(device_p->device, &mbuffer) != LCI_OK) - yield_k(retry_count, config_t::mbuffer_alloc_max_retry); - HPX_ASSERT(mbuffer.length == (size_t) LCI_MEDIUM_SIZE); - header_ = header(buffer_, (char*) mbuffer.address, mbuffer.length); - mbuffer.length = header_.size(); - cleanup(); - } - else - { - size_t max_header_size = - LCI_get_iovec_piggy_back_size(num_zero_copy_chunks + 2); - char* header_buffer = (char*) malloc(max_header_size); - header_ = header(buffer_, header_buffer, max_header_size); - - // calculate the exact number of long messages to send - int long_msg_num = num_zero_copy_chunks; - if (!header_.piggy_back_data()) - ++long_msg_num; - // transmission chunks - if (num_zero_copy_chunks != 0 && !header_.piggy_back_tchunk()) - ++long_msg_num; - - // initialize iovec - iovec = LCI_iovec_t(); - iovec.piggy_back.address = header_.data(); - iovec.piggy_back.length = header_.size(); - iovec.count = long_msg_num; - int i = 0; - iovec.lbuffers = - (LCI_lbuffer_t*) malloc(iovec.count * sizeof(LCI_lbuffer_t)); - if (!header_.piggy_back_data()) - { - // data (non-zero-copy chunks) - iovec.lbuffers[i].address = buffer_.data_.data(); - iovec.lbuffers[i].length = buffer_.data_.size(); - if (config_t::reg_mem) - { - LCI_memory_register(device_p->device, - iovec.lbuffers[i].address, iovec.lbuffers[i].length, - &iovec.lbuffers[i].segment); - } - else - { - iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; - } - ++i; - } - if (num_zero_copy_chunks != 0) - { - // transmission chunk - if (!header_.piggy_back_tchunk()) - { - std::vector< - typename parcel_buffer_type::transmission_chunk_type>& - tchunks = buffer_.transmission_chunks_; - size_t tchunks_length = tchunks.size() * - sizeof(parcel_buffer_type::transmission_chunk_type); - iovec.lbuffers[i].address = tchunks.data(); - iovec.lbuffers[i].length = tchunks_length; - if (config_t::reg_mem) - { - LCI_memory_register(device_p->device, - iovec.lbuffers[i].address, iovec.lbuffers[i].length, - &iovec.lbuffers[i].segment); - } - else - { - iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; - } - ++i; - } - // zero-copy chunks - for (int j = 0; j < (int) buffer_.chunks_.size(); ++j) - { - serialization::serialization_chunk& c = buffer_.chunks_[j]; - if (c.type_ == - serialization::chunk_type::chunk_type_pointer || - c.type_ == - serialization::chunk_type::chunk_type_const_pointer) - { - HPX_ASSERT(long_msg_num > i); - iovec.lbuffers[i].address = - const_cast(c.data_.cpos_); - iovec.lbuffers[i].length = c.size_; - if (config_t::reg_mem) - { - LCI_memory_register(device_p->device, - iovec.lbuffers[i].address, - iovec.lbuffers[i].length, - &iovec.lbuffers[i].segment); - } - else - { - iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; - } - ++i; - } - } - } - HPX_ASSERT(long_msg_num == i); - sharedPtr_p = new std::shared_ptr( - std::dynamic_pointer_cast( - shared_from_this())); - } - profile_start_hook(header_); - state.store(connection_state::initialized, std::memory_order_release); - } - - bool sender_connection_putva::isEager() - { - return is_eager; - } - - sender_connection_putva::return_t sender_connection_putva::send_nb() - { - switch (state.load(std::memory_order_acquire)) - { - case connection_state::initialized: - return send_msg(); - - case connection_state::sent: - return {return_status_t::done, nullptr}; - - case connection_state::locked: - return {return_status_t::retry, nullptr}; - - default: - throw std::runtime_error("Unexpected send state!"); - } - } - - sender_connection_putva::return_t sender_connection_putva::send_msg() - { - const auto current_state = connection_state::initialized; - const auto next_state = connection_state::sent; - HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); - - int ret; - if (is_eager) - { - ret = LCI_putmna(device_p->endpoint_new, mbuffer, dst_rank, 0, - LCI_DEFAULT_COMP_REMOTE); - if (ret == LCI_OK) - { - state.store(next_state, std::memory_order_release); - return {return_status_t::done, nullptr}; - } - } - else - { - void* buffer_to_free = iovec.piggy_back.address; - LCI_comp_t completion = - device_p->completion_manager_p->send->alloc_completion(); - // In order to keep the send_connection object from being - // deallocated. We have to allocate a shared_ptr in the heap - // and pass a pointer to shared_ptr to LCI. - // We will get this pointer back via the send completion queue - // after this send completes. - state.store(connection_state::locked, std::memory_order_relaxed); - ret = LCI_putva(device_p->endpoint_new, iovec, completion, dst_rank, - 0, LCI_DEFAULT_COMP_REMOTE, sharedPtr_p); - // After this point, if ret == OK, this object can be shared by - // two threads (the sending thread and the thread polling the - // completion queue). Care must be taken to avoid data race. - if (ret == LCI_OK) - { - free(buffer_to_free); - state.store(next_state, std::memory_order_release); - return {return_status_t::wait, completion}; - } - else - { - state.store(current_state, std::memory_order_release); - } - } - return {return_status_t::retry, nullptr}; - } - - void sender_connection_putva::cleanup() - { - if (!is_eager) - { - HPX_ASSERT(iovec.count > 0); - for (int i = 0; i < iovec.count; ++i) - { - if (iovec.lbuffers[i].segment != LCI_SEGMENT_ALL) - { - LCI_memory_deregister(&iovec.lbuffers[i].segment); - } - } - free(iovec.lbuffers); - } - error_code ec; - handler_(ec); - handler_.reset(); - buffer_.clear(); - } - - void sender_connection_putva::done() - { - profile_end_hook(); - if (!is_eager) - { - cleanup(); - } -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - data_point_.time_ = - hpx::chrono::high_resolution_clock::now() - data_point_.time_; - pp_->add_sent_data(data_point_); -#endif - util::lci_environment::pcounter_add( - util::lci_environment::send_conn_timer, - util::lci_environment::pcounter_since(conn_start_time)); - - if (postprocess_handler_) - { - // Return this connection to the connection cache. - // After postprocess_handler is invoked, this connection can be - // obtained by another thread. - // so make sure to call this at the very end. - hpx::move_only_function)> - postprocess_handler; - std::swap(postprocess_handler, postprocess_handler_); - error_code ec2; - postprocess_handler(ec2, there_, shared_from_this()); - } - } - - bool sender_connection_putva::tryMerge( - const std::shared_ptr& other_base) - { - std::shared_ptr other = - std::dynamic_pointer_cast(other_base); - HPX_ASSERT(other); - if (!isEager() || !other->isEager()) - { - // we can only merge eager messages - return false; - } - if (mbuffer.length + other->mbuffer.length > (size_t) LCI_MEDIUM_SIZE) - { - // The sum of two messages are too large - return false; - } - // can merge - memcpy((char*) mbuffer.address + mbuffer.length, other->mbuffer.address, - other->mbuffer.length); - mbuffer.length += other->mbuffer.length; - LCI_mbuffer_free(other->mbuffer); - // merged_connections.push_back(other); - return true; - } -} // namespace hpx::parcelset::policies::lci - -#endif diff --git a/libs/full/parcelport_lci/src/putva/sender_putva.cpp b/libs/full/parcelport_lci/src/putva/sender_putva.cpp deleted file mode 100644 index b95a641526d4..000000000000 --- a/libs/full/parcelport_lci/src/putva/sender_putva.cpp +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2023-2024 Jiakun Yan -// Copyright (c) 2007-2013 Hartmut Kaiser -// Copyright (c) 2014-2015 Thomas Heller -// Copyright (c) 2020 Google -// -// SPDX-License-Identifier: BSL-1.0 -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -#include - -#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) - -#include -#include -#include -#include -#include "hpx/parcelport_lci/putva/sender_connection_putva.hpp" - -#include -#include - -namespace hpx::parcelset::policies::lci { - sender_putva::connection_ptr sender_putva::create_connection( - int dest, parcelset::parcelport* pp) - { - return std::make_shared(dest, pp); - } -} // namespace hpx::parcelset::policies::lci - -#endif diff --git a/libs/full/parcelport_lci/src/sender_base.cpp b/libs/full/parcelport_lci/src/sender_base.cpp index d6ea05c5805b..97d612341682 100644 --- a/libs/full/parcelport_lci/src/sender_base.cpp +++ b/libs/full/parcelport_lci/src/sender_base.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -33,21 +32,20 @@ namespace hpx::parcelset::policies::lci { bool did_some_work = false; auto poll_comp_start = util::lci_environment::pcounter_now(); auto completion_manager_p = pp_->get_tls_device().completion_manager_p; - LCI_request_t request = completion_manager_p->send->poll(); + ::lci::status_t status = completion_manager_p->send->poll(); util::lci_environment::pcounter_add(util::lci_environment::poll_comp, util::lci_environment::pcounter_since(poll_comp_start)); - if (request.flag == LCI_OK) + if (status.is_done()) { auto useful_bg_start = util::lci_environment::pcounter_now(); did_some_work = true; - auto* sharedPtr_p = (connection_ptr*) request.user_context; + auto* sharedPtr_p = (connection_ptr*) status.get_user_context(); HPX_ASSERT(sharedPtr_p->get()); sender_connection_base::return_t ret = (*sharedPtr_p)->send(true); if (ret.status == sender_connection_base::return_status_t::done) { (*sharedPtr_p)->done(); - delete sharedPtr_p; } else if (ret.status == sender_connection_base::return_status_t::wait) diff --git a/libs/full/parcelport_lci/src/sender_connection_base.cpp b/libs/full/parcelport_lci/src/sender_connection_base.cpp index 32f45de0fa92..f62297baab0f 100644 --- a/libs/full/parcelport_lci/src/sender_connection_base.cpp +++ b/libs/full/parcelport_lci/src/sender_connection_base.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -55,57 +54,35 @@ namespace hpx::parcelset::policies::lci { sender_connection_base::return_t sender_connection_base::send( bool in_bg_work) { - // FIXME: set it properly in the future - // if (HPX_LIKELY(pp_->is_initialized)) - // in_bg_work = false; auto start_time = util::lci_environment::pcounter_now(); return_t ret; - if (!config_t::enable_lci_backlog_queue || - HPX_UNLIKELY(!pp_->is_initialized)) + int retry_count = 0; + do { - // If we are sending early parcels, we should not expect the thread - // make progress on the backlog queue. - int retry_count = 0; - do + ret = send_nb(); + if (ret.status == return_status_t::retry) { - ret = send_nb(); - if (ret.status == return_status_t::retry) + if (config_t::bg_work_when_send) { - if (config_t::bg_work_when_send) - { - pp_->do_background_work(0, - in_bg_work ? parcelport_background_mode::receive : - parcelport_background_mode::all); - } - else if (config_t::progress_type == - config_t::progress_type_t::worker || - config_t::progress_type == - config_t::progress_type_t::pthread_worker || - config_t::progress_type == - config_t::progress_type_t::poll) - { - pp_->do_progress_local(); - } - yield_k(retry_count, config_t::send_nb_max_retry); + pp_->do_background_work(0, + in_bg_work ? parcelport_background_mode::receive : + parcelport_background_mode::all); } - } while (ret.status == return_status_t::retry); - } - else - { - if (!backlog_queue::empty(dst_rank)) - { - backlog_queue::push(shared_from_this()); - ret = {return_status_t::retry, nullptr}; - } - else - { - ret = send_nb(); - if (ret.status == return_status_t::retry) + else if (config_t::progress_type == + config_t::progress_type_t::worker || + config_t::progress_type == + config_t::progress_type_t::pthread_worker || + config_t::progress_type == config_t::progress_type_t::poll) { - backlog_queue::push(shared_from_this()); + // We will just make progress on this device + // instead of progress_local that can be affected + // by the progress strategy + while (util::lci_environment::do_progress(device_p->device)) + continue; } + yield_k(retry_count, config_t::send_nb_max_retry); } - } + } while (ret.status == return_status_t::retry); if (config_t::bg_work_when_send) pp_->do_background_work(0, in_bg_work ? parcelport_background_mode::receive : @@ -125,7 +102,8 @@ namespace hpx::parcelset::policies::lci { char buf[1024]; size_t consumed = 0; consumed += snprintf(buf + consumed, sizeof(buf) - consumed, - "%d:%lf:send_connection(%p) start:%d:%lu:%d:%d:[", LCI_RANK, + "%d:%lf:send_connection(%p) start:%d:%lu:%d:%d:[", + util::lci_environment::rank(), hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this, dst_rank, header_.numbytes_nonzero_copy(), header_.numbytes_tchunk(), header_.num_zero_copy_chunks()); @@ -148,7 +126,8 @@ namespace hpx::parcelset::policies::lci { void sender_connection_base::profile_end_hook() { util::lci_environment::log(util::lci_environment::log_level_t::profile, - "send", "%d:%lf:send_connection(%p) end\n", LCI_RANK, + "send", "%d:%lf:send_connection(%p) end\n", + util::lci_environment::rank(), hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this); util::lci_environment::pcounter_add( util::lci_environment::send_conn_end, 1); diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp index c46be6d9cd19..d16d16720f7c 100644 --- a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp @@ -43,7 +43,7 @@ namespace hpx::parcelset::policies::lci { static_cast(header_.numbytes()); timer_.restart(); #endif - device_p = &pp_->devices[header_.get_device_idx()]; + device_p = &pp_->get_tls_device(); tag = header_.get_tag(); // decode data buffer.data_.allocate(header_.numbytes_nonzero_copy()); @@ -99,11 +99,10 @@ namespace hpx::parcelset::policies::lci { recv_chunks_idx = 0; recv_zero_copy_chunks_idx = 0; original_tag = tag; - segment_used = LCI_SEGMENT_ALL; state.store(connection_state::initialized, std::memory_order_release); util::lci_environment::log(util::lci_environment::log_level_t::debug, - "recv", "recv connection (%d, %d, %d) start!\n", dst_rank, LCI_RANK, - tag); + "recv", "recv connection (%d, %d, %d) start!\n", dst_rank, + util::lci_environment::rank(), tag); } receiver_connection_sendrecv::return_t @@ -134,57 +133,22 @@ namespace hpx::parcelset::policies::lci { } } - LCI_comp_t receiver_connection_sendrecv::unified_recv( - void* address, size_t length) + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::unified_recv(void* address, size_t length) { - LCI_comp_t completion = + ::lci::comp_t completion = device_p->completion_manager_p->recv_followup->alloc_completion(); - if (length <= (size_t) LCI_MEDIUM_SIZE) - { - LCI_mbuffer_t mbuffer; - mbuffer.address = address; - mbuffer.length = length; - LCI_error_t ret = LCI_recvm(device_p->endpoint_followup, mbuffer, - dst_rank, tag, completion, sharedPtr_p); - HPX_ASSERT(ret == LCI_OK); - HPX_UNUSED(ret); - util::lci_environment::log( - util::lci_environment::log_level_t::debug, "recv", - "recvm (%d, %d, %d) device %d tag %d size %d\n", dst_rank, - LCI_RANK, original_tag, device_p->idx, tag, length); - tag = (tag + 1) % LCI_MAX_TAG; - } - else - { - LCI_lbuffer_t lbuffer; - lbuffer.address = address; - lbuffer.length = length; - if (config_t::reg_mem) - { - LCI_memory_register(device_p->device, lbuffer.address, - lbuffer.length, &lbuffer.segment); - } - else - { - lbuffer.segment = LCI_SEGMENT_ALL; - } - LCI_error_t ret = LCI_recvl(device_p->endpoint_followup, lbuffer, - dst_rank, tag, completion, sharedPtr_p); - HPX_ASSERT(ret == LCI_OK); - HPX_UNUSED(ret); - util::lci_environment::log( - util::lci_environment::log_level_t::debug, "recv", - "recvl (%d, %d, %d) device %d tag %d size %d\n", dst_rank, - LCI_RANK, original_tag, device_p->idx, tag, length); - tag = (tag + 1) % LCI_MAX_TAG; - if (segment_used != LCI_SEGMENT_ALL) - { - LCI_memory_deregister(&segment_used); - segment_used = LCI_SEGMENT_ALL; - } - segment_used = lbuffer.segment; - } - return completion; + // TODO: optimize allow_done + auto status = + ::lci::post_recv_x(dst_rank, address, length, tag, completion) + .user_context(sharedPtr_p) + .device(device_p->device)(); + util::lci_environment::log(util::lci_environment::log_level_t::debug, + "recv", "recvm (%d, %d, %d) device %d tag %d size %d ret %s\n", + dst_rank, util::lci_environment::rank(), original_tag, + device_p->idx, tag, length, status.get_error().get_str()); + tag = (tag + 1) % util::lci_environment::get_max_tag(); + return {status.is_done(), completion}; } receiver_connection_sendrecv::return_t @@ -200,15 +164,16 @@ namespace hpx::parcelset::policies::lci { size_t tchunk_length = tchunks.size() * sizeof(receiver_base::buffer_type::transmission_chunk_type); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length); - state.store(next_state, std::memory_order_release); - return {false, completion}; - } - else - { - state.store(next_state, std::memory_order_release); - return receive_data(); + auto ret = unified_recv(tchunks.data(), tchunk_length); + if (!ret.isDone) + { + state.store(next_state, std::memory_order_release); + return ret; + } } + // either we didn't need to recv tchunks, or the receive was immediately done + state.store(next_state, std::memory_order_release); + return receive_data(); } receiver_connection_sendrecv::return_t @@ -221,16 +186,16 @@ namespace hpx::parcelset::policies::lci { if (need_recv_data) { state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = - unified_recv(buffer.data_.data(), buffer.data_.size()); - state.store(next_state, std::memory_order_release); - return {false, completion}; - } - else - { - state.store(next_state, std::memory_order_release); - return receive_chunks(); + auto ret = unified_recv(buffer.data_.data(), buffer.data_.size()); + if (!ret.isDone) + { + state.store(next_state, std::memory_order_release); + return ret; + } } + // either we didn't need to recv data, or the receive was immediately done + state.store(next_state, std::memory_order_release); + return receive_chunks(); } receiver_connection_sendrecv::return_t @@ -316,9 +281,12 @@ namespace hpx::parcelset::policies::lci { HPX_UNUSED(chunk_size); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = unified_recv(chunk.data(), chunk.size()); + auto ret = unified_recv(chunk.data(), chunk.size()); state.store(current_state, std::memory_order_release); - return {false, completion}; + if (!ret.isDone) + { + return ret; + } } HPX_ASSERT_MSG(recv_zero_copy_chunks_idx == buffer.num_chunks_.first, "observed: {}, expected {}", recv_zero_copy_chunks_idx, @@ -343,9 +311,12 @@ namespace hpx::parcelset::policies::lci { buffer.chunks_[idx] = serialization::create_pointer_chunk(chunk.data(), chunk.size()); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = unified_recv(chunk.data(), chunk.size()); + auto ret = unified_recv(chunk.data(), chunk.size()); state.store(current_state, std::memory_order_release); - return {false, completion}; + if (!ret.isDone) + { + return ret; + } } state.store(next_state, std::memory_order_release); return {true, nullptr}; @@ -357,12 +328,8 @@ namespace hpx::parcelset::policies::lci { util::lci_environment::recv_conn_end, 1); util::lci_environment::log(util::lci_environment::log_level_t::debug, "recv", "recv connection (%d, %d, %d, %d) done!\n", dst_rank, - LCI_RANK, original_tag, tag - original_tag + 1); - if (segment_used != LCI_SEGMENT_ALL) - { - LCI_memory_deregister(&segment_used); - segment_used = LCI_SEGMENT_ALL; - } + util::lci_environment::rank(), original_tag, + tag - original_tag + 1); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) buffer.data_point_.time_ = timer_.elapsed_nanoseconds(); #endif @@ -390,6 +357,7 @@ namespace hpx::parcelset::policies::lci { util::lci_environment::pcounter_since(handle_parcels_start_time)); buffer.data_.free(); parcels_.clear(); + delete sharedPtr_p; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp index 1d652f5e4597..bd2cc1a0cac4 100644 --- a/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp @@ -44,33 +44,37 @@ namespace hpx::parcelset::policies::lci { auto poll_comp_start = util::lci_environment::pcounter_now(); auto completion_manager_p = pp_->get_tls_device().completion_manager_p; - request_wrapper_t request; - request.request = completion_manager_p->recv_new->poll(); + auto status = completion_manager_p->recv_new->poll(); util::lci_environment::pcounter_add(util::lci_environment::poll_comp, util::lci_environment::pcounter_since(poll_comp_start)); - if (request.request.flag == LCI_OK) + if (status.is_done()) { auto useful_bg_start = util::lci_environment::pcounter_now(); if (config_t::protocol == config_t::protocol_t::sendrecv) { - std::size_t device_idx = - (std::size_t) request.request.user_context; - auto& device = pp_->devices[device_idx]; - LCI_comp_t completion = - completion_manager_p->recv_new->alloc_completion(); - LCI_recvmn(device.endpoint_new, LCI_RANK_ANY, 0, completion, - reinterpret_cast(device_idx)); - completion_manager_p->recv_new->enqueue_completion(completion); + HPX_ASSERT(false); + // std::size_t device_idx = + // (std::size_t) status.get_user_context(); + // auto& device = pp_->devices[device_idx]; + // ::lci::comp_t completion = + // completion_manager_p->recv_new->alloc_completion(); + // LCI_recvmn(device.endpoint_new, util::lci_environment::rank()_ANY, + // 0, completion, reinterpret_cast(device_idx)); + // completion_manager_p->recv_new->enqueue_completion(completion); } util::lci_environment::log( util::lci_environment::log_level_t::debug, "recv", - "accept_new (%d, %d, %d) length %lu\n", request.request.rank, - LCI_RANK, request.request.tag, - request.request.data.mbuffer.length); + "accept_new (%d, %d, %d) length %lu\n", status.get_rank(), + util::lci_environment::rank(), status.get_tag(), + status.get_size()); connection_ptr connection = - create_connection(request.request.rank, pp_); - connection->load((char*) request.request.data.mbuffer.address); + create_connection(status.get_rank(), pp_); + connection->load((char*) status.get_buffer()); + if (config_t::protocol == config_t::protocol_t::putsendrecv) + { + ::lci::put_upacket(status.get_buffer()); + } receiver_connection_sendrecv::return_t ret = connection->receive(); if (ret.isDone) { @@ -96,30 +100,25 @@ namespace hpx::parcelset::policies::lci { // should be managed by the connections auto poll_comp_start = util::lci_environment::pcounter_now(); auto completion_manager_p = pp_->get_tls_device().completion_manager_p; - LCI_request_t request = completion_manager_p->recv_followup->poll(); + ::lci::status_t status = completion_manager_p->recv_followup->poll(); util::lci_environment::pcounter_add(util::lci_environment::poll_comp, util::lci_environment::pcounter_since(poll_comp_start)); - if (request.flag == LCI_OK) + if (status.is_done()) { auto useful_bg_start = util::lci_environment::pcounter_now(); - HPX_ASSERT(request.user_context); - auto* sharedPtr_p = (connection_ptr*) request.user_context; - size_t length; - if (request.type == LCI_MEDIUM) - length = request.data.mbuffer.length; - else - length = request.data.lbuffer.length; + HPX_ASSERT(status.get_user_context()); + auto* sharedPtr_p = (connection_ptr*) status.get_user_context(); + size_t length = status.get_size(); util::lci_environment::log( util::lci_environment::log_level_t::debug, "recv", - "followup (%d, %d, %d) length %lu\n", request.rank, LCI_RANK, - request.tag, length); + "followup (%d, %d, %d) length %lu\n", status.get_rank(), + util::lci_environment::rank(), status.get_tag(), length); receiver_connection_sendrecv::return_t ret = (*sharedPtr_p)->receive(); if (ret.isDone) { (*sharedPtr_p)->done(); - delete sharedPtr_p; } else { diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp index 8fe70621ad1f..8d518f0ec156 100644 --- a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp @@ -28,7 +28,7 @@ namespace hpx::parcelset::policies::lci { - std::atomic sender_connection_sendrecv::next_tag = 0; + std::atomic<::lci::tag_t> sender_connection_sendrecv::next_tag = 0; void sender_connection_sendrecv::load( sender_connection_sendrecv::handler_type&& handler, @@ -49,25 +49,23 @@ namespace hpx::parcelset::policies::lci { if (config_t::enable_in_buffer_assembly) { int retry_count = 0; - while ( - LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK) + while ((header_buffer = ::lci::get_upacket()) == nullptr) { if (config_t::bg_work_when_send) pp_->do_background_work(0, parcelport_background_mode::all); yield_k(retry_count, config_t::mbuffer_alloc_max_retry); } - HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE); header_ = header( - buffer_, (char*) header_buffer.address, header_buffer.length); - header_buffer.length = header_.size(); + buffer_, (char*) header_buffer, ::lci::get_max_bcopy_size()); + header_buffer_size = header_.size(); } else { - header_buffer_vector.resize( - header::get_header_size(buffer_, LCI_MEDIUM_SIZE)); - header_ = - header(buffer_, static_cast(header_buffer_vector.data()), - header_buffer_vector.size()); + header_buffer_size = + header::get_header_size(buffer_, ::lci::get_max_bcopy_size()); + header_buffer = malloc(header_buffer_size); + header_ = header( + buffer_, static_cast(header_buffer), header_buffer_size); } HPX_ASSERT((header_.num_zero_copy_chunks() == 0) == buffer_.transmission_chunks_.empty()); @@ -90,39 +88,20 @@ namespace hpx::parcelset::policies::lci { num_send += header_.num_zero_copy_chunks(); } tag = 0; // If no need to post send, then tag can be ignored. - sharedPtr_p = nullptr; - if (config_t::enable_sendmc || num_send > 0) - { - sharedPtr_p = new std::shared_ptr( - std::dynamic_pointer_cast( - shared_from_this())); - if (num_send > 0) - tag = next_tag.fetch_add(num_send) % LCI_MAX_TAG; - } - if ((int) tag <= LCI_MAX_TAG && (int) tag + num_send > LCI_MAX_TAG) + sharedPtr_p = new std::shared_ptr( + std::dynamic_pointer_cast( + shared_from_this())); + if (num_send > 0) + tag = next_tag.fetch_add(num_send) % + util::lci_environment::get_max_tag(); + if ((int) tag <= util::lci_environment::get_max_tag() && + (int) tag + num_send > util::lci_environment::get_max_tag()) util::lci_environment::log( util::lci_environment::log_level_t::debug, "tag", - "Rank %d Wrap around!\n", LCI_RANK); - header_.set_device_idx(device_p->idx); + "Rank %d Wrap around!\n", util::lci_environment::rank()); header_.set_tag(tag); - if (!config_t::enable_in_buffer_assembly) - { - int retry_count = 0; - while ( - LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK) - { - if (config_t::bg_work_when_send) - pp_->do_background_work(0, parcelport_background_mode::all); - yield_k(retry_count, config_t::mbuffer_alloc_max_retry); - } - memcpy(header_buffer.address, header_buffer_vector.data(), - header_buffer_vector.size()); - header_buffer.length = header_buffer_vector.size(); - } send_chunks_idx = 0; completion = nullptr; - segment_to_use = LCI_SEGMENT_ALL; - segment_used = LCI_SEGMENT_ALL; // set state profile_start_hook(header_); state.store(connection_state::initialized, std::memory_order_release); @@ -131,7 +110,7 @@ namespace hpx::parcelset::policies::lci { "send", "send connection (%d, %d, %d, %d) tchunks " "%d data %d chunks %d start!\n", - LCI_RANK, dst_rank, original_tag, num_send, + util::lci_environment::rank(), dst_rank, original_tag, num_send, header_.piggy_back_tchunk() != nullptr, header_.piggy_back_data() != nullptr, header_.num_zero_copy_chunks()); @@ -174,38 +153,41 @@ namespace hpx::parcelset::policies::lci { const auto next_state = connection_state::sent_header; HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); HPX_UNUSED(current_state); - LCI_error_t ret; - if (config_t::enable_sendmc) + ::lci::status_t status; + if (completion == nullptr) { - if (completion == nullptr) - { - completion = - device_p->completion_manager_p->send->alloc_completion(); - } - state.store(connection_state::locked, std::memory_order_relaxed); + completion = + device_p->completion_manager_p->send->alloc_completion(); } + state.store(connection_state::locked, std::memory_order_relaxed); if (config_t::protocol == config_t::protocol_t::putsendrecv) { - ret = LCI_putmac(device_p->endpoint_new, header_buffer, dst_rank, 0, - LCI_DEFAULT_COMP_REMOTE, - config_t::enable_sendmc ? completion : nullptr, sharedPtr_p); + ::lci::rcomp_t rcomp = + device_p->completion_manager_p->recv_new_rcomp; + status = ::lci::post_am_x( + dst_rank, header_buffer, header_buffer_size, completion, rcomp) + .device(device_p->device) + .user_context(sharedPtr_p)(); } else { HPX_ASSERT(config_t::protocol == config_t::protocol_t::sendrecv); - ret = LCI_sendmc(device_p->endpoint_new, header_buffer, dst_rank, 0, - config_t::enable_sendmc ? completion : nullptr, sharedPtr_p); + status = ::lci::post_send_x( + dst_rank, header_buffer, header_buffer_size, 0, completion) + .device(device_p->device) + .user_context(sharedPtr_p)(); } - if (ret == LCI_OK) + if (status.is_done() || status.is_posted()) { util::lci_environment::log( util::lci_environment::log_level_t::debug, "send", - "%s (%d, %d, %d) length %lu\n", + "%s (%d, %d, %d) length %lu status %s\n", config_t::protocol == config_t::protocol_t::putsendrecv ? - "LCI_putmna" : - "LCI_sendmn", - LCI_RANK, dst_rank, tag, header_buffer.length); - if (config_t::enable_sendmc) + "post_am" : + "post_send", + util::lci_environment::rank(), dst_rank, tag, + header_buffer_size, status.get_error().get_str()); + if (status.is_posted()) { auto ret_comp = completion; completion = nullptr; @@ -220,9 +202,7 @@ namespace hpx::parcelset::policies::lci { } else { - HPX_ASSERT(ret == LCI_ERR_RETRY); - if (config_t::enable_sendmc) - state.store(current_state, std::memory_order_release); + state.store(current_state, std::memory_order_release); return {return_status_t::retry, nullptr}; } } @@ -231,82 +211,35 @@ namespace hpx::parcelset::policies::lci { sender_connection_sendrecv::unified_followup_send( void* address, size_t length) { - if (length <= (size_t) LCI_MEDIUM_SIZE) + if (completion == nullptr) { - LCI_mbuffer_t buffer; - buffer.address = address; - buffer.length = length; - if (config_t::enable_sendmc && completion == nullptr) - { - completion = - device_p->completion_manager_p->send->alloc_completion(); - } - LCI_error_t ret = LCI_sendmc(device_p->endpoint_followup, buffer, - dst_rank, tag, config_t::enable_sendmc ? completion : nullptr, - sharedPtr_p); - if (ret == LCI_OK) - { - util::lci_environment::log( - util::lci_environment::log_level_t::debug, "send", - "sendm (%d, %d, %d) device %d tag %d size %d\n", LCI_RANK, - dst_rank, original_tag, device_p->idx, tag, length); - tag = (tag + 1) % LCI_MAX_TAG; - if (config_t::enable_sendmc) - { - auto ret_comp = completion; - completion = nullptr; - return {return_status_t::wait, ret_comp}; - } - else - return {return_status_t::done, nullptr}; - } - else - { - HPX_ASSERT(ret == LCI_ERR_RETRY); - return {return_status_t::retry, nullptr}; - } + completion = + device_p->completion_manager_p->send->alloc_completion(); } - else + auto status = + ::lci::post_send_x(dst_rank, address, length, tag, completion) + .user_context(sharedPtr_p) + .device(device_p->device)(); + if (status.is_done() || status.is_posted()) { - if (config_t::reg_mem && segment_to_use == LCI_SEGMENT_ALL) - { - LCI_memory_register( - device_p->device, address, length, &segment_to_use); - } - if (completion == nullptr) - { - completion = - device_p->completion_manager_p->send->alloc_completion(); - } - LCI_lbuffer_t buffer; - buffer.segment = segment_to_use; - buffer.address = address; - buffer.length = length; - LCI_error_t ret = LCI_sendl(device_p->endpoint_followup, buffer, - dst_rank, tag, completion, sharedPtr_p); - if (ret == LCI_OK) + util::lci_environment::log( + util::lci_environment::log_level_t::debug, "send", + "post_send (%d, %d, %d) device %d tag %d size %d status %s\n", + util::lci_environment::rank(), dst_rank, original_tag, + device_p->idx, tag, length, status.get_error().get_str()); + tag = (tag + 1) % util::lci_environment::get_max_tag(); + if (status.is_posted()) { - util::lci_environment::log( - util::lci_environment::log_level_t::debug, "send", - "sendl (%d, %d, %d) device %d, tag %d size %d\n", LCI_RANK, - dst_rank, original_tag, device_p->idx, tag, length); - tag = (tag + 1) % LCI_MAX_TAG; - if (segment_used != LCI_SEGMENT_ALL) - { - LCI_memory_deregister(&segment_used); - segment_used = LCI_SEGMENT_ALL; - } - segment_used = segment_to_use; - segment_to_use = LCI_SEGMENT_ALL; auto ret_comp = completion; completion = nullptr; return {return_status_t::wait, ret_comp}; } else - { - HPX_ASSERT(ret == LCI_ERR_RETRY); - return {return_status_t::retry, nullptr}; - } + return {return_status_t::done, nullptr}; + } + else + { + return {return_status_t::retry, nullptr}; } } @@ -425,8 +358,9 @@ namespace hpx::parcelset::policies::lci { void sender_connection_sendrecv::done() { util::lci_environment::log(util::lci_environment::log_level_t::debug, - "send", "send connection (%d, %d, %d, %d) done!\n", LCI_RANK, - dst_rank, original_tag, tag - original_tag + 1); + "send", "send connection (%d, %d, %d, %d) done!\n", + util::lci_environment::rank(), dst_rank, original_tag, + tag - original_tag + 1); profile_end_hook(); error_code ec; handler_(ec); @@ -435,22 +369,17 @@ namespace hpx::parcelset::policies::lci { data_point_.time_ = timer_.elapsed_nanoseconds(); pp_->add_sent_data(data_point_); #endif - if (segment_used != LCI_SEGMENT_ALL) - { - LCI_memory_deregister(&segment_used); - segment_used = LCI_SEGMENT_ALL; - } - if (config_t::enable_sendmc) + if (!completion.is_empty()) { - LCI_mbuffer_free(header_buffer); + device_p->completion_manager_p->send->free_completion(completion); } - HPX_ASSERT(completion == nullptr); - HPX_ASSERT(segment_to_use == LCI_SEGMENT_ALL); buffer_.clear(); util::lci_environment::pcounter_add( util::lci_environment::send_conn_timer, util::lci_environment::pcounter_since(conn_start_time)); + auto keep_alive = shared_from_this(); + delete sharedPtr_p; if (postprocess_handler_) { // Return this connection to the connection cache. @@ -463,17 +392,10 @@ namespace hpx::parcelset::policies::lci { postprocess_handler; std::swap(postprocess_handler, postprocess_handler_); error_code ec2; - postprocess_handler(ec2, there_, shared_from_this()); + postprocess_handler(ec2, there_, std::move(keep_alive)); } } - bool sender_connection_sendrecv::tryMerge( - const std::shared_ptr& other_base) - { - // We cannot merge any message here. - HPX_UNUSED(other_base); - return false; - } } // namespace hpx::parcelset::policies::lci #endif