From 3a3deee675c29a80a3a41cafc76c5f363423b586 Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Mon, 30 Jun 2025 14:38:54 -0700 Subject: [PATCH 01/11] Retry a few times if directory creation fails This tries to fix the following error reported by @BenWibking Writing checkpoint chk02500 amrex::UtilCreateDirectory:: path errno: chk02500 :: File exists amrex::UtilCreateDirectory:: path errno: chk02500/Level_2 :: Input/output error amrex::Error::0::Couldn't create directory: chk02500/Level_2 !!! --- Src/Base/AMReX_Utility.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Src/Base/AMReX_Utility.cpp b/Src/Base/AMReX_Utility.cpp index aa3d8a2d165..ec4b15161aa 100644 --- a/Src/Base/AMReX_Utility.cpp +++ b/Src/Base/AMReX_Utility.cpp @@ -116,7 +116,13 @@ bool amrex::UtilCreateDirectory (const std::string& path, mode_t mode, bool verbose) { - return FileSystem::CreateDirectories(path, mode, verbose); + double sleep = 0.1; + while (sleep < 2.0) { + if (FileSystem::CreateDirectories(path, mode, verbose)) { return true; } + amrex::Sleep(sleep); + sleep *= 2; + } + return false; } void From cdeb6a06116b3233382b9b60d8c728d614e930c7 Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Mon, 30 Jun 2025 15:22:42 -0700 Subject: [PATCH 02/11] Add more error checks --- Src/Base/AMReX_VisMF.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Src/Base/AMReX_VisMF.cpp b/Src/Base/AMReX_VisMF.cpp index 9fa8aca1407..17af7cd0791 100644 --- a/Src/Base/AMReX_VisMF.cpp +++ b/Src/Base/AMReX_VisMF.cpp @@ -1114,6 +1114,7 @@ VisMF::Write (const FabArray& mf, nfi.Stream().write(allFabData, bytesWritten); nfi.Stream().flush(); delete [] allFabData; + if (! nfi.Stream().good()) { amrex::Error("VisMF::Write failed"); } } else { // ---- write fabs individually for(MFIter mfi(mf); mfi.isValid(); ++mfi) { @@ -1154,6 +1155,8 @@ VisMF::Write (const FabArray& mf, if (!noFlushAfterWrite) { nfi.Stream().flush(); } + + if (! nfi.Stream().good()) { amrex::Error("VisMF::Write failed"); } } } @@ -1485,6 +1488,8 @@ VisMF::readFAB (int idx, #endif } + if (!(infs->good())) { amrex::Error("VisMF::readFAB failed"); } + VisMF::CloseStream(FullName); return fab; @@ -1532,6 +1537,8 @@ VisMF::readFAB (FabArray &mf, fab.readFrom(*infs); } + if (!(infs->good())) { amrex::Error("VisMF::readFAB failed"); } + VisMF::CloseStream(FullName); } @@ -1831,6 +1838,8 @@ VisMF::Read (FabArray &mf, } } + if (! nfi.Stream().good()) { amrex::Error("VisMF::Read failed"); } + } // ---- end NFilesIter } @@ -2524,6 +2533,7 @@ VisMF::AsyncWriteDoit (const FabArray& mf, const std::string& mf_name fabio->write(ofs, fab, 0, fab.nComp()); } ofs.flush(); + if (!ofs.good()) { amrex::Error("VisMF::AsyncWriteDoit failed"); } ofs.close(); } From d74176ec9b8f4752d5d7722a04e5af22362577ac Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Tue, 1 Jul 2025 18:44:53 -0700 Subject: [PATCH 03/11] amrex::FileStream Implement our own file stream class that is somewhat similar to std::fstream. It can be used by amrex::NFilesIter in mesh and particle data I/O. By default amrex::NFilesIter still uses std::fstream. To use amrex::FileStream, one needs to use `USE_OWN_FILE_STREAM=TRUE` for GNU Make and `AMReX_OWN_FILE_STREAM=ON` for CMake. amrex::FileStream::Read has not been tested and currently it is not being used for reading MultiFab and particles. --- .../source/BuildingAMReX.rst | 92 ++-- Src/Base/AMReX.H | 2 +- Src/Base/AMReX.cpp | 4 +- Src/Base/AMReX_FabConv.H | 25 ++ Src/Base/AMReX_FabConv.cpp | 171 +++++++ Src/Base/AMReX_FileStream.H | 149 ++++++ Src/Base/AMReX_FileStream.cpp | 423 ++++++++++++++++++ Src/Base/AMReX_IntConv.H | 8 +- Src/Base/AMReX_NFiles.H | 10 +- Src/Base/AMReX_NFiles.cpp | 8 + Src/Base/AMReX_VectorIO.H | 174 +++++-- Src/Base/AMReX_VectorIO.cpp | 136 ------ Src/Base/AMReX_VisMF.H | 3 + Src/Base/AMReX_VisMF.cpp | 11 + Src/Base/CMakeLists.txt | 3 +- Src/Base/Make.package | 5 +- Src/Particle/AMReX_ParticleContainer.H | 9 +- Src/Particle/AMReX_ParticleIO.H | 9 +- Src/Particle/AMReX_WriteBinaryParticleData.H | 2 +- Tools/CMake/AMReXConfig.cmake.in | 3 +- Tools/CMake/AMReXOptions.cmake | 2 + Tools/CMake/AMReXSetDefines.cmake | 3 + Tools/CMake/AMReX_Config_ND.H.in | 1 + Tools/GNUMake/Make.defs | 10 + 24 files changed, 1037 insertions(+), 226 deletions(-) create mode 100644 Src/Base/AMReX_FileStream.H create mode 100644 Src/Base/AMReX_FileStream.cpp delete mode 100644 Src/Base/AMReX_VectorIO.cpp diff --git a/Docs/sphinx_documentation/source/BuildingAMReX.rst b/Docs/sphinx_documentation/source/BuildingAMReX.rst index 43afa7c337b..cf0a35685c3 100644 --- a/Docs/sphinx_documentation/source/BuildingAMReX.rst +++ b/Docs/sphinx_documentation/source/BuildingAMReX.rst @@ -28,50 +28,52 @@ list of important variables. .. table:: Important make variables - +-----------------+-------------------------------------+--------------------+ - | Variable | Value | Default | - +=================+=====================================+====================+ - | AMREX_HOME | Path to amrex | environment | - +-----------------+-------------------------------------+--------------------+ - | COMP | gnu, cray, ibm, intel, intel-llvm, | | - | | intel-classic, llvm, or pgi | none | - +-----------------+-------------------------------------+--------------------+ - | CXXSTD | C++ standard (``c++17``, ``c++20``) | compiler default, | - | | | at least ``c++17`` | - +-----------------+-------------------------------------+--------------------+ - | DEBUG | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | DIM | 1 or 2 or 3 | 3 | - +-----------------+-------------------------------------+--------------------+ - | PRECISION | DOUBLE or FLOAT | DOUBLE | - +-----------------+-------------------------------------+--------------------+ - | TEST | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_ASSERTION | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_MPI | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_OMP | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_CUDA | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_HIP | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_SYCL | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | USE_RPATH | TRUE or FALSE | FALSE | - +-----------------+-------------------------------------+--------------------+ - | WARN_ALL | TRUE or FALSE | TRUE for DEBUG | - | | | FALSE otherwise | - +-----------------+-------------------------------------+--------------------+ - | AMREX_CUDA_ARCH | CUDA arch such as 70 | 70 if not set | - | or CUDA_ARCH | | or detected | - +-----------------+-------------------------------------+--------------------+ - | AMREX_AMD_ARCH | AMD GPU arch such as gfx908 | none if the | - | or AMD_ARCH | | machine is unknown | - +-----------------+-------------------------------------+--------------------+ - | USE_GPU_RDC | TRUE or FALSE | TRUE | - +-----------------+-------------------------------------+--------------------+ + +---------------------+-------------------------------------+--------------------+ + | Variable | Value | Default | + +=====================+=====================================+====================+ + | AMREX_HOME | Path to amrex | environment | + +---------------------+-------------------------------------+--------------------+ + | COMP | gnu, cray, ibm, intel, intel-llvm, | | + | | intel-classic, llvm, or pgi | none | + +---------------------+-------------------------------------+--------------------+ + | CXXSTD | C++ standard (``c++17``, ``c++20``) | compiler default, | + | | | at least ``c++17`` | + +---------------------+-------------------------------------+--------------------+ + | DEBUG | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | DIM | 1 or 2 or 3 | 3 | + +---------------------+-------------------------------------+--------------------+ + | PRECISION | DOUBLE or FLOAT | DOUBLE | + +---------------------+-------------------------------------+--------------------+ + | TEST | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_ASSERTION | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_MPI | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_OMP | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_CUDA | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_HIP | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_SYCL | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | USE_RPATH | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ + | WARN_ALL | TRUE or FALSE | TRUE for DEBUG | + | | | FALSE otherwise | + +---------------------+-------------------------------------+--------------------+ + | AMREX_CUDA_ARCH | CUDA arch such as 70 | 70 if not set | + | or CUDA_ARCH | | or detected | + +---------------------+-------------------------------------+--------------------+ + | AMREX_AMD_ARCH | AMD GPU arch such as gfx908 | none if the | + | or AMD_ARCH | | machine is unknown | + +---------------------+-------------------------------------+--------------------+ + | USE_GPU_RDC | TRUE or FALSE | TRUE | + +---------------------+-------------------------------------+--------------------+ + | USE_OWN_FILE_STREAM | TRUE or FALSE | FALSE | + +---------------------+-------------------------------------+--------------------+ .. raw:: latex @@ -551,6 +553,8 @@ The list of available options is reported in the :ref:`table ` bel | AMReX_INLINE_LIMIT | Inline limit. Relevant only when | 43210 | Non-negative number | | | AMReX_COMPILER_DEFAULT_INLINE is NO. | | | +------------------------------+-------------------------------------------------+-------------------------+-----------------------+ + | AMReX_OWN_FILE_STREAM | Use AMReX's own file stream in I/O | NO | YES, NO | + +------------------------------+-------------------------------------------------+-------------------------+-----------------------+ .. raw:: latex \end{center} diff --git a/Src/Base/AMReX.H b/Src/Base/AMReX.H index 257f96c8770..1462516e0d9 100644 --- a/Src/Base/AMReX.H +++ b/Src/Base/AMReX.H @@ -136,7 +136,7 @@ namespace amrex //! Print out message to cerr and exit via amrex::Abort(). void Error (const std::string& msg); - void Error_host (const char* type, const char* msg); + void Error_host (const char* type, const char* msg, bool can_throw = true); AMREX_GPU_HOST_DEVICE AMREX_FORCE_INLINE void Error (const char* msg = nullptr) { diff --git a/Src/Base/AMReX.cpp b/Src/Base/AMReX.cpp index 001f16c8801..ddce1233a03 100644 --- a/Src/Base/AMReX.cpp +++ b/Src/Base/AMReX.cpp @@ -239,7 +239,7 @@ amrex::Warning (const std::string& msg) } void -amrex::Error_host (const char* type, const char * msg) +amrex::Error_host (const char* type, const char * msg, bool can_throw) { amrex::ignore_unused(type); #ifdef AMREX_USE_COVERITY @@ -247,7 +247,7 @@ amrex::Error_host (const char* type, const char * msg) #else if (system::error_handler) { system::error_handler(msg); - } else if (system::throw_exception) { + } else if (system::throw_exception && can_throw) { throw RuntimeError(msg); } else { write_lib_id(type); diff --git a/Src/Base/AMReX_FabConv.H b/Src/Base/AMReX_FabConv.H index 25dae063de7..6d0093af4c6 100644 --- a/Src/Base/AMReX_FabConv.H +++ b/Src/Base/AMReX_FabConv.H @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -182,6 +183,10 @@ public: Long nitems, std::istream& is, const RealDescriptor& id); + static void convertToNativeFormat (Real* out, + Long nitems, + amrex::FileStream& is, + const RealDescriptor& id); /** * \brief Convert nitems Reals in native format to RealDescriptor format @@ -191,6 +196,10 @@ public: Long nitems, const Real* in, const RealDescriptor& od); + static void convertFromNativeFormat (amrex::FileStream& os, + Long nitems, + const Real* in, + const RealDescriptor& od); /** * \brief Convert nitems Reals in native format to RealDescriptor format. * The out array is assumed to be large enough to hold the @@ -209,6 +218,10 @@ public: Long nitems, const float* in, const RealDescriptor& od); + static void convertFromNativeFloatFormat (amrex::FileStream& os, + Long nitems, + const float* in, + const RealDescriptor& od); /** * \brief Convert nitems doubles in native format to RealDescriptor format @@ -218,6 +231,10 @@ public: Long nitems, const double* in, const RealDescriptor& od); + static void convertFromNativeDoubleFormat (amrex::FileStream& os, + Long nitems, + const double* in, + const RealDescriptor& od); /** * \brief Read nitems from istream in RealDescriptor format and @@ -228,6 +245,10 @@ public: Long nitems, std::istream& is, const RealDescriptor& id); + static void convertToNativeFloatFormat (float* out, + Long nitems, + amrex::FileStream& is, + const RealDescriptor& id); /** * \brief Read nitems from istream in RealDescriptor format and @@ -238,6 +259,10 @@ public: Long nitems, std::istream& is, const RealDescriptor& id); + static void convertToNativeDoubleFormat (double* out, + Long nitems, + amrex::FileStream& is, + const RealDescriptor& id); private: diff --git a/Src/Base/AMReX_FabConv.cpp b/Src/Base/AMReX_FabConv.cpp index e8dd870a058..5f303a436fb 100644 --- a/Src/Base/AMReX_FabConv.cpp +++ b/Src/Base/AMReX_FabConv.cpp @@ -1034,6 +1034,39 @@ RealDescriptor::convertToNativeFormat (Real* out, delete [] bufr; } +void +RealDescriptor::convertToNativeFormat (Real* out, + Long nitems, + amrex::FileStream& is, + const RealDescriptor& id) +{ + auto buffSize = std::min(Long(readBufferSize), nitems); + std::unique_ptr bufr(new char[buffSize * id.numBytes()]); // do NOT use make_unique + while (nitems > 0) + { + auto get = std::min(static_cast(readBufferSize), nitems); + is.read(bufr.get(), id.numBytes()*get); + PD_convert(out, + bufr.get(), + get, + 0, + FPC::NativeRealDescriptor(), + id, + FPC::NativeLongDescriptor()); + + if(bAlwaysFixDenormals) { + PD_fixdenormals(out, get, FPC::NativeRealDescriptor().format(), + FPC::NativeRealDescriptor().order()); + } + nitems -= get; + out += get; + } + + if(is.fail()) { + amrex::Error("convert(Real*,Long,FileStream&,RealDescriptor&) failed"); + } +} + // // Convert nitems Reals in native format to RealDescriptor format. // @@ -1096,6 +1129,30 @@ RealDescriptor::convertFromNativeFormat (std::ostream& os, } } +void +RealDescriptor::convertFromNativeFormat (amrex::FileStream& os, + Long nitems, + const Real* in, + const RealDescriptor& od) +{ + auto buffSize = std::min(Long(writeBufferSize), nitems); + std::unique_ptr bufr(new char[buffSize * od.numBytes()]); // do NOT use make_unique + while (nitems > 0) + { + auto put = std::min(static_cast(writeBufferSize), nitems); + PD_convert(bufr.get(), + in, + put, + 0, + od, + FPC::NativeRealDescriptor(), + FPC::NativeLongDescriptor()); + os.write(bufr.get(), od.numBytes()*put); + nitems -= put; + in += put; + } +} + // // Convert nitems floats in native format to RealDescriptor format // and write them to the ostream. @@ -1138,6 +1195,30 @@ RealDescriptor::convertFromNativeFloatFormat (std::ostream& os, } } +void +RealDescriptor::convertFromNativeFloatFormat (amrex::FileStream& os, + Long nitems, + const float* in, + const RealDescriptor& od) +{ + auto buffSize = std::min(Long(writeBufferSize), nitems); + std::unique_ptr bufr(new char[buffSize * od.numBytes()]); // do NOT use make_unique + while (nitems > 0) + { + auto put = std::min(static_cast(writeBufferSize), nitems); + PD_convert(bufr.get(), + in, + put, + 0, + od, + FPC::Native32RealDescriptor(), + FPC::NativeLongDescriptor()); + os.write(bufr.get(), od.numBytes()*put); + nitems -= put; + in += put; + } +} + // // Convert nitems doubles in native format to RealDescriptor format // and write them to the ostream. @@ -1180,6 +1261,30 @@ RealDescriptor::convertFromNativeDoubleFormat (std::ostream& os, } } +void +RealDescriptor::convertFromNativeDoubleFormat (amrex::FileStream& os, + Long nitems, + const double* in, + const RealDescriptor& od) +{ + auto buffSize = std::min(Long(writeBufferSize), nitems); + std::unique_ptr bufr(new char[buffSize * od.numBytes()]); // do NOT use make_unique + while (nitems > 0) + { + auto put = std::min(static_cast(writeBufferSize), nitems); + PD_convert(bufr.get(), + in, + put, + 0, + od, + FPC::Native64RealDescriptor(), + FPC::NativeLongDescriptor()); + os.write(bufr.get(), od.numBytes()*put); + nitems -= put; + in += put; + } +} + // // Read nitems from istream in RealDescriptor format to native float format. // @@ -1222,6 +1327,39 @@ RealDescriptor::convertToNativeFloatFormat (float* out, delete [] bufr; } +void +RealDescriptor::convertToNativeFloatFormat (float* out, + Long nitems, + amrex::FileStream& is, + const RealDescriptor& id) +{ + auto buffSize = std::min(Long(readBufferSize), nitems); + std::unique_ptr bufr(new char[buffSize * id.numBytes()]); // do NOT use make_unique + while (nitems > 0) + { + auto get = std::min(static_cast(readBufferSize), nitems); + is.read(bufr.get(), id.numBytes()*get); + PD_convert(out, + bufr.get(), + get, + 0, + FPC::Native32RealDescriptor(), + id, + FPC::NativeLongDescriptor()); + + if(bAlwaysFixDenormals) { + PD_fixdenormals(out, get, FPC::Native32RealDescriptor().format(), + FPC::Native32RealDescriptor().order()); + } + nitems -= get; + out += get; + } + + if(is.fail()) { + amrex::Error("convert(Real*,Long,FileStream&,RealDescriptor&) failed"); + } +} + // // Read nitems from istream in RealDescriptor format to native double format. // @@ -1264,4 +1402,37 @@ RealDescriptor::convertToNativeDoubleFormat (double* out, delete [] bufr; } +void +RealDescriptor::convertToNativeDoubleFormat (double* out, + Long nitems, + amrex::FileStream& is, + const RealDescriptor& id) +{ + auto buffSize = std::min(Long(readBufferSize), nitems); + std::unique_ptr bufr(new char[buffSize * id.numBytes()]); // do NOT use make_unique + while (nitems > 0) + { + auto get = std::min(static_cast(readBufferSize), nitems); + is.read(bufr.get(), id.numBytes()*get); + PD_convert(out, + bufr.get(), + get, + 0, + FPC::Native64RealDescriptor(), + id, + FPC::NativeLongDescriptor()); + + if(bAlwaysFixDenormals) { + PD_fixdenormals(out, get, FPC::Native64RealDescriptor().format(), + FPC::Native64RealDescriptor().order()); + } + nitems -= get; + out += get; + } + + if(is.fail()) { + amrex::Error("convert(Real*,Long,FileStream&,RealDescriptor&) failed"); + } +} + } diff --git a/Src/Base/AMReX_FileStream.H b/Src/Base/AMReX_FileStream.H new file mode 100644 index 00000000000..bab9996414b --- /dev/null +++ b/Src/Base/AMReX_FileStream.H @@ -0,0 +1,149 @@ +#ifndef AMREX_FILE_STREAM_H_ +#define AMREX_FILE_STREAM_H_ +#include + +#include +#include +#include + +#ifndef _WIN32 + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace amrex +{ + +class FileStream +{ +public: + + using off_type = std::char_traits::off_type; + using pos_type = std::char_traits::pos_type; + + explicit FileStream (char const* filename, + std::ios_base::openmode mode + = std::ios_base::in | std::ios_base::out); + explicit FileStream (std::string const& filename, + std::ios_base::openmode mode + = std::ios_base::in | std::ios_base::out); + + FileStream () = default; + FileStream (FileStream&& other); + FileStream (FileStream const& rhs) = delete; + + ~FileStream (); + + FileStream& operator= (FileStream&& other); + FileStream& operator= (FileStream const& rhs) = delete; + + void open (char const* filename, + std::ios_base::openmode mode + = std::ios_base::in | std::ios_base::out); + void open (std::string const& filename, + std::ios_base::openmode mode + = std::ios_base::in | std::ios_base::out); + + void close (); + + FileStream& read (char* s, std::streamsize count); + + FileStream& write (char const* s, std::streamsize count); + + FileStream& flush (); + + FileStream& seekp (pos_type off); + FileStream& seekp (off_type off, std::ios_base::seekdir dir); + + FileStream& seekg (pos_type off); + FileStream& seekg (off_type off, std::ios_base::seekdir dir); + + pos_type tellp (); + + pos_type tellg (); + + [[nodiscard]] bool good () const; + + [[nodiscard]] bool bad () const; + + [[nodiscard]] bool fail () const; + +private: + + double m_base_delay = 0.1; + off_t m_pos = 0; + int m_max_retries = 16; + int m_fd = -1; + bool m_good = false; + bool m_binary = true; // not used + + std::unique_ptr m_buffer; + std::size_t m_buffer_size = 32*1024*1024; // 32MB + std::size_t m_buffer_begin = 0; + std::size_t m_buffer_end = 0; + enum class BufferMode { Read, Write, None}; + BufferMode m_buffer_mode = BufferMode::None; + + template + void execute_with_retry (F&& func, std::string const& op_name) + { + int attempt = 0; + double delay = m_base_delay; + + while (attempt <= m_max_retries) { + if (func()) { return; } + + // Retry for certain error + if (errno == EINTR // Interrupted function + || errno == EAGAIN // Resource unavailable, try again + || errno == EWOULDBLOCK // Operation would block + || errno == EMFILE // Too many files this process + || errno == ENFILE // Too many files sysetem wide + || errno == EBUSY // Busy + || errno == ENOENT // No such file or directory + || errno == ENOBUFS // No buffer + || errno == ENODEV // No such device + || errno == ENOMEM // Out of memory + || errno == ENOTCONN // Socket not connected + || errno == ESTALE // Stale file handle + || errno == ETIMEDOUT // Network timeout + || errno == EIO) // I/O error + { + attempt++; + amrex::Sleep(delay); + delay *= 2; // Exponential backoff + if (amrex::Verbose()) { + amrex::AllPrint() << op_name << " failed after " + << std::to_string(attempt) + << ": " << std::strerror(errno) << "\n"; + } + } else { + break; + } + } + + m_good = false; + throw std::runtime_error(op_name + " failed: " + std::strerror(errno)); + } + + void file_write (char const* s, std::size_t count); + + void flush_write_buffer (); + + void fill_write_buffer (char const* s, std::size_t count); + + void fill_read_buffer (); +}; + +} + +#endif + +#endif diff --git a/Src/Base/AMReX_FileStream.cpp b/Src/Base/AMReX_FileStream.cpp new file mode 100644 index 00000000000..49134acc7aa --- /dev/null +++ b/Src/Base/AMReX_FileStream.cpp @@ -0,0 +1,423 @@ +#include + +#ifndef _WIN32 + +#include +#include + +namespace amrex +{ + +FileStream::FileStream (char const* filename, std::ios_base::openmode mode) +{ + open(filename, mode); +} + +FileStream::FileStream (std::string const& filename, std::ios_base::openmode mode) +{ + open(filename.c_str(), mode); +} + +FileStream::FileStream (FileStream&& other) + : m_base_delay (other.m_base_delay), + m_pos (other.m_pos), + m_max_retries (other.m_max_retries), + m_fd (std::exchange(other.m_fd,-1)), + m_good (std::exchange(other.m_good,false)), + m_binary (other.m_binary), + m_buffer (std::exchange(other.m_buffer,nullptr)), + m_buffer_size (other.m_buffer_size), + m_buffer_begin (other.m_buffer_begin), + m_buffer_end (other.m_buffer_end), + m_buffer_mode (other.m_buffer_mode) +{} + +FileStream& FileStream::operator= (FileStream&& rhs) +{ + if (this != &rhs) { + std::swap(m_base_delay , rhs.m_base_delay); + std::swap(m_pos , rhs.m_pos); + std::swap(m_max_retries , rhs.m_max_retries); + std::swap(m_good , rhs.m_good); + std::swap(m_binary , rhs.m_binary); + std::swap(m_fd , rhs.m_fd); + std::swap(m_buffer , rhs.m_buffer); + std::swap(m_buffer_size , rhs.m_buffer_size); + std::swap(m_buffer_begin, rhs.m_buffer_begin); + std::swap(m_buffer_end , rhs.m_buffer_end); + std::swap(m_buffer_mode , rhs.m_buffer_mode); + } + return *this; +} + +FileStream::~FileStream () +{ + try { + close(); + } catch (std::runtime_error& e) { + amrex::Error_host("Error", e.what(), false); + } +} + +void FileStream::open (char const* filename, std::ios_base::openmode mode) +{ + int flags = 0; + if ((mode & std::ios_base::in) && + (mode & std::ios_base::out)) { + flags |= O_RDWR | O_CREAT; + } else if (mode & std::ios_base::in) { + flags |= O_RDONLY; + } else if (mode & std::ios_base::out) { + flags |= O_WRONLY | O_CREAT; + } + if (mode & std::ios_base::app) { + flags |= O_APPEND; + } else if ((mode & std::ios_base::trunc) || (mode & std::ios_base::out)) { + flags |= O_TRUNC; + } + m_binary = (mode & std::ios_base::binary); + + mode_t mod = 0666; + execute_with_retry([&]() { + m_fd = ::open(filename, flags, mod); + if (m_fd != -1) { + m_good = true; + m_pos = 0; + return true; + } else { + m_good = false; + return false; + } + }, "File open"); + + if (m_good && (mode & std::ios_base::ate) && !(mode & std::ios_base::app)) { + execute_with_retry([&]() { + auto end_pos = ::lseek(m_fd, 0, SEEK_END); + if (end_pos >= 0) { + m_pos = end_pos; + return true; + } else { + m_good = false; + return false; + } + }, "Seek to end in ate mode"); + } + + if (m_buffer == nullptr) { + m_buffer.reset(new char[m_buffer_size]); // Do NOT use std::make_unique for performance reasons + } + m_buffer_mode = BufferMode::None; + m_buffer_begin = m_buffer_end = 0; +} + +void FileStream::open (std::string const& filename, std::ios_base::openmode mode) +{ + open(filename.c_str(), mode); +} + +void FileStream::close () +{ + if (m_fd != -1) { + try { + flush(); + } catch (std::exception const& e) { + if (amrex::Verbose()) { + amrex::Warning(std::string("FileStream::close: flush failed before close: ") + + e.what()); + } + } + int r = ::close(m_fd); + m_fd = -1; + if (r != 0) { + m_good = false; + if (errno != EINTR) { // Could be harmless + throw std::runtime_error(std::string("FileStream::close failed: ") + + std::strerror(errno)); + } + } + } + m_good = false; +} + +// xxxx TODO: not tested yet +FileStream& FileStream::read (char* s, std::streamsize count) +{ + if (count == 0) { return *this; } + + if (m_fd == -1 || !m_good) { + throw std::runtime_error("FileStream::read: bad file descriptor or bad state"); + } + + if (m_buffer_mode == BufferMode::Write) { + try { + flush_write_buffer(); // This sets buffer mode to None + } catch (...) { + m_good = false; + throw; + } + } + + std::streamsize total_read = 0; + while (total_read < count) + { + if (m_buffer_mode == BufferMode::None) { + fill_read_buffer(); // This set buffer mode to Read + } + + // Now the buffer mode is Read. + if (m_buffer_end == m_buffer_begin) { break; } // EOF + + auto available = m_buffer_end - m_buffer_begin; + auto to_copy = std::min(available, std::size_t(count-total_read)); + + std::memcpy(s + total_read, m_buffer.get() + m_buffer_begin, to_copy); + m_buffer_begin += to_copy; + m_pos += to_copy; + total_read += to_copy; + + if (m_buffer_begin == m_buffer_end) { + m_buffer_mode = BufferMode::None; + m_buffer_begin = m_buffer_end = 0; + } + } + + return *this; +} + +FileStream& FileStream::write (char const* s, std::streamsize count) +{ + if (count == 0) { return *this; } + + if (m_fd == -1 || !m_good) { + throw std::runtime_error("FileStream::write: bad file descriptor or bad state"); + } + + if (m_buffer_mode == BufferMode::Read) { + if (m_buffer_begin < m_buffer_end) { + off_type unread = m_buffer_end - m_buffer_begin; + m_buffer_mode = BufferMode::None; + m_buffer_begin = m_buffer_end = 0; + this->seekp(-unread, std::ios_base::cur); // go back + } else { + m_buffer_mode = BufferMode::None; + m_buffer_begin = m_buffer_end = 0; + } + } + + if (m_buffer_mode == BufferMode::None) { + if (count <= m_buffer_size) { + fill_write_buffer(s, count); + } else { + file_write(s, count); + } + } else { // Write mode + std::streamsize total_written = 0; + char const* src = s; + while (total_written < count) { + std::streamsize remaining = count - total_written; + + std::size_t space_left = m_buffer_size - m_buffer_end; + if (space_left == 0) { + try { + flush_write_buffer(); // this sets buffer mode to None + } catch (...) { + m_good = false; + throw; + } + space_left = m_buffer_size; + } + + if ((m_buffer_mode == BufferMode::None) && (remaining > 2*m_buffer_size)) { + file_write(src, remaining); + total_written += remaining; + break; + } else { + auto to_buffer = std::min(std::size_t(remaining), space_left); + fill_write_buffer(src, to_buffer); + src += to_buffer; + total_written += to_buffer; + } + } + } + + m_pos += count; + + return *this; +} + +FileStream& FileStream::flush () +{ + if (m_fd != -1 && m_good) { + if (m_buffer_mode == BufferMode::Write) { + try { + flush_write_buffer(); + } catch (...) { + m_good = false; + throw; + } + } + } + return *this; +} + +FileStream& FileStream::seekp (pos_type off) +{ + return seekp(off, std::ios_base::beg); +} + +FileStream& FileStream::seekp (off_type off, std::ios_base::seekdir dir) +{ + if (m_fd != -1 && m_good) + { + // reset buffer + if (m_buffer_mode == BufferMode::Write) { + try { + flush_write_buffer(); + } catch (...) { + m_good = false; + throw; + } + } else if (m_buffer_mode == BufferMode::Read) { + if ((dir == std::ios_base::cur) && (m_buffer_begin < m_buffer_end)) { + off_type unread = m_buffer_end - m_buffer_begin; + off -= unread; + } + m_buffer_mode = BufferMode::None; + m_buffer_begin = m_buffer_end = 0; + } + + int whence; + switch (dir) { + case std::ios_base::beg: { whence = SEEK_SET; break; } + case std::ios_base::cur: { whence = SEEK_CUR; break; } + case std::ios_base::end: { whence = SEEK_END; break; } + default: { + m_good = false; + return *this; + } + } + + execute_with_retry([&]() { + off_t new_pos = ::lseek(m_fd, off, whence); + if (new_pos != -1) { + m_pos = new_pos; + return true; + } else { + m_good = false; + return false; + } + }, "Seek position"); + } + return *this; +} + +FileStream& FileStream::seekg (pos_type off) +{ + return seekg(off, std::ios_base::beg); +} + +FileStream& FileStream::seekg (off_type off, std::ios_base::seekdir dir) +{ + return seekp(off,dir); +} + +FileStream::pos_type FileStream::tellp () +{ + return m_pos; +} + +FileStream::pos_type FileStream::tellg () +{ + return m_pos; +} + +bool FileStream::good () const +{ + return m_good && m_fd != -1; +} + +bool FileStream::bad () const +{ + return !m_good || m_fd == -1; +} + +bool FileStream::fail () const +{ + return !m_good || m_fd == -1; +} + +void FileStream::file_write (char const* s, std::size_t count) +{ + if (count == 0) { return; } + + if (m_fd == -1 || !m_good) { + throw std::runtime_error("FileStream::file_write: bad file descriptor or bad state"); + } + + size_t total_written = 0; + while (total_written < count) { + ssize_t nbytes_written = -1; + execute_with_retry([&]() { + nbytes_written = ::write(m_fd, s + total_written, count - total_written); + if (nbytes_written >= 0) { + return true; + } else { + return false; + } + }, "Write"); + if (nbytes_written >= 0) { + total_written += nbytes_written; + } else { + throw std::runtime_error("FileStream: write failed"); + } + } +} + +void FileStream::flush_write_buffer () +{ + if (m_buffer_end == 0) { return; } + file_write(m_buffer.get(), m_buffer_end); + m_buffer_mode = BufferMode::None; + m_buffer_end = 0; +} + +void FileStream::fill_write_buffer (char const* s, std::size_t count) +{ + std::memcpy(m_buffer.get()+m_buffer_end, s, count); + m_buffer_mode = BufferMode::Write; + m_buffer_end += count; +} + +void FileStream::fill_read_buffer () +{ + if (m_fd == -1 || !m_good) { + throw std::runtime_error("FileStream::fill_read_buffer: bad file descriptor or bad state"); + } + + m_buffer_begin = 0; + m_buffer_end = 0; + while (m_buffer_end < m_buffer_size) { + ssize_t nbytes_read = -1; + execute_with_retry([&]() { + nbytes_read = ::read(m_fd, m_buffer.get() + m_buffer_end, + m_buffer_size - m_buffer_end); + if (nbytes_read >= 0) { + return true; + } else { + return false; + } + }, "Read to buffer"); + if (nbytes_read > 0) { + m_buffer_end += nbytes_read; + } else if (nbytes_read == 0) { + break; // EOF + } else { + throw std::runtime_error("FileStream::fill_read_buffer failed"); + } + } + m_buffer_mode = BufferMode::Read; +} + +} + +#endif diff --git a/Src/Base/AMReX_IntConv.H b/Src/Base/AMReX_IntConv.H index f792f4b8eab..6833ac7d40a 100644 --- a/Src/Base/AMReX_IntConv.H +++ b/Src/Base/AMReX_IntConv.H @@ -19,8 +19,8 @@ namespace amrex { std::uint32_t swapBytes (std::uint32_t value); std::uint64_t swapBytes (std::uint64_t value); - template - void writeIntData (const From* data, std::size_t size, std::ostream& os, + template + void writeIntData (const From* data, std::size_t size, OS& os, const amrex::IntDescriptor& id) { To value; @@ -32,8 +32,8 @@ namespace amrex { } } - template - void readIntData (To* data, std::size_t size, std::istream& is, + template + void readIntData (To* data, std::size_t size, IS& is, const amrex::IntDescriptor& id) { From value; diff --git a/Src/Base/AMReX_NFiles.H b/Src/Base/AMReX_NFiles.H index bd1518dd44c..1dae632d1b9 100644 --- a/Src/Base/AMReX_NFiles.H +++ b/Src/Base/AMReX_NFiles.H @@ -3,6 +3,7 @@ #define BL_NFILES_H #include +#include #include #include #include @@ -27,6 +28,11 @@ class NFilesIter { public: +#if defined(AMREX_USE_OWN_FILE_STREAM) + using stream_type = amrex::FileStream; +#else + using stream_type = std::fstream; +#endif /** * \brief the nfiles will be named "filePrefix" + "00000" @@ -95,7 +101,7 @@ class NFilesIter NFilesIter &operator++(); - std::fstream &Stream() { return fileStream; } + stream_type &Stream() { return fileStream; } /** @@ -215,7 +221,7 @@ class NFilesIter std::string filePrefix; std::string fullFileName; VisMFBuffer::IO_Buffer io_buffer; - std::fstream fileStream; + stream_type fileStream; bool finishedWriting = false; bool isReading = false; bool finishedReading = false; diff --git a/Src/Base/AMReX_NFiles.cpp b/Src/Base/AMReX_NFiles.cpp index fd8e5fef60c..f06f9c56041 100644 --- a/Src/Base/AMReX_NFiles.cpp +++ b/Src/Base/AMReX_NFiles.cpp @@ -25,10 +25,14 @@ NFilesIter::NFilesIter(int noutfiles, std::string fileprefix, stWriteTag (ParallelDescriptor::SeqNum()), stReadTag (ParallelDescriptor::SeqNum()) { +#ifdef AMREX_USE_OWN_FILE_STREAM + amrex::ignore_unused(setBuf); +#else if(setBuf) { io_buffer.resize(VisMFBuffer::GetIOBufferSize()); fileStream.rdbuf()->pubsetbuf(io_buffer.dataPtr(), io_buffer.size()); } +#endif if(myProc == coordinatorProc) { // ---- make a static order @@ -187,10 +191,14 @@ NFilesIter::NFilesIter(std::string filename, finishedReading = false; } +#ifdef AMREX_USE_OWN_FILE_STREAM + amrex::ignore_unused(setBuf); +#else if(setBuf) { io_buffer.resize(VisMFBuffer::GetIOBufferSize()); fileStream.rdbuf()->pubsetbuf(io_buffer.dataPtr(), io_buffer.size()); } +#endif } diff --git a/Src/Base/AMReX_VectorIO.H b/Src/Base/AMReX_VectorIO.H index 3a59ff778a2..7e6250f78d0 100644 --- a/Src/Base/AMReX_VectorIO.H +++ b/Src/Base/AMReX_VectorIO.H @@ -6,7 +6,7 @@ #include #include -#include +#include namespace amrex { @@ -19,115 +19,235 @@ namespace amrex { //! IntDescriptor that describes the data format to use for writing. If no //! IntDescriptor is provided, the data will be written using the native //! format for your machine. - void writeIntData (const int* data, std::size_t size, std::ostream& os, - const IntDescriptor& id = FPC::NativeIntDescriptor()); + template + void writeIntData (const int* data, std::size_t size, OS& os, + const IntDescriptor& id = FPC::NativeIntDescriptor()) + { + if (id == FPC::NativeIntDescriptor()) + { + os.write((char*) data, static_cast(size*sizeof(int))); + } + else if (id.numBytes() == 2) + { + writeIntData(data, size, os, id); + } + else if (id.numBytes() == 4) + { + writeIntData(data, size, os, id); + } + else if (id.numBytes() == 8) + { + writeIntData(data, size, os, id); + } + else { + amrex::Error("Don't know how to work with this integer type."); + } + } //! Read int data from the istream. The arguments are a pointer to data buffer //! to read into, the size of that buffer, the istream, and an IntDescriptor //! that describes the format of the data on disk. //! The buffer is assumed to be large enough to store 'size' integers, and it is //! the user's reponsiblity to allocate this data. - void readIntData (int* data, std::size_t size, std::istream& is, const IntDescriptor& id); + template + void readIntData (int* data, std::size_t size, IS& is, const IntDescriptor& id) + { + if (id == FPC::NativeIntDescriptor()) + { + is.read((char*) data, static_cast(size * id.numBytes())); + } + else if (id.numBytes() == 2) + { + readIntData(data, size, is, id); + } + else if (id.numBytes() == 4) + { + readIntData(data, size, is, id); + } + else if (id.numBytes() == 8) + { + readIntData(data, size, is, id); + } + else { + amrex::Error("Don't know how to work with this integer type."); + } + } //! Write long data to the ostream. The arguments are a pointer to data //! to write, the size of the data buffer, the ostream, and an optional //! IntDescriptor that describes the data format to use for writing. If no //! IntDescriptor is provided, the data will be written using the native //! format for your machine. - void writeLongData (const Long* data, std::size_t size, std::ostream& os, - const IntDescriptor& id = FPC::NativeLongDescriptor()); + template + void writeLongData (const Long* data, std::size_t size, OS& os, + const IntDescriptor& id = FPC::NativeLongDescriptor()) + { + if (id == FPC::NativeLongDescriptor()) + { + os.write((char*) data, static_cast(size*sizeof(Long))); + } + else if (id.numBytes() == 2) + { + writeIntData(data, size, os, id); + } + else if (id.numBytes() == 4) + { + writeIntData(data, size, os, id); + } + else if (id.numBytes() == 8) + { + writeIntData(data, size, os, id); + } + else { + amrex::Error("Don't know how to work with this long type."); + } + } //! Read int data from the istream. The arguments are a pointer to data buffer //! to read into, the size of that buffer, the istream, and an IntDescriptor //! that describes the format of the data on disk. //! The buffer is assumed to be large enough to store 'size' longs, and it is //! the user's reponsiblity to allocate this data. - void readLongData (Long* data, std::size_t size, std::istream& is, const IntDescriptor& id); + template + void readLongData (Long* data, std::size_t size, IS& is, const IntDescriptor& id) + { + if (id == FPC::NativeLongDescriptor()) + { + is.read((char*) data, static_cast(size * id.numBytes())); + } + else if (id.numBytes() == 2) + { + readIntData(data, size, is, id); + } + else if (id.numBytes() == 4) + { + readIntData(data, size, is, id); + } + else if (id.numBytes() == 8) + { + readIntData(data, size, is, id); + } + else { + amrex::Error("Don't know how to work with this long type."); + } + } //! Write Real data to the ostream. The arguments are a pointer to data //! to write, the size of the data buffer, the ostream, and an optional //! RealDescriptor that describes the data format to use for writing. If no //! RealDescriptor is provided, the data will be written using the native //! format for your machine. - void writeRealData (const Real* data, std::size_t size, std::ostream& os, - const RealDescriptor& rd = FPC::NativeRealDescriptor()); + template + void writeRealData (const Real* data, std::size_t size, OS& os, + const RealDescriptor& rd = FPC::NativeRealDescriptor()) + { + RealDescriptor::convertFromNativeFormat(os, static_cast(size), data, rd); + } //! Read Real data from the istream. The arguments are a pointer to data buffer //! to read into, the size of that buffer, the istream, and a RealDescriptor //! that describes the format of the data on disk. //! The buffer is assumed to be large enough to store 'size' Reals, and it is //! the user's reponsiblity to allocate this data. - void readRealData (Real* data, std::size_t size, std::istream& is, - const RealDescriptor& rd); + template + void readRealData (Real* data, std::size_t size, IS& is, + const RealDescriptor& rd) + { + RealDescriptor::convertToNativeFormat(data, static_cast(size), is, rd); + } //! Write float data to the ostream. The arguments are a pointer to data //! to write, the size of the data buffer, the ostream, and an optional //! RealDescriptor that describes the data format to use for writing. If no //! RealDescriptor is provided, the data will be written using the native //! format for your machine. - void writeFloatData (const float* data, std::size_t size, std::ostream& os, - const RealDescriptor& rd = FPC::Native32RealDescriptor()); + template + void writeFloatData (const float* data, std::size_t size, OS& os, + const RealDescriptor& rd = FPC::Native32RealDescriptor()) + { + RealDescriptor::convertFromNativeFloatFormat(os, static_cast(size), data, rd); + } //! Read float data from the istream. The arguments are a pointer to data buffer //! to read into, the size of that buffer, the istream, and a RealDescriptor //! that describes the format of the data on disk. //! The buffer is assumed to be large enough to store 'size' Reals, and it is //! the user's reponsiblity to allocate this data. - void readFloatData(float* data, std::size_t size, std::istream& is, - const RealDescriptor& rd); + template + void readFloatData(float* data, std::size_t size, IS& is, + const RealDescriptor& rd) + { + RealDescriptor::convertToNativeFloatFormat(data, static_cast(size), is, rd); + } //! Write double data to the ostream. The arguments are a pointer to data //! to write, the size of the data buffer, the ostream, and an optional //! RealDescriptor that describes the data format to use for writing. If no //! RealDescriptor is provided, the data will be written using the native //! format for your machine. - void writeDoubleData (const double* data, std::size_t size, std::ostream& os, - const RealDescriptor& rd = FPC::Native64RealDescriptor()); + template + void writeDoubleData (const double* data, std::size_t size, OS& os, + const RealDescriptor& rd = FPC::Native64RealDescriptor()) + { + RealDescriptor::convertFromNativeDoubleFormat(os, static_cast(size), data, rd); + } //! Read double data from the istream. The arguments are a pointer to data buffer //! to read into, the size of that buffer, the istream, and a RealDescriptor //! that describes the format of the data on disk. //! The buffer is assumed to be large enough to store 'size' Reals, and it is //! the user's reponsiblity to allocate this data. - void readDoubleData (double* data, std::size_t size, std::istream& is, - const RealDescriptor& rd); + template + void readDoubleData (double* data, std::size_t size, IS& is, + const RealDescriptor& rd) + { + RealDescriptor::convertToNativeDoubleFormat(data, static_cast(size), is, rd); + } - inline void writeData (int const* data, std::size_t size, std::ostream& os) + template + void writeData (int const* data, std::size_t size, OS& os) { writeIntData(data, size, os); } - inline void writeData (Long const* data, std::size_t size, std::ostream& os) + template + void writeData (Long const* data, std::size_t size, OS& os) { writeLongData(data, size, os); } - inline void writeData (float const* data, std::size_t size, std::ostream& os) + template + void writeData (float const* data, std::size_t size, OS& os) { writeFloatData(data, size, os); } - inline void writeData (double const* data, std::size_t size, std::ostream& os) + template + void writeData (double const* data, std::size_t size, OS& os) { writeDoubleData(data, size, os); } - inline void readData (int * data, std::size_t size, std::istream& is) + template + void readData (int * data, std::size_t size, IS& is) { readIntData(data, size, is, FPC::NativeIntDescriptor()); } - inline void readData (Long * data, std::size_t size, std::istream& is) + template + void readData (Long * data, std::size_t size, IS& is) { readLongData(data, size, is, FPC::NativeLongDescriptor()); } - inline void readData (float * data, std::size_t size, std::istream& is) + template + void readData (float * data, std::size_t size, IS& is) { readFloatData(data, size, is, FPC::Native32RealDescriptor()); } - inline void readData (double * data, std::size_t size, std::istream& is) + template + void readData (double * data, std::size_t size, IS& is) { readDoubleData(data, size, is, FPC::Native64RealDescriptor()); } diff --git a/Src/Base/AMReX_VectorIO.cpp b/Src/Base/AMReX_VectorIO.cpp deleted file mode 100644 index 16e72a96cca..00000000000 --- a/Src/Base/AMReX_VectorIO.cpp +++ /dev/null @@ -1,136 +0,0 @@ -#include -#include - -using namespace amrex; - -void amrex::writeIntData (const int* data, std::size_t size, std::ostream& os, - const IntDescriptor& id) -{ - if (id == FPC::NativeIntDescriptor()) - { - os.write((char*) data, static_cast(size*sizeof(int))); - } - else if (id.numBytes() == 2) - { - writeIntData(data, size, os, id); - } - else if (id.numBytes() == 4) - { - writeIntData(data, size, os, id); - } - else if (id.numBytes() == 8) - { - writeIntData(data, size, os, id); - } - else { - amrex::Error("Don't know how to work with this integer type."); - } -} - -void amrex::readIntData (int* data, std::size_t size, std::istream& is, - const IntDescriptor& id) -{ - if (id == FPC::NativeIntDescriptor()) - { - is.read((char*) data, static_cast(size * id.numBytes())); - } - else if (id.numBytes() == 2) - { - readIntData(data, size, is, id); - } - else if (id.numBytes() == 4) - { - readIntData(data, size, is, id); - } - else if (id.numBytes() == 8) - { - readIntData(data, size, is, id); - } - else { - amrex::Error("Don't know how to work with this integer type."); - } -} - -void amrex::writeLongData (const Long* data, std::size_t size, std::ostream& os, - const IntDescriptor& id) -{ - if (id == FPC::NativeLongDescriptor()) - { - os.write((char*) data, static_cast(size*sizeof(Long))); - } - else if (id.numBytes() == 2) - { - writeIntData(data, size, os, id); - } - else if (id.numBytes() == 4) - { - writeIntData(data, size, os, id); - } - else if (id.numBytes() == 8) - { - writeIntData(data, size, os, id); - } - else { - amrex::Error("Don't know how to work with this long type."); - } -} - -void amrex::readLongData (Long* data, std::size_t size, std::istream& is, - const IntDescriptor& id) -{ - if (id == FPC::NativeLongDescriptor()) - { - is.read((char*) data, static_cast(size * id.numBytes())); - } - else if (id.numBytes() == 2) - { - readIntData(data, size, is, id); - } - else if (id.numBytes() == 4) - { - readIntData(data, size, is, id); - } - else if (id.numBytes() == 8) - { - readIntData(data, size, is, id); - } - else { - amrex::Error("Don't know how to work with this long type."); - } -} - -void amrex::writeRealData (const Real* data, std::size_t size, std::ostream& os, - const RealDescriptor& rd) -{ - RealDescriptor::convertFromNativeFormat(os, static_cast(size), data, rd); -} - -void amrex::readRealData (Real* data, std::size_t size, std::istream& is, - const RealDescriptor& rd) -{ - RealDescriptor::convertToNativeFormat(data, static_cast(size), is, rd); -} - -void amrex::writeFloatData (const float* data, std::size_t size, std::ostream& os, - const RealDescriptor& rd) -{ - RealDescriptor::convertFromNativeFloatFormat(os, static_cast(size), data, rd); -} - -void amrex::readFloatData (float* data, std::size_t size, std::istream& is, - const RealDescriptor& rd) -{ - RealDescriptor::convertToNativeFloatFormat(data, static_cast(size), is, rd); -} - -void amrex::writeDoubleData (const double* data, std::size_t size, std::ostream& os, - const RealDescriptor& rd) -{ - RealDescriptor::convertFromNativeDoubleFormat(os, static_cast(size), data, rd); -} - -void amrex::readDoubleData (double* data, std::size_t size, std::istream& is, - const RealDescriptor& rd) -{ - RealDescriptor::convertToNativeDoubleFormat(data, static_cast(size), is, rd); -} diff --git a/Src/Base/AMReX_VisMF.H b/Src/Base/AMReX_VisMF.H index fc77a8d81a3..01617a6ca1d 100644 --- a/Src/Base/AMReX_VisMF.H +++ b/Src/Base/AMReX_VisMF.H @@ -237,6 +237,9 @@ public: static bool Check (const std::string &name); //! The file offset of the passed ostream. static Long FileOffset (std::ostream& os); +#ifdef AMREX_USE_OWN_FILE_STREAM + static Long FileOffset (FileStream& os); +#endif //! Read the entire fab (all components). FArrayBox* readFAB (int idx, const std::string& mf_name); //! Read the specified fab component. diff --git a/Src/Base/AMReX_VisMF.cpp b/Src/Base/AMReX_VisMF.cpp index 17af7cd0791..57259cccfe1 100644 --- a/Src/Base/AMReX_VisMF.cpp +++ b/Src/Base/AMReX_VisMF.cpp @@ -1,4 +1,6 @@ +#include // xxxxx will remove this in the future + #include #include #include @@ -591,6 +593,15 @@ VisMF::FileOffset (std::ostream& os) return static_cast(os.tellp()); } +#ifdef AMREX_USE_OWN_FILE_STREAM +Long +VisMF::FileOffset (FileStream& os) +{ + os.seekp(0, std::ios::end); + return static_cast(os.tellp()); +} +#endif + FArrayBox* VisMF::readFAB (int idx, const std::string& mf_name) { diff --git a/Src/Base/CMakeLists.txt b/Src/Base/CMakeLists.txt index ae4cf2a5437..e501240503f 100644 --- a/Src/Base/CMakeLists.txt +++ b/Src/Base/CMakeLists.txt @@ -39,6 +39,8 @@ foreach(D IN LISTS AMReX_SPACEDIM) AMReX_String.cpp AMReX_Utility.H AMReX_Utility.cpp + AMReX_FileStream.H + AMReX_FileStream.cpp AMReX_FileSystem.H AMReX_FileSystem.cpp AMReX_ValLocPair.H @@ -97,7 +99,6 @@ foreach(D IN LISTS AMReX_SPACEDIM) AMReX_FPC.H AMReX_FPC.cpp AMReX_VectorIO.H - AMReX_VectorIO.cpp AMReX_Print.H AMReX_IntConv.H AMReX_IntConv.cpp diff --git a/Src/Base/Make.package b/Src/Base/Make.package index 3f2f0c5cb1a..ae66b91f819 100644 --- a/Src/Base/Make.package +++ b/Src/Base/Make.package @@ -35,6 +35,9 @@ C$(AMREX_BASE)_headers += AMReX_ParmParse.H AMReX_Utility.H AMReX_BLassert.H AMR C$(AMREX_BASE)_headers += AMReX_Functional.H AMReX_Reduce.H AMReX_Scan.H AMReX_Partition.H C$(AMREX_BASE)_headers += AMReX_ValLocPair.H +C$(AMREX_BASE)_headers += AMReX_FileStream.H +C$(AMREX_BASE)_sources += AMReX_FileStream.cpp + C$(AMREX_BASE)_headers += AMReX_FileSystem.H C$(AMREX_BASE)_sources += AMReX_FileSystem.cpp @@ -124,7 +127,7 @@ C$(AMREX_BASE)_headers += AMReX_ParReduce.H # I/O stuff. # C${AMREX_BASE}_headers += AMReX_ANSIEscCode.H AMReX_FabConv.H AMReX_FPC.H AMReX_Print.H AMReX_IntConv.H AMReX_VectorIO.H -C${AMREX_BASE}_sources += AMReX_FabConv.cpp AMReX_FPC.cpp AMReX_IntConv.cpp AMReX_VectorIO.cpp +C${AMREX_BASE}_sources += AMReX_FabConv.cpp AMReX_FPC.cpp AMReX_IntConv.cpp C${AMREX_BASE}_headers += AMReX_IOFormat.H # diff --git a/Src/Particle/AMReX_ParticleContainer.H b/Src/Particle/AMReX_ParticleContainer.H index 3baafe1d34e..9f6c128366c 100644 --- a/Src/Particle/AMReX_ParticleContainer.H +++ b/Src/Particle/AMReX_ParticleContainer.H @@ -725,7 +725,8 @@ public: * \param size The number of elements to write * \param os The ostream into which to write the data */ - void WriteParticleRealData (void* data, size_t size, std::ostream& os) const; + template + void WriteParticleRealData (void* data, size_t size, OS& os) const; /** * \brief Read a contiguous chunk of real particle data from an istream. @@ -735,7 +736,8 @@ public: * \param os The istream from which to read the data * \param rd A RealDescriptor describing the type of the floating point data */ - void ReadParticleRealData (void* data, size_t size, std::istream& is); + template + void ReadParticleRealData (void* data, size_t size, IS& is); /** * \brief Writes a particle checkpoint to file, suitable for restarting. @@ -1444,8 +1446,9 @@ protected: int lev_min = 0, int lev_max = -1, int local_grid=-1) const; public: + template void - WriteParticles (int level, std::ofstream& ofs, int fnum, + WriteParticles (int level, OS& ofs, int fnum, Vector& which, Vector& count, Vector& where, const Vector& write_real_comp, const Vector& write_int_comp, const Vector,IntVector>>& particle_io_flags, bool is_checkpoint) const; diff --git a/Src/Particle/AMReX_ParticleIO.H b/Src/Particle/AMReX_ParticleIO.H index 71b893d2af8..896a04f9779 100644 --- a/Src/Particle/AMReX_ParticleIO.H +++ b/Src/Particle/AMReX_ParticleIO.H @@ -9,9 +9,10 @@ namespace amrex { template class Allocator, class CellAssignor> +template void ParticleContainer_impl -::WriteParticleRealData (void* data, size_t size, std::ostream& os) const +::WriteParticleRealData (void* data, size_t size, OS& os) const { if (sizeof(typename ParticleType::RealType) == 4) { writeFloatData((float*) data, size, os, ParticleRealDescriptor); @@ -23,9 +24,10 @@ ParticleContainer_impl class Allocator, class CellAssignor> +template void ParticleContainer_impl -::ReadParticleRealData (void* data, size_t size, std::istream& is) +::ReadParticleRealData (void* data, size_t size, IS& is) { if (sizeof(typename ParticleType::RealType) == 4) { readFloatData((float*) data, size, is, ParticleRealDescriptor); @@ -569,9 +571,10 @@ ParticleContainer_impl class Allocator, class CellAssignor> +template void ParticleContainer_impl -::WriteParticles (int lev, std::ofstream& ofs, int fnum, +::WriteParticles (int lev, OS& ofs, int fnum, Vector& which, Vector& count, Vector& where, const Vector& write_real_comp, const Vector& write_int_comp, diff --git a/Src/Particle/AMReX_WriteBinaryParticleData.H b/Src/Particle/AMReX_WriteBinaryParticleData.H index 4a5117a1d29..9097da42f7a 100644 --- a/Src/Particle/AMReX_WriteBinaryParticleData.H +++ b/Src/Particle/AMReX_WriteBinaryParticleData.H @@ -605,7 +605,7 @@ void WriteBinaryParticleDataSync (PC const& pc, { for(NFilesIter nfi(nOutFiles, filePrefix, groupSets, setBuf); nfi.ReadyToWrite(); ++nfi) { - auto& myStream = (std::ofstream&) nfi.Stream(); + auto& myStream = nfi.Stream(); pc.WriteParticles(lev, myStream, nfi.FileNumber(), which, count, where, write_real_comp, write_int_comp, particle_io_flags, is_checkpoint); } diff --git a/Tools/CMake/AMReXConfig.cmake.in b/Tools/CMake/AMReXConfig.cmake.in index 96fb12cbf77..b8bea71c802 100644 --- a/Tools/CMake/AMReXConfig.cmake.in +++ b/Tools/CMake/AMReXConfig.cmake.in @@ -97,6 +97,7 @@ set(AMReX_ASSERTIONS_FOUND @AMReX_ASSERTIONS@) set(AMReX_FLATTEN_FOR_FOUND @AMReX_FLATTEN_FOR@) set(AMReX_COMPILER_DEFAULT_INLINE_FOUND @AMReX_COMPILER_DEFAULT_INLINE@) set(AMReX_INLINE_LIMIT_FOUND @AMReX_INLINE_LIMIT@) +set(AMReX_OWN_FILE_STREAM_FOUND @AMReX_OWN_FILE_STREAM@) # Profiling options set(AMReX_BASEP_FOUND @AMReX_BASE_PROFILE@) @@ -109,7 +110,6 @@ set(AMReX_PROFPARSER_FOUND @AMReX_PROFPARSER@) # Plotfile tools set(AMReX_PFTOOLS_FOUND @AMReX_PLOTFILE_TOOLS@) - # export the actual values as well. # General options set(AMReX_SPACEDIM @AMReX_SPACEDIM@) @@ -154,6 +154,7 @@ set(AMReX_FPE @AMReX_FPE@) set(AMReX_PIC @AMReX_PIC@) set(AMReX_ASSERTIONS @AMReX_ASSERTIONS@) set(AMReX_FLATTEN_FOR @AMReX_FLATTEN_FOR@) +set(AMReX_OWN_FILE_STREAM @AMReX_OWN_FILE_STREAM@) # Profiling options set(AMReX_BASE_PROFILE @AMReX_BASE_PROFILE@) diff --git a/Tools/CMake/AMReXOptions.cmake b/Tools/CMake/AMReXOptions.cmake index ef55a1053ca..adde6d9d727 100644 --- a/Tools/CMake/AMReXOptions.cmake +++ b/Tools/CMake/AMReXOptions.cmake @@ -399,6 +399,8 @@ else () set(AMReX_COMPILER_DEFAULT_INLINE ON) endif () +option( AMReX_OWN_FILE_STREAM "Use AMReX's own file stream for I/O" OFF ) +print_option( AMReX_OWN_FILE_STREAM ) if ( "${CMAKE_BUILD_TYPE}" MATCHES "Debug" ) option( AMReX_ASSERTIONS "Enable assertions" ON) diff --git a/Tools/CMake/AMReXSetDefines.cmake b/Tools/CMake/AMReXSetDefines.cmake index 3fb050385ec..080598d5b54 100644 --- a/Tools/CMake/AMReXSetDefines.cmake +++ b/Tools/CMake/AMReXSetDefines.cmake @@ -92,6 +92,9 @@ add_amrex_define( AMREX_USE_FLATTEN_FOR NO_LEGACY IF AMReX_FLATTEN_FOR ) # Bound checking add_amrex_define( AMREX_BOUND_CHECK NO_LEGACY IF AMReX_BOUND_CHECK ) +# amrex::FileStream +add_amrex_define( AMREX_USE_OWN_FILE_STREAM NO_LEGACY IF AMReX_OWN_FILE_STREAM ) + # Backtraces on macOS add_amrex_define( AMREX_EXPORT_DYNAMIC NO_LEGACY IF AMReX_EXPORT_DYNAMIC ) diff --git a/Tools/CMake/AMReX_Config_ND.H.in b/Tools/CMake/AMReX_Config_ND.H.in index 07e3b7fd637..505236c227f 100644 --- a/Tools/CMake/AMReX_Config_ND.H.in +++ b/Tools/CMake/AMReX_Config_ND.H.in @@ -34,6 +34,7 @@ #cmakedefine AMREX_USE_ASSERTION #cmakedefine AMREX_USE_FLATTEN_FOR #cmakedefine AMREX_BOUND_CHECK +#cmakedefine AMREX_USE_OWN_FILE_STREAM #cmakedefine AMREX_EXPORT_DYNAMIC #cmakedefine BL_FORT_USE_UNDERSCORE #cmakedefine BL_FORT_USE_LOWERCASE diff --git a/Tools/GNUMake/Make.defs b/Tools/GNUMake/Make.defs index ed3a1a4df92..e08be76cfbb 100644 --- a/Tools/GNUMake/Make.defs +++ b/Tools/GNUMake/Make.defs @@ -396,6 +396,16 @@ else USE_GPU_RDC := TRUE endif +ifdef USE_OWN_FILE_STREAM + USE_OWN_FILE_STREAM := $(strip $(USE_OWN_FILE_STREAM)) +else + USE_OWN_FILE_STREAM := FALSE +endif + +ifeq ($(USE_OWN_FILE_STREAM),TRUE) + DEFINES += -DAMREX_USE_OWN_FILE_STREAM +endif + build_time_begin := $(shell date +"%s") ALLOW_DIFFERENT_COMP ?= TRUE From 90c921e0ff0af33d8d57f474ef036284c1f363ba Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Thu, 3 Jul 2025 19:42:15 -0700 Subject: [PATCH 04/11] fix warning --- Src/Base/AMReX_FileStream.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Src/Base/AMReX_FileStream.cpp b/Src/Base/AMReX_FileStream.cpp index 49134acc7aa..2611b731d00 100644 --- a/Src/Base/AMReX_FileStream.cpp +++ b/Src/Base/AMReX_FileStream.cpp @@ -186,7 +186,7 @@ FileStream& FileStream::read (char* s, std::streamsize count) FileStream& FileStream::write (char const* s, std::streamsize count) { - if (count == 0) { return *this; } + if (count <= 0) { return *this; } if (m_fd == -1 || !m_good) { throw std::runtime_error("FileStream::write: bad file descriptor or bad state"); @@ -205,7 +205,7 @@ FileStream& FileStream::write (char const* s, std::streamsize count) } if (m_buffer_mode == BufferMode::None) { - if (count <= m_buffer_size) { + if (std::size_t(count) <= m_buffer_size) { fill_write_buffer(s, count); } else { file_write(s, count); @@ -227,7 +227,7 @@ FileStream& FileStream::write (char const* s, std::streamsize count) space_left = m_buffer_size; } - if ((m_buffer_mode == BufferMode::None) && (remaining > 2*m_buffer_size)) { + if ((m_buffer_mode == BufferMode::None) && (std::size_t(remaining) > 2*m_buffer_size)) { file_write(src, remaining); total_written += remaining; break; From b8dba91f8df6d47c91c10ae1c6b40c0ab00c0c3a Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Thu, 3 Jul 2025 19:51:22 -0700 Subject: [PATCH 05/11] fix windows --- Src/Base/AMReX_FabConv.H | 12 ++++++++++++ Src/Base/AMReX_FabConv.cpp | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/Src/Base/AMReX_FabConv.H b/Src/Base/AMReX_FabConv.H index 6d0093af4c6..382129a562c 100644 --- a/Src/Base/AMReX_FabConv.H +++ b/Src/Base/AMReX_FabConv.H @@ -183,10 +183,12 @@ public: Long nitems, std::istream& is, const RealDescriptor& id); +#ifndef _WIN32 static void convertToNativeFormat (Real* out, Long nitems, amrex::FileStream& is, const RealDescriptor& id); +#endif /** * \brief Convert nitems Reals in native format to RealDescriptor format @@ -196,10 +198,12 @@ public: Long nitems, const Real* in, const RealDescriptor& od); +#ifndef _WIN32 static void convertFromNativeFormat (amrex::FileStream& os, Long nitems, const Real* in, const RealDescriptor& od); +#endif /** * \brief Convert nitems Reals in native format to RealDescriptor format. * The out array is assumed to be large enough to hold the @@ -218,10 +222,12 @@ public: Long nitems, const float* in, const RealDescriptor& od); +#ifndef _WIN32 static void convertFromNativeFloatFormat (amrex::FileStream& os, Long nitems, const float* in, const RealDescriptor& od); +#endif /** * \brief Convert nitems doubles in native format to RealDescriptor format @@ -231,10 +237,12 @@ public: Long nitems, const double* in, const RealDescriptor& od); +#ifndef _WIN32 static void convertFromNativeDoubleFormat (amrex::FileStream& os, Long nitems, const double* in, const RealDescriptor& od); +#endif /** * \brief Read nitems from istream in RealDescriptor format and @@ -245,10 +253,12 @@ public: Long nitems, std::istream& is, const RealDescriptor& id); +#ifndef _WIN32 static void convertToNativeFloatFormat (float* out, Long nitems, amrex::FileStream& is, const RealDescriptor& id); +#endif /** * \brief Read nitems from istream in RealDescriptor format and @@ -259,10 +269,12 @@ public: Long nitems, std::istream& is, const RealDescriptor& id); +#ifndef _WIN32 static void convertToNativeDoubleFormat (double* out, Long nitems, amrex::FileStream& is, const RealDescriptor& id); +#endif private: diff --git a/Src/Base/AMReX_FabConv.cpp b/Src/Base/AMReX_FabConv.cpp index 5f303a436fb..dfce59d3940 100644 --- a/Src/Base/AMReX_FabConv.cpp +++ b/Src/Base/AMReX_FabConv.cpp @@ -1034,6 +1034,7 @@ RealDescriptor::convertToNativeFormat (Real* out, delete [] bufr; } +#ifndef _WIN32 void RealDescriptor::convertToNativeFormat (Real* out, Long nitems, @@ -1066,6 +1067,7 @@ RealDescriptor::convertToNativeFormat (Real* out, amrex::Error("convert(Real*,Long,FileStream&,RealDescriptor&) failed"); } } +#endif // // Convert nitems Reals in native format to RealDescriptor format. @@ -1129,6 +1131,7 @@ RealDescriptor::convertFromNativeFormat (std::ostream& os, } } +#ifndef _WIN32 void RealDescriptor::convertFromNativeFormat (amrex::FileStream& os, Long nitems, @@ -1152,6 +1155,7 @@ RealDescriptor::convertFromNativeFormat (amrex::FileStream& os, in += put; } } +#endif // // Convert nitems floats in native format to RealDescriptor format @@ -1195,6 +1199,7 @@ RealDescriptor::convertFromNativeFloatFormat (std::ostream& os, } } +#ifndef _WIN32 void RealDescriptor::convertFromNativeFloatFormat (amrex::FileStream& os, Long nitems, @@ -1218,6 +1223,7 @@ RealDescriptor::convertFromNativeFloatFormat (amrex::FileStream& os, in += put; } } +#endif // // Convert nitems doubles in native format to RealDescriptor format @@ -1261,6 +1267,7 @@ RealDescriptor::convertFromNativeDoubleFormat (std::ostream& os, } } +#ifndef _WIN32 void RealDescriptor::convertFromNativeDoubleFormat (amrex::FileStream& os, Long nitems, @@ -1284,6 +1291,7 @@ RealDescriptor::convertFromNativeDoubleFormat (amrex::FileStream& os, in += put; } } +#endif // // Read nitems from istream in RealDescriptor format to native float format. @@ -1327,6 +1335,7 @@ RealDescriptor::convertToNativeFloatFormat (float* out, delete [] bufr; } +#ifndef _WIN32 void RealDescriptor::convertToNativeFloatFormat (float* out, Long nitems, @@ -1359,6 +1368,7 @@ RealDescriptor::convertToNativeFloatFormat (float* out, amrex::Error("convert(Real*,Long,FileStream&,RealDescriptor&) failed"); } } +#endif // // Read nitems from istream in RealDescriptor format to native double format. @@ -1402,6 +1412,7 @@ RealDescriptor::convertToNativeDoubleFormat (double* out, delete [] bufr; } +#ifndef _WIN32 void RealDescriptor::convertToNativeDoubleFormat (double* out, Long nitems, @@ -1434,5 +1445,6 @@ RealDescriptor::convertToNativeDoubleFormat (double* out, amrex::Error("convert(Real*,Long,FileStream&,RealDescriptor&) failed"); } } +#endif } From 2a89b7c5377b507ccba7c389bde5f30cedf4a861 Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Thu, 3 Jul 2025 20:17:00 -0700 Subject: [PATCH 06/11] No trunc for inout --- Src/Base/AMReX_FileStream.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Src/Base/AMReX_FileStream.cpp b/Src/Base/AMReX_FileStream.cpp index 2611b731d00..13b744302f5 100644 --- a/Src/Base/AMReX_FileStream.cpp +++ b/Src/Base/AMReX_FileStream.cpp @@ -72,7 +72,9 @@ void FileStream::open (char const* filename, std::ios_base::openmode mode) } if (mode & std::ios_base::app) { flags |= O_APPEND; - } else if ((mode & std::ios_base::trunc) || (mode & std::ios_base::out)) { + } else if (mode & std::ios_base::trunc) { + flags |= O_TRUNC; + } else if ((mode & std::ios_base::out) && !(mode & std::ios_base::in)) { flags |= O_TRUNC; } m_binary = (mode & std::ios_base::binary); From f8b3ccaf469ce400b9dd46999c211848704a7a8c Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Thu, 3 Jul 2025 21:13:38 -0700 Subject: [PATCH 07/11] fix warning --- Src/Base/AMReX_FileStream.H | 20 +++++------ Src/Base/AMReX_FileStream.cpp | 62 +++++++++++++++++------------------ 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/Src/Base/AMReX_FileStream.H b/Src/Base/AMReX_FileStream.H index bab9996414b..ba98ea99197 100644 --- a/Src/Base/AMReX_FileStream.H +++ b/Src/Base/AMReX_FileStream.H @@ -36,12 +36,12 @@ public: = std::ios_base::in | std::ios_base::out); FileStream () = default; - FileStream (FileStream&& other); + FileStream (FileStream&& rhs) noexcept; FileStream (FileStream const& rhs) = delete; ~FileStream (); - FileStream& operator= (FileStream&& other); + FileStream& operator= (FileStream&& rhs) noexcept; FileStream& operator= (FileStream const& rhs) = delete; void open (char const* filename, @@ -65,9 +65,9 @@ public: FileStream& seekg (pos_type off); FileStream& seekg (off_type off, std::ios_base::seekdir dir); - pos_type tellp (); + [[nodiscard]] pos_type tellp () const; - pos_type tellg (); + [[nodiscard]] pos_type tellg () const; [[nodiscard]] bool good () const; @@ -85,14 +85,14 @@ private: bool m_binary = true; // not used std::unique_ptr m_buffer; - std::size_t m_buffer_size = 32*1024*1024; // 32MB - std::size_t m_buffer_begin = 0; - std::size_t m_buffer_end = 0; + Long m_buffer_size = 32*1024*1024; // 32MB + Long m_buffer_begin = 0; + Long m_buffer_end = 0; enum class BufferMode { Read, Write, None}; BufferMode m_buffer_mode = BufferMode::None; template - void execute_with_retry (F&& func, std::string const& op_name) + void execute_with_retry (F const& func, std::string const& op_name) { int attempt = 0; double delay = m_base_delay; @@ -133,11 +133,11 @@ private: throw std::runtime_error(op_name + " failed: " + std::strerror(errno)); } - void file_write (char const* s, std::size_t count); + void file_write (char const* s, Long count); void flush_write_buffer (); - void fill_write_buffer (char const* s, std::size_t count); + void fill_write_buffer (char const* s, Long count); void fill_read_buffer (); }; diff --git a/Src/Base/AMReX_FileStream.cpp b/Src/Base/AMReX_FileStream.cpp index 13b744302f5..4f0bca43e71 100644 --- a/Src/Base/AMReX_FileStream.cpp +++ b/Src/Base/AMReX_FileStream.cpp @@ -18,21 +18,21 @@ FileStream::FileStream (std::string const& filename, std::ios_base::openmode mod open(filename.c_str(), mode); } -FileStream::FileStream (FileStream&& other) - : m_base_delay (other.m_base_delay), - m_pos (other.m_pos), - m_max_retries (other.m_max_retries), - m_fd (std::exchange(other.m_fd,-1)), - m_good (std::exchange(other.m_good,false)), - m_binary (other.m_binary), - m_buffer (std::exchange(other.m_buffer,nullptr)), - m_buffer_size (other.m_buffer_size), - m_buffer_begin (other.m_buffer_begin), - m_buffer_end (other.m_buffer_end), - m_buffer_mode (other.m_buffer_mode) +FileStream::FileStream (FileStream&& rhs) noexcept + : m_base_delay (rhs.m_base_delay), + m_pos (rhs.m_pos), + m_max_retries (rhs.m_max_retries), + m_fd (std::exchange(rhs.m_fd,-1)), + m_good (std::exchange(rhs.m_good,false)), + m_binary (rhs.m_binary), + m_buffer (std::exchange(rhs.m_buffer,nullptr)), + m_buffer_size (rhs.m_buffer_size), + m_buffer_begin (rhs.m_buffer_begin), + m_buffer_end (rhs.m_buffer_end), + m_buffer_mode (rhs.m_buffer_mode) {} -FileStream& FileStream::operator= (FileStream&& rhs) +FileStream& FileStream::operator= (FileStream&& rhs) noexcept { if (this != &rhs) { std::swap(m_base_delay , rhs.m_base_delay); @@ -62,22 +62,22 @@ FileStream::~FileStream () void FileStream::open (char const* filename, std::ios_base::openmode mode) { int flags = 0; - if ((mode & std::ios_base::in) && - (mode & std::ios_base::out)) { + if ((mode & std::ios_base::in ) != 0 && + (mode & std::ios_base::out) != 0) { flags |= O_RDWR | O_CREAT; - } else if (mode & std::ios_base::in) { + } else if ((mode & std::ios_base::in) != 0) { flags |= O_RDONLY; - } else if (mode & std::ios_base::out) { + } else if ((mode & std::ios_base::out) != 0) { flags |= O_WRONLY | O_CREAT; } - if (mode & std::ios_base::app) { + if ((mode & std::ios_base::app) != 0) { flags |= O_APPEND; - } else if (mode & std::ios_base::trunc) { + } else if ((mode & std::ios_base::trunc) != 0) { flags |= O_TRUNC; - } else if ((mode & std::ios_base::out) && !(mode & std::ios_base::in)) { + } else if (((mode & std::ios_base::out) != 0) && ((mode & std::ios_base::in) == 0)) { flags |= O_TRUNC; } - m_binary = (mode & std::ios_base::binary); + m_binary = (mode & std::ios_base::binary) != 0; mode_t mod = 0666; execute_with_retry([&]() { @@ -170,7 +170,7 @@ FileStream& FileStream::read (char* s, std::streamsize count) if (m_buffer_end == m_buffer_begin) { break; } // EOF auto available = m_buffer_end - m_buffer_begin; - auto to_copy = std::min(available, std::size_t(count-total_read)); + auto to_copy = std::min(available, count-total_read); std::memcpy(s + total_read, m_buffer.get() + m_buffer_begin, to_copy); m_buffer_begin += to_copy; @@ -207,7 +207,7 @@ FileStream& FileStream::write (char const* s, std::streamsize count) } if (m_buffer_mode == BufferMode::None) { - if (std::size_t(count) <= m_buffer_size) { + if (count <= m_buffer_size) { fill_write_buffer(s, count); } else { file_write(s, count); @@ -218,7 +218,7 @@ FileStream& FileStream::write (char const* s, std::streamsize count) while (total_written < count) { std::streamsize remaining = count - total_written; - std::size_t space_left = m_buffer_size - m_buffer_end; + auto space_left = m_buffer_size - m_buffer_end; if (space_left == 0) { try { flush_write_buffer(); // this sets buffer mode to None @@ -229,12 +229,12 @@ FileStream& FileStream::write (char const* s, std::streamsize count) space_left = m_buffer_size; } - if ((m_buffer_mode == BufferMode::None) && (std::size_t(remaining) > 2*m_buffer_size)) { + if ((m_buffer_mode == BufferMode::None) && (remaining > 2*m_buffer_size)) { file_write(src, remaining); total_written += remaining; break; } else { - auto to_buffer = std::min(std::size_t(remaining), space_left); + auto to_buffer = std::min(remaining, space_left); fill_write_buffer(src, to_buffer); src += to_buffer; total_written += to_buffer; @@ -323,12 +323,12 @@ FileStream& FileStream::seekg (off_type off, std::ios_base::seekdir dir) return seekp(off,dir); } -FileStream::pos_type FileStream::tellp () +FileStream::pos_type FileStream::tellp () const { return m_pos; } -FileStream::pos_type FileStream::tellg () +FileStream::pos_type FileStream::tellg () const { return m_pos; } @@ -348,7 +348,7 @@ bool FileStream::fail () const return !m_good || m_fd == -1; } -void FileStream::file_write (char const* s, std::size_t count) +void FileStream::file_write (char const* s, Long count) { if (count == 0) { return; } @@ -356,7 +356,7 @@ void FileStream::file_write (char const* s, std::size_t count) throw std::runtime_error("FileStream::file_write: bad file descriptor or bad state"); } - size_t total_written = 0; + Long total_written = 0; while (total_written < count) { ssize_t nbytes_written = -1; execute_with_retry([&]() { @@ -383,7 +383,7 @@ void FileStream::flush_write_buffer () m_buffer_end = 0; } -void FileStream::fill_write_buffer (char const* s, std::size_t count) +void FileStream::fill_write_buffer (char const* s, Long count) { std::memcpy(m_buffer.get()+m_buffer_end, s, count); m_buffer_mode = BufferMode::Write; From 46ec90c1623751263423b59ec67644b8b6b4caf5 Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Thu, 3 Jul 2025 21:37:26 -0700 Subject: [PATCH 08/11] fix warning --- Src/Base/AMReX_FileStream.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Src/Base/AMReX_FileStream.cpp b/Src/Base/AMReX_FileStream.cpp index 4f0bca43e71..9673babb4ea 100644 --- a/Src/Base/AMReX_FileStream.cpp +++ b/Src/Base/AMReX_FileStream.cpp @@ -92,7 +92,7 @@ void FileStream::open (char const* filename, std::ios_base::openmode mode) } }, "File open"); - if (m_good && (mode & std::ios_base::ate) && !(mode & std::ios_base::app)) { + if (m_good && ((mode & std::ios_base::ate) != 0) && ((mode & std::ios_base::app) == 0)) { execute_with_retry([&]() { auto end_pos = ::lseek(m_fd, 0, SEEK_END); if (end_pos >= 0) { @@ -231,7 +231,7 @@ FileStream& FileStream::write (char const* s, std::streamsize count) if ((m_buffer_mode == BufferMode::None) && (remaining > 2*m_buffer_size)) { file_write(src, remaining); - total_written += remaining; + // no need to update total_written due to break break; } else { auto to_buffer = std::min(remaining, space_left); From 2b849b2acb8128cc27014015397d6bc107e7a3a9 Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Fri, 4 Jul 2025 16:41:20 -0700 Subject: [PATCH 09/11] Make sure we always write to the end in app mode --- Src/Base/AMReX_FileStream.H | 2 ++ Src/Base/AMReX_FileStream.cpp | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/Src/Base/AMReX_FileStream.H b/Src/Base/AMReX_FileStream.H index ba98ea99197..693957d8c8e 100644 --- a/Src/Base/AMReX_FileStream.H +++ b/Src/Base/AMReX_FileStream.H @@ -79,10 +79,12 @@ private: double m_base_delay = 0.1; off_t m_pos = 0; + off_t m_size = 0; int m_max_retries = 16; int m_fd = -1; bool m_good = false; bool m_binary = true; // not used + bool m_append = false; std::unique_ptr m_buffer; Long m_buffer_size = 32*1024*1024; // 32MB diff --git a/Src/Base/AMReX_FileStream.cpp b/Src/Base/AMReX_FileStream.cpp index 9673babb4ea..960ec22bdd4 100644 --- a/Src/Base/AMReX_FileStream.cpp +++ b/Src/Base/AMReX_FileStream.cpp @@ -3,6 +3,7 @@ #ifndef _WIN32 #include +#include #include namespace amrex @@ -21,10 +22,12 @@ FileStream::FileStream (std::string const& filename, std::ios_base::openmode mod FileStream::FileStream (FileStream&& rhs) noexcept : m_base_delay (rhs.m_base_delay), m_pos (rhs.m_pos), + m_size (rhs.m_size), m_max_retries (rhs.m_max_retries), m_fd (std::exchange(rhs.m_fd,-1)), m_good (std::exchange(rhs.m_good,false)), m_binary (rhs.m_binary), + m_append (rhs.m_append), m_buffer (std::exchange(rhs.m_buffer,nullptr)), m_buffer_size (rhs.m_buffer_size), m_buffer_begin (rhs.m_buffer_begin), @@ -37,9 +40,11 @@ FileStream& FileStream::operator= (FileStream&& rhs) noexcept if (this != &rhs) { std::swap(m_base_delay , rhs.m_base_delay); std::swap(m_pos , rhs.m_pos); + std::swap(m_size , rhs.m_size); std::swap(m_max_retries , rhs.m_max_retries); std::swap(m_good , rhs.m_good); std::swap(m_binary , rhs.m_binary); + std::swap(m_append , rhs.m_append); std::swap(m_fd , rhs.m_fd); std::swap(m_buffer , rhs.m_buffer); std::swap(m_buffer_size , rhs.m_buffer_size); @@ -78,6 +83,7 @@ void FileStream::open (char const* filename, std::ios_base::openmode mode) flags |= O_TRUNC; } m_binary = (mode & std::ios_base::binary) != 0; + m_append = (mode & std::ios_base::app) != 0; mode_t mod = 0666; execute_with_retry([&]() { @@ -92,17 +98,18 @@ void FileStream::open (char const* filename, std::ios_base::openmode mode) } }, "File open"); - if (m_good && ((mode & std::ios_base::ate) != 0) && ((mode & std::ios_base::app) == 0)) { + if (m_good && (((mode & std::ios_base::ate) != 0) || m_append)) { execute_with_retry([&]() { auto end_pos = ::lseek(m_fd, 0, SEEK_END); if (end_pos >= 0) { m_pos = end_pos; + m_size = end_pos; return true; } else { m_good = false; return false; } - }, "Seek to end in ate mode"); + }, "Seek to end in ate or app mode"); } if (m_buffer == nullptr) { @@ -194,6 +201,10 @@ FileStream& FileStream::write (char const* s, std::streamsize count) throw std::runtime_error("FileStream::write: bad file descriptor or bad state"); } + if (m_append && (m_pos != m_size)) { + this->seekp(0, std::ios_base::end); + } + if (m_buffer_mode == BufferMode::Read) { if (m_buffer_begin < m_buffer_end) { off_type unread = m_buffer_end - m_buffer_begin; @@ -243,6 +254,7 @@ FileStream& FileStream::write (char const* s, std::streamsize count) } m_pos += count; + m_size += count; return *this; } From eae54f52070612a187afee02d4fe1703b4713829 Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Fri, 4 Jul 2025 17:15:16 -0700 Subject: [PATCH 10/11] add assert for file size --- Src/Base/AMReX_NFiles.cpp | 129 +++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 58 deletions(-) diff --git a/Src/Base/AMReX_NFiles.cpp b/Src/Base/AMReX_NFiles.cpp index f06f9c56041..2ca11c03e72 100644 --- a/Src/Base/AMReX_NFiles.cpp +++ b/Src/Base/AMReX_NFiles.cpp @@ -72,6 +72,10 @@ NFilesIter::NFilesIter(int noutfiles, std::string fileprefix, void NFilesIter::SetDynamic(int deciderproc) { +#if defined(AMREX_USE_OWN_FILE_STREAM) + amrex::ignore_unused(deciderproc); + AMREX_ALWAYS_ASSERT(useStaticSetSelection); // disable dynamic selection for now +#else deciderProc = deciderproc; // ---- we have to check currentDeciderIndex here also in case of // ---- different nfiles for plots and checkpoints @@ -112,6 +116,7 @@ void NFilesIter::SetDynamic(int deciderproc) fileNumbersWriteOrder.clear(); fileNumbersWriteOrder.resize(nOutFiles); } +#endif } @@ -221,50 +226,56 @@ bool NFilesIter::ReadyToWrite(bool appendFirst) { if(useSparseFPP) { - if(mySparseFileNumber != -1) { - if( ! appendFirst) { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::trunc | std::ios::binary); + if(mySparseFileNumber != -1) { + if( ! appendFirst) { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::trunc | std::ios::binary); + } else { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::app | std::ios::binary); + } + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; } else { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::app | std::ios::binary); + return false; } - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; - } else { - return false; - } } else { // ---- the general static set selection - for(int iSet(0); iSet < nSets; ++iSet) { - if(mySetPosition == iSet) { - if(iSet == 0 && ! appendFirst) { // ---- first set - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::trunc | std::ios::binary); - } else { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::app | std::ios::binary); - } - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; - } + static Long file_size = -1; + + for(int iSet(0); iSet < nSets; ++iSet) { + if(mySetPosition == iSet) { + if(iSet == 0 && ! appendFirst) { // ---- first set + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::trunc | std::ios::binary); + } else { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::app | std::ios::binary); + fileStream.seekg(0, std::ios_base::end); + AMREX_ALWAYS_ASSERT_WITH_MESSAGE + (file_size == Long(fileStream.tellg()), + "File size in incorrect"); + } + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; + } - if(mySetPosition == (iSet + 1)) { // ---- next set waits - int iBuff, waitForPID(-1); - if(groupSets) { - waitForPID = (myProc - nOutFiles); - } else { - waitForPID = (myProc - 1); + if(mySetPosition == (iSet + 1)) { // ---- next set waits + int waitForPID(-1); + if(groupSets) { + waitForPID = (myProc - nOutFiles); + } else { + waitForPID = (myProc - 1); + } + ParallelDescriptor::Recv(&file_size, 1, waitForPID, stWriteTag); + } } - ParallelDescriptor::Recv(&iBuff, 1, waitForPID, stWriteTag); - } - } } } else { // ---- use dynamic set selection @@ -367,32 +378,34 @@ NFilesIter &NFilesIter::operator++() { if(useStaticSetSelection) { - if(useSparseFPP) { + if(useSparseFPP) { - if(mySparseFileNumber != -1) { - fileStream.flush(); - fileStream.close(); - } - finishedWriting = true; + if(mySparseFileNumber != -1) { + fileStream.flush(); + fileStream.close(); + } + finishedWriting = true; - } else { // ---- the general static set selection + } else { // ---- the general static set selection - fileStream.flush(); - fileStream.close(); + fileStream.flush(); + fileStream.seekg(0, std::ios_base::end); + auto file_size = (Long)fileStream.tellg(); + fileStream.close(); - int iBuff(0), wakeUpPID(-1); - if(groupSets) { - wakeUpPID = (myProc + nOutFiles); - } else { - wakeUpPID = (myProc + 1); - } - if(wakeUpPID < nProcs) { - int nextSP = WhichSetPosition(wakeUpPID, nProcs, nOutFiles, groupSets); - if(nextSP > mySetPosition) { - ParallelDescriptor::Send(&iBuff, 1, wakeUpPID, stWriteTag); - } - } - finishedWriting = true; + int wakeUpPID(-1); + if(groupSets) { + wakeUpPID = (myProc + nOutFiles); + } else { + wakeUpPID = (myProc + 1); + } + if(wakeUpPID < nProcs) { + int nextSP = WhichSetPosition(wakeUpPID, nProcs, nOutFiles, groupSets); + if(nextSP > mySetPosition) { + ParallelDescriptor::Send(&file_size, 1, wakeUpPID, stWriteTag); + } + } + finishedWriting = true; } From c214e381d9bd584a8465b81c7af456f865df00ab Mon Sep 17 00:00:00 2001 From: Weiqun Zhang Date: Fri, 4 Jul 2025 22:03:30 -0700 Subject: [PATCH 11/11] white spaces --- Src/Base/AMReX_NFiles.H | 118 +++--- Src/Base/AMReX_NFiles.cpp | 805 +++++++++++++++++++------------------- 2 files changed, 461 insertions(+), 462 deletions(-) diff --git a/Src/Base/AMReX_NFiles.H b/Src/Base/AMReX_NFiles.H index 1dae632d1b9..657ed5117f1 100644 --- a/Src/Base/AMReX_NFiles.H +++ b/Src/Base/AMReX_NFiles.H @@ -26,7 +26,7 @@ namespace amrex { */ class NFilesIter { - public: +public: #if defined(AMREX_USE_OWN_FILE_STREAM) using stream_type = amrex::FileStream; @@ -46,8 +46,8 @@ class NFilesIter * \param groupsets * \param setBuf */ - NFilesIter(int noutfiles, std::string fileprefix, - bool groupsets, bool setBuf); + NFilesIter (int noutfiles, std::string fileprefix, + bool groupsets, bool setBuf); /** @@ -56,8 +56,8 @@ class NFilesIter * * \param deciderproc */ - void SetDynamic(int deciderproc = -1); - [[nodiscard]] bool GetDynamic() const { return ( ! useStaticSetSelection); } + void SetDynamic (int deciderproc = -1); + [[nodiscard]] bool GetDynamic () const { return ( ! useStaticSetSelection); } /** @@ -67,8 +67,8 @@ class NFilesIter * * \param &ranksToWrite */ - void SetSparseFPP(const Vector &ranksToWrite); - [[nodiscard]] bool GetSparseFPP() const { return useSparseFPP; } + void SetSparseFPP (const Vector &ranksToWrite); + [[nodiscard]] bool GetSparseFPP () const { return useSparseFPP; } /** @@ -78,11 +78,11 @@ class NFilesIter * \param &readRanks * \param setBuf */ - NFilesIter(std::string fileName, - Vector readRanks, - bool setBuf = false); + NFilesIter (std::string fileName, + Vector readRanks, + bool setBuf = false); - ~NFilesIter(); + ~NFilesIter (); NFilesIter (NFilesIter const&) = delete; NFilesIter (NFilesIter &&) = delete; @@ -95,35 +95,35 @@ class NFilesIter * * \param appendFirst */ - bool ReadyToWrite(bool appendFirst = false); + bool ReadyToWrite (bool appendFirst = false); - bool ReadyToRead(); + bool ReadyToRead (); - NFilesIter &operator++(); + NFilesIter &operator++ (); - stream_type &Stream() { return fileStream; } + stream_type &Stream () { return fileStream; } /** * \brief get the current Stream()'s seek position */ - std::streampos SeekPos(); + std::streampos SeekPos (); - static int LengthOfSet(int nProcs, int nOutFiles) { - int anf(ActualNFiles(nOutFiles)); - return ((nProcs + (anf - 1)) / anf); + static int LengthOfSet (int nProcs, int nOutFiles) { + int anf(ActualNFiles(nOutFiles)); + return ((nProcs + (anf - 1)) / anf); } - static int WhichSetPosition(int whichproc, int nprocs, - int noutfiles, bool groupsets) + static int WhichSetPosition (int whichproc, int nprocs, + int noutfiles, bool groupsets) { - int whichset; - if(groupsets) { - whichset = whichproc / noutfiles; - } else { - whichset = whichproc % LengthOfSet(nprocs, noutfiles); - } - return whichset; + int whichset; + if(groupsets) { + whichset = whichproc / noutfiles; + } else { + whichset = whichproc % LengthOfSet(nprocs, noutfiles); + } + return whichset; } @@ -133,9 +133,9 @@ class NFilesIter * * \param nOutFiles */ - static int ActualNFiles(int nOutFiles) + static int ActualNFiles (int nOutFiles) { - return( std::max(1, std::min(ParallelDescriptor::NProcs(), nOutFiles)) ); + return( std::max(1, std::min(ParallelDescriptor::NProcs(), nOutFiles)) ); } @@ -147,69 +147,69 @@ class NFilesIter * \param nOutFiles * \param groupSets */ - static bool CheckNFiles(int nProcs, int nOutFiles, bool groupSets); + static bool CheckNFiles (int nProcs, int nOutFiles, bool groupSets); - static int FileNumber(int nOutFiles, int whichProc, bool groupSets) + static int FileNumber (int nOutFiles, int whichProc, bool groupSets) { - BL_ASSERT(whichProc >= 0 && whichProc < ParallelDescriptor::NProcs()); + BL_ASSERT(whichProc >= 0 && whichProc < ParallelDescriptor::NProcs()); - int anf(ActualNFiles(nOutFiles)); - if(groupSets) { - return(whichProc % anf); - } else { + int anf(ActualNFiles(nOutFiles)); + if(groupSets) { + return(whichProc % anf); + } else { - int nProcs(ParallelDescriptor::NProcs()); - return(whichProc / LengthOfSet(nProcs, anf)); - } + int nProcs(ParallelDescriptor::NProcs()); + return(whichProc / LengthOfSet(nProcs, anf)); + } } - [[nodiscard]] const std::string &FileName() const { return fullFileName; } - [[nodiscard]] int FileNumber() const { return fileNumber; } + [[nodiscard]] const std::string &FileName () const { return fullFileName; } + [[nodiscard]] int FileNumber () const { return fileNumber; } - static std::string FileName(int nOutFiles, - const std::string &filePrefix, - int whichProc, bool groupSets) + static std::string FileName (int nOutFiles, + const std::string &filePrefix, + int whichProc, bool groupSets) { - BL_ASSERT(whichProc >= 0 && whichProc < ParallelDescriptor::NProcs()); + BL_ASSERT(whichProc >= 0 && whichProc < ParallelDescriptor::NProcs()); - return ( amrex::Concatenate(filePrefix, - FileNumber(ActualNFiles(nOutFiles), - whichProc, groupSets), minDigits ) ); + return ( amrex::Concatenate(filePrefix, + FileNumber(ActualNFiles(nOutFiles), + whichProc, groupSets), minDigits ) ); } - static std::string FileName(int fileNumber, - const std::string &filePrefix) + static std::string FileName (int fileNumber, + const std::string &filePrefix) { - return ( amrex::Concatenate(filePrefix, fileNumber, minDigits ) ); + return ( amrex::Concatenate(filePrefix, fileNumber, minDigits ) ); } /** * \brief this is the processor coordinating dynamic set selection */ - [[nodiscard]] int CoordinatorProc() const { return coordinatorProc; } + [[nodiscard]] int CoordinatorProc () const { return coordinatorProc; } /** * \brief these are the file numbers to which each rank wrote [rank] * a rank only writes to one file */ - Vector FileNumbersWritten(); + Vector FileNumbersWritten (); /** * \brief these are the order of ranks which wrote to each file * [filenumber][ranks in order they wrote to filenumber] */ - [[nodiscard]] const Vector< Vector > &FileNumbersWriteOrder() const { return fileNumbersWriteOrder; } + [[nodiscard]] const Vector< Vector > &FileNumbersWriteOrder () const { return fileNumbersWriteOrder; } - void CleanUpMessages(); + void CleanUpMessages (); - static int GetMinDigits() { return minDigits; } + static int GetMinDigits () { return minDigits; } - static void SetMinDigits(int md) { minDigits = md; } + static void SetMinDigits (int md) { minDigits = md; } - private: +private: int myProc = -1; int nProcs = -1; diff --git a/Src/Base/AMReX_NFiles.cpp b/Src/Base/AMReX_NFiles.cpp index 2ca11c03e72..2998eef2654 100644 --- a/Src/Base/AMReX_NFiles.cpp +++ b/Src/Base/AMReX_NFiles.cpp @@ -10,8 +10,8 @@ int NFilesIter::currentDeciderIndex(-1); int NFilesIter::minDigits(5); -NFilesIter::NFilesIter(int noutfiles, std::string fileprefix, - bool groupsets, bool setBuf) +NFilesIter::NFilesIter (int noutfiles, std::string fileprefix, + bool groupsets, bool setBuf) : myProc (ParallelDescriptor::MyProc()), nProcs (ParallelDescriptor::NProcs()), nOutFiles (ActualNFiles(noutfiles)), @@ -26,153 +26,153 @@ NFilesIter::NFilesIter(int noutfiles, std::string fileprefix, stReadTag (ParallelDescriptor::SeqNum()) { #ifdef AMREX_USE_OWN_FILE_STREAM - amrex::ignore_unused(setBuf); + amrex::ignore_unused(setBuf); #else - if(setBuf) { - io_buffer.resize(VisMFBuffer::GetIOBufferSize()); - fileStream.rdbuf()->pubsetbuf(io_buffer.dataPtr(), io_buffer.size()); - } + if(setBuf) { + io_buffer.resize(VisMFBuffer::GetIOBufferSize()); + fileStream.rdbuf()->pubsetbuf(io_buffer.dataPtr(), io_buffer.size()); + } #endif - if(myProc == coordinatorProc) { - // ---- make a static order - fileNumbersWriteOrder.resize(nOutFiles); - for(int i(0); i < nProcs; ++i) { - fileNumbersWriteOrder[FileNumber(nOutFiles, i, groupSets)].push_back(i); + if(myProc == coordinatorProc) { + // ---- make a static order + fileNumbersWriteOrder.resize(nOutFiles); + for(int i(0); i < nProcs; ++i) { + fileNumbersWriteOrder[FileNumber(nOutFiles, i, groupSets)].push_back(i); + } } - } - - availableDeciders.resize(0); - availableDeciders.reserve(nProcs); - setZeroProcs.reserve(nOutFiles); - for(int i(0); i < nProcs; ++i) { - // ---- count zero set positions and find an alternate decider - if(NFilesIter::WhichSetPosition(i, nProcs, nOutFiles, groupSets) == 0) { - setZeroProcs.push_back(i); - } else { - availableDeciders.push_back(i); + + availableDeciders.resize(0); + availableDeciders.reserve(nProcs); + setZeroProcs.reserve(nOutFiles); + for(int i(0); i < nProcs; ++i) { + // ---- count zero set positions and find an alternate decider + if(NFilesIter::WhichSetPosition(i, nProcs, nOutFiles, groupSets) == 0) { + setZeroProcs.push_back(i); + } else { + availableDeciders.push_back(i); + } } - } - if(currentDeciderIndex < 0) { - currentDeciderIndex = nSets / 2; - if(currentDeciderIndex >= availableDeciders.size()) { - currentDeciderIndex = 0; + if(currentDeciderIndex < 0) { + currentDeciderIndex = nSets / 2; + if(currentDeciderIndex >= availableDeciders.size()) { + currentDeciderIndex = 0; + } } - } #if 0 - bool checkNFiles(false); - if(checkNFiles) { - CheckNFiles(nProcs, nOutFiles, groupSets); - } + bool checkNFiles(false); + if(checkNFiles) { + CheckNFiles(nProcs, nOutFiles, groupSets); + } #endif } -void NFilesIter::SetDynamic(int deciderproc) +void NFilesIter::SetDynamic (int deciderproc) { #if defined(AMREX_USE_OWN_FILE_STREAM) amrex::ignore_unused(deciderproc); AMREX_ALWAYS_ASSERT(useStaticSetSelection); // disable dynamic selection for now #else - deciderProc = deciderproc; - // ---- we have to check currentDeciderIndex here also in case of - // ---- different nfiles for plots and checkpoints - if(currentDeciderIndex >= availableDeciders.size() || currentDeciderIndex < 0) { - currentDeciderIndex = 0; - } - if( ! availableDeciders.empty()) { - if(deciderProc < 0 || deciderProc >= nProcs) { - deciderProc = availableDeciders[currentDeciderIndex]; + deciderProc = deciderproc; + // ---- we have to check currentDeciderIndex here also in case of + // ---- different nfiles for plots and checkpoints + if(currentDeciderIndex >= availableDeciders.size() || currentDeciderIndex < 0) { + currentDeciderIndex = 0; } - if(NFilesIter::WhichSetPosition(deciderProc, nProcs, nOutFiles, groupSets) == 0) { - // ---- the decider cannot have set position zero - deciderProc = availableDeciders[currentDeciderIndex]; + if( ! availableDeciders.empty()) { + if(deciderProc < 0 || deciderProc >= nProcs) { + deciderProc = availableDeciders[currentDeciderIndex]; + } + if(NFilesIter::WhichSetPosition(deciderProc, nProcs, nOutFiles, groupSets) == 0) { + // ---- the decider cannot have set position zero + deciderProc = availableDeciders[currentDeciderIndex]; + } + } + currentDeciderIndex += nSets - 1; + if(currentDeciderIndex >= availableDeciders.size() || currentDeciderIndex < 0) { + currentDeciderIndex = 0; } - } - currentDeciderIndex += nSets - 1; - if(currentDeciderIndex >= availableDeciders.size() || currentDeciderIndex < 0) { - currentDeciderIndex = 0; - } #if 0 - // The following has no effect because WhichSetPostion is a pure function and - // its return type is not used. So not sure why this is here in the first place. - if(myProc == deciderProc) { - NFilesIter::WhichSetPosition(myProc, nProcs, nOutFiles, groupSets); - } + // The following has no effect because WhichSetPostion is a pure function and + // its return type is not used. So not sure why this is here in the first place. + if(myProc == deciderProc) { + NFilesIter::WhichSetPosition(myProc, nProcs, nOutFiles, groupSets); + } #endif - deciderTag = ParallelDescriptor::SeqNum(); - coordinatorTag = ParallelDescriptor::SeqNum(); - doneTag = ParallelDescriptor::SeqNum(); - writeTag = ParallelDescriptor::SeqNum(); - remainingWriters = nProcs; - useStaticSetSelection = false; - if(nOutFiles == nProcs) { - useStaticSetSelection = true; - coordinatorProc = ParallelDescriptor::IOProcessorNumber(); - } else { - fileNumbersWriteOrder.clear(); - fileNumbersWriteOrder.resize(nOutFiles); - } + deciderTag = ParallelDescriptor::SeqNum(); + coordinatorTag = ParallelDescriptor::SeqNum(); + doneTag = ParallelDescriptor::SeqNum(); + writeTag = ParallelDescriptor::SeqNum(); + remainingWriters = nProcs; + useStaticSetSelection = false; + if(nOutFiles == nProcs) { + useStaticSetSelection = true; + coordinatorProc = ParallelDescriptor::IOProcessorNumber(); + } else { + fileNumbersWriteOrder.clear(); + fileNumbersWriteOrder.resize(nOutFiles); + } #endif } -void NFilesIter::SetSparseFPP(const Vector &ranksToWrite) +void NFilesIter::SetSparseFPP (const Vector &ranksToWrite) { - if(ranksToWrite.empty()) { - return; - } - if(ranksToWrite.size() > nProcs) { - amrex::Abort("**** Error in NFilesIter::SetSparseFPP: ranksToWrite.size() > nProcs."); - } - - sparseWritingRanks = ranksToWrite; - - // ---- do more error checking here - // ---- ranks in range, is dynamic on already - mySparseFileNumber = -1; - for(int r : ranksToWrite) { - if(r < 0 || r >= nProcs) { - amrex::Abort("**** Error in NFilesIter::SetSparseFPP: rank out of range."); + if(ranksToWrite.empty()) { + return; + } + if(ranksToWrite.size() > nProcs) { + amrex::Abort("**** Error in NFilesIter::SetSparseFPP: ranksToWrite.size() > nProcs."); } - if(r == myProc) { - if(mySparseFileNumber == -1) { - mySparseFileNumber = myProc; - } else { - amrex::Abort("**** Error in NFilesIter::SetSparseFPP: ranksToWrite not unique."); - } + + sparseWritingRanks = ranksToWrite; + + // ---- do more error checking here + // ---- ranks in range, is dynamic on already + mySparseFileNumber = -1; + for(int r : ranksToWrite) { + if(r < 0 || r >= nProcs) { + amrex::Abort("**** Error in NFilesIter::SetSparseFPP: rank out of range."); + } + if(r == myProc) { + if(mySparseFileNumber == -1) { + mySparseFileNumber = myProc; + } else { + amrex::Abort("**** Error in NFilesIter::SetSparseFPP: ranksToWrite not unique."); + } + } } - } - nOutFiles = static_cast(ranksToWrite.size()); + nOutFiles = static_cast(ranksToWrite.size()); - if(myProc == coordinatorProc) { - // ---- get the write order from ranksToWrite - fileNumbersWriteOrder.clear(); - fileNumbersWriteOrder.resize(nOutFiles); - for(int i(0); i < fileNumbersWriteOrder.size(); ++i) { - fileNumbersWriteOrder[i].push_back(ranksToWrite[i]); + if(myProc == coordinatorProc) { + // ---- get the write order from ranksToWrite + fileNumbersWriteOrder.clear(); + fileNumbersWriteOrder.resize(nOutFiles); + for(int i(0); i < fileNumbersWriteOrder.size(); ++i) { + fileNumbersWriteOrder[i].push_back(ranksToWrite[i]); + } } - } - if(mySparseFileNumber != -1) { - fileNumber = mySparseFileNumber; - fullFileName = FileName(fileNumber, filePrefix); - } else { - fullFileName = "fullFileNameUndefined"; - } + if(mySparseFileNumber != -1) { + fileNumber = mySparseFileNumber; + fullFileName = FileName(fileNumber, filePrefix); + } else { + fullFileName = "fullFileNameUndefined"; + } - useSparseFPP = true; - useStaticSetSelection = true; + useSparseFPP = true; + useStaticSetSelection = true; } -NFilesIter::NFilesIter(std::string filename, - Vector readranks, - bool setBuf) +NFilesIter::NFilesIter (std::string filename, + Vector readranks, + bool setBuf) : myProc (ParallelDescriptor::MyProc()), nProcs (ParallelDescriptor::NProcs()), fullFileName (std::move(filename)), @@ -180,410 +180,409 @@ NFilesIter::NFilesIter(std::string filename, readRanks (std::move(readranks)), myReadIndex (indexUndefined) { - for(int i(0); i < readRanks.size(); ++i) { - if(myProc == readRanks[i]) { - if(myReadIndex != indexUndefined) { - amrex::Abort("**** Error in NFilesIter: readRanks not unique."); - } - myReadIndex = i; + for(int i(0); i < readRanks.size(); ++i) { + if(myProc == readRanks[i]) { + if(myReadIndex != indexUndefined) { + amrex::Abort("**** Error in NFilesIter: readRanks not unique."); + } + myReadIndex = i; + } } - } - if(myReadIndex == indexUndefined) { // ---- nothing to read - finishedReading = true; - return; - } else { - finishedReading = false; - } + if(myReadIndex == indexUndefined) { // ---- nothing to read + finishedReading = true; + return; + } else { + finishedReading = false; + } #ifdef AMREX_USE_OWN_FILE_STREAM - amrex::ignore_unused(setBuf); + amrex::ignore_unused(setBuf); #else - if(setBuf) { - io_buffer.resize(VisMFBuffer::GetIOBufferSize()); - fileStream.rdbuf()->pubsetbuf(io_buffer.dataPtr(), io_buffer.size()); - } + if(setBuf) { + io_buffer.resize(VisMFBuffer::GetIOBufferSize()); + fileStream.rdbuf()->pubsetbuf(io_buffer.dataPtr(), io_buffer.size()); + } #endif } -NFilesIter::~NFilesIter() { - if( ! useStaticSetSelection) { - CleanUpMessages(); - } +NFilesIter::~NFilesIter () { + if( ! useStaticSetSelection) { + CleanUpMessages(); + } } -bool NFilesIter::ReadyToWrite(bool appendFirst) { +bool NFilesIter::ReadyToWrite (bool appendFirst) { #ifdef BL_USE_MPI - if(finishedWriting) { - return false; - } - - if(useStaticSetSelection) { - - if(useSparseFPP) { - - if(mySparseFileNumber != -1) { - if( ! appendFirst) { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::trunc | std::ios::binary); - } else { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::app | std::ios::binary); - } - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; - } else { - return false; - } - + if(finishedWriting) { + return false; + } - } else { // ---- the general static set selection + if(useStaticSetSelection) { - static Long file_size = -1; + if(useSparseFPP) { - for(int iSet(0); iSet < nSets; ++iSet) { - if(mySetPosition == iSet) { - if(iSet == 0 && ! appendFirst) { // ---- first set + if(mySparseFileNumber != -1) { + if( ! appendFirst) { fileStream.open(fullFileName.c_str(), std::ios::out | std::ios::trunc | std::ios::binary); } else { fileStream.open(fullFileName.c_str(), std::ios::out | std::ios::app | std::ios::binary); - fileStream.seekg(0, std::ios_base::end); - AMREX_ALWAYS_ASSERT_WITH_MESSAGE - (file_size == Long(fileStream.tellg()), - "File size in incorrect"); } if( ! fileStream.good()) { amrex::FileOpenFailed(fullFileName); } return true; + } else { + return false; } - if(mySetPosition == (iSet + 1)) { // ---- next set waits - int waitForPID(-1); - if(groupSets) { - waitForPID = (myProc - nOutFiles); - } else { - waitForPID = (myProc - 1); + } else { // ---- the general static set selection + + static Long file_size = -1; + + for(int iSet(0); iSet < nSets; ++iSet) { + if(mySetPosition == iSet) { + if(iSet == 0 && ! appendFirst) { // ---- first set + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::trunc | std::ios::binary); + } else { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::app | std::ios::binary); + fileStream.seekg(0, std::ios_base::end); + AMREX_ALWAYS_ASSERT_WITH_MESSAGE + (file_size == Long(fileStream.tellg()), + "File size in incorrect"); + } + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; + } + + if(mySetPosition == (iSet + 1)) { // ---- next set waits + int waitForPID(-1); + if(groupSets) { + waitForPID = (myProc - nOutFiles); + } else { + waitForPID = (myProc - 1); + } + ParallelDescriptor::Recv(&file_size, 1, waitForPID, stWriteTag); } - ParallelDescriptor::Recv(&file_size, 1, waitForPID, stWriteTag); } } - } - } else { // ---- use dynamic set selection + } else { // ---- use dynamic set selection - if(mySetPosition == 0) { // ---- return true, ready to write data + if(mySetPosition == 0) { // ---- return true, ready to write data - fullFileName = amrex::Concatenate(filePrefix, fileNumber, minDigits); - if(appendFirst) { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::app | std::ios::binary); - } else { - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::trunc | std::ios::binary); - } - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; - - } else if(myProc == deciderProc) { // ---- this proc decides who decides - - BL_PROFILE("NFI::ReadyToWrite:decider"); - // ---- the first message received is the coordinator - ParallelDescriptor::Recv(&coordinatorProc, 1, MPI_ANY_SOURCE, deciderTag); - for(int setZeroProc : setZeroProcs) { // ---- tell the set zero ranks who is coordinating - ParallelDescriptor::Send(&coordinatorProc, 1, setZeroProc, coordinatorTag); - } - unreadMessages.push_back(std::make_pair(deciderTag, setZeroProcs.size() - 1)); - } + fullFileName = amrex::Concatenate(filePrefix, fileNumber, minDigits); + if(appendFirst) { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::app | std::ios::binary); + } else { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::trunc | std::ios::binary); + } + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; - // ---- these are the rest of the procs who need to write - if( ! finishedWriting) { // ---- the deciderProc drops through to here - // ---- wait for signal to start writing - ParallelDescriptor::Message rmess = - ParallelDescriptor::Recv(&fileNumber, 1, MPI_ANY_SOURCE, writeTag); - coordinatorProc = rmess.pid(); - fullFileName = amrex::Concatenate(filePrefix, fileNumber, minDigits); - - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::app | std::ios::binary); - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; + } else if(myProc == deciderProc) { // ---- this proc decides who decides - } + BL_PROFILE("NFI::ReadyToWrite:decider"); + // ---- the first message received is the coordinator + ParallelDescriptor::Recv(&coordinatorProc, 1, MPI_ANY_SOURCE, deciderTag); + for(int setZeroProc : setZeroProcs) { // ---- tell the set zero ranks who is coordinating + ParallelDescriptor::Send(&coordinatorProc, 1, setZeroProc, coordinatorTag); + } + unreadMessages.push_back(std::make_pair(deciderTag, setZeroProcs.size() - 1)); + } - } - return false; + // ---- these are the rest of the procs who need to write + if( ! finishedWriting) { // ---- the deciderProc drops through to here + // ---- wait for signal to start writing + ParallelDescriptor::Message rmess = + ParallelDescriptor::Recv(&fileNumber, 1, MPI_ANY_SOURCE, writeTag); + coordinatorProc = rmess.pid(); + fullFileName = amrex::Concatenate(filePrefix, fileNumber, minDigits); -#else - amrex::ignore_unused(appendFirst); - if(finishedWriting) { + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::app | std::ios::binary); + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; + + } + + } return false; - } - fileStream.open(fullFileName.c_str(), - std::ios::out | std::ios::trunc | std::ios::binary); - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; + +#else + amrex::ignore_unused(appendFirst); + if(finishedWriting) { + return false; + } + fileStream.open(fullFileName.c_str(), + std::ios::out | std::ios::trunc | std::ios::binary); + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; #endif } -bool NFilesIter::ReadyToRead() { +bool NFilesIter::ReadyToRead () { - if(finishedReading) { - return false; - } - - if(myReadIndex != 0) { // ---- wait for rank myReadIndex - 1 - int iBuff(-1), waitForPID(readRanks[myReadIndex - 1]); - ParallelDescriptor::Recv(&iBuff, 1, waitForPID, stReadTag); - } - - fileStream.open(fullFileName.c_str(), - std::ios::in | std::ios::binary); - if( ! fileStream.good()) { - amrex::FileOpenFailed(fullFileName); - } - return true; + if(finishedReading) { + return false; + } + + if(myReadIndex != 0) { // ---- wait for rank myReadIndex - 1 + int iBuff(-1), waitForPID(readRanks[myReadIndex - 1]); + ParallelDescriptor::Recv(&iBuff, 1, waitForPID, stReadTag); + } + + fileStream.open(fullFileName.c_str(), + std::ios::in | std::ios::binary); + if( ! fileStream.good()) { + amrex::FileOpenFailed(fullFileName); + } + return true; } -NFilesIter &NFilesIter::operator++() { +NFilesIter &NFilesIter::operator++ () { #ifdef BL_USE_MPI - if(isReading) { - fileStream.close(); + if(isReading) { + fileStream.close(); - if(myReadIndex < readRanks.size() - 1) { - int iBuff(0), wakeUpPID(readRanks[myReadIndex + 1]); - ParallelDescriptor::Send(&iBuff, 1, wakeUpPID, stReadTag); - } - finishedReading = true; + if(myReadIndex < readRanks.size() - 1) { + int iBuff(0), wakeUpPID(readRanks[myReadIndex + 1]); + ParallelDescriptor::Send(&iBuff, 1, wakeUpPID, stReadTag); + } + finishedReading = true; - } else { // ---- writing + } else { // ---- writing - if(useStaticSetSelection) { + if(useStaticSetSelection) { - if(useSparseFPP) { + if(useSparseFPP) { + + if(mySparseFileNumber != -1) { + fileStream.flush(); + fileStream.close(); + } + finishedWriting = true; + + } else { // ---- the general static set selection - if(mySparseFileNumber != -1) { fileStream.flush(); + fileStream.seekg(0, std::ios_base::end); + auto file_size = (Long)fileStream.tellg(); fileStream.close(); + + int wakeUpPID(-1); + if(groupSets) { + wakeUpPID = (myProc + nOutFiles); + } else { + wakeUpPID = (myProc + 1); + } + if(wakeUpPID < nProcs) { + int nextSP = WhichSetPosition(wakeUpPID, nProcs, nOutFiles, groupSets); + if(nextSP > mySetPosition) { + ParallelDescriptor::Send(&file_size, 1, wakeUpPID, stWriteTag); + } + } + finishedWriting = true; + } - finishedWriting = true; - } else { // ---- the general static set selection + } else { // ---- use dynamic set selection - fileStream.flush(); - fileStream.seekg(0, std::ios_base::end); - auto file_size = (Long)fileStream.tellg(); - fileStream.close(); + if(mySetPosition == 0) { // ---- write data - int wakeUpPID(-1); - if(groupSets) { - wakeUpPID = (myProc + nOutFiles); - } else { - wakeUpPID = (myProc + 1); - } - if(wakeUpPID < nProcs) { - int nextSP = WhichSetPosition(wakeUpPID, nProcs, nOutFiles, groupSets); - if(nextSP > mySetPosition) { - ParallelDescriptor::Send(&file_size, 1, wakeUpPID, stWriteTag); - } - } - finishedWriting = true; + fileStream.flush(); + fileStream.close(); + finishedWriting = true; + + // ---- tell the decider we are done + ParallelDescriptor::Send(&myProc, 1, deciderProc, deciderTag); + + // ---- wait to find out who will coordinate + ParallelDescriptor::Recv(&coordinatorProc, 1, deciderProc, coordinatorTag); + + if(myProc == coordinatorProc) { + Vector > procsToWrite(nOutFiles); // ---- [fileNumber](procsToWriteToFileNumber) + // ---- populate with the static nfiles sets + for(int i(0); i < nProcs; ++i) { + int procSet(WhichSetPosition(i, nProcs, nOutFiles, groupSets)); + int whichFileNumber(NFilesIter::FileNumber(nOutFiles, i, groupSets)); + // ---- procSet == 0 have already written their data + if(procSet == 0) { + fileNumbersWriteOrder[whichFileNumber].push_back(i); + --remainingWriters; + } + if(procSet != 0) { + procsToWrite[whichFileNumber].push_back(i); + } + } + + // ---- signal each remaining processor when to write and to which file + std::set availableFileNumbers; + availableFileNumbers.insert(fileNumber); // ---- the coordinators file number + + // ---- recv incoming available files + while(remainingWriters > 0) { + + int nextProcToWrite(-1), nextFileNumberToWrite, nextFileNumberAvailable; + auto ait = availableFileNumbers.begin(); + nextFileNumberToWrite = *ait; + availableFileNumbers.erase(nextFileNumberToWrite); + + for(int nfn(0); nfn < procsToWrite.size(); ++nfn) { + // ---- start with the current next file number + // ---- get a proc from another file number if the queue is empty + auto tempNFN = static_cast((nextFileNumberToWrite + nfn) % procsToWrite.size()); + if(!procsToWrite[tempNFN].empty()) { + nextProcToWrite = procsToWrite[tempNFN].front(); + procsToWrite[tempNFN].pop_front(); + break; // ---- found one + } + } + if(nextProcToWrite == -1) { + --remainingWriters; +// amrex::Print() << myProc << "::IOIOIOIO: nptw == -1 rW = " << remainingWriters << '\n'; + } else { - } + fileNumbersWriteOrder[nextFileNumberToWrite].push_back(nextProcToWrite); - } else { // ---- use dynamic set selection + ParallelDescriptor::Send(&nextFileNumberToWrite, 1, nextProcToWrite, writeTag); - if(mySetPosition == 0) { // ---- write data + ParallelDescriptor::Recv(&nextFileNumberAvailable, 1, MPI_ANY_SOURCE, doneTag); + availableFileNumbers.insert(nextFileNumberAvailable); + --remainingWriters; + } + } + unreadMessages.push_back(std::make_pair(doneTag, setZeroProcs.size() - 1)); - fileStream.flush(); - fileStream.close(); - finishedWriting = true; + } else { // ---- tell the coordinatorProc we are done writing + ParallelDescriptor::Send(&fileNumber, 1, coordinatorProc, doneTag); + } - // ---- tell the decider we are done - ParallelDescriptor::Send(&myProc, 1, deciderProc, deciderTag); - - // ---- wait to find out who will coordinate - ParallelDescriptor::Recv(&coordinatorProc, 1, deciderProc, coordinatorTag); - - if(myProc == coordinatorProc) { - Vector > procsToWrite(nOutFiles); // ---- [fileNumber](procsToWriteToFileNumber) - // ---- populate with the static nfiles sets - for(int i(0); i < nProcs; ++i) { - int procSet(WhichSetPosition(i, nProcs, nOutFiles, groupSets)); - int whichFileNumber(NFilesIter::FileNumber(nOutFiles, i, groupSets)); - // ---- procSet == 0 have already written their data - if(procSet == 0) { - fileNumbersWriteOrder[whichFileNumber].push_back(i); - --remainingWriters; } - if(procSet != 0) { - procsToWrite[whichFileNumber].push_back(i); - } - } - - // ---- signal each remaining processor when to write and to which file - std::set availableFileNumbers; - availableFileNumbers.insert(fileNumber); // ---- the coordinators file number - - // ---- recv incoming available files - while(remainingWriters > 0) { - - int nextProcToWrite(-1), nextFileNumberToWrite, nextFileNumberAvailable; - auto ait = availableFileNumbers.begin(); - nextFileNumberToWrite = *ait; - availableFileNumbers.erase(nextFileNumberToWrite); - - for(int nfn(0); nfn < procsToWrite.size(); ++nfn) { - // ---- start with the current next file number - // ---- get a proc from another file number if the queue is empty - auto tempNFN = static_cast((nextFileNumberToWrite + nfn) % procsToWrite.size()); - if(!procsToWrite[tempNFN].empty()) { - nextProcToWrite = procsToWrite[tempNFN].front(); - procsToWrite[tempNFN].pop_front(); - break; // ---- found one - } - } - if(nextProcToWrite == -1) { - --remainingWriters; -// amrex::Print() << myProc << "::IOIOIOIO: nptw == -1 rW = " << remainingWriters << '\n'; - } else { - - fileNumbersWriteOrder[nextFileNumberToWrite].push_back(nextProcToWrite); - ParallelDescriptor::Send(&nextFileNumberToWrite, 1, nextProcToWrite, writeTag); + if( ! finishedWriting) { // ---- the deciderProc drops through to here + fileStream.flush(); + fileStream.close(); + finishedWriting = true; - ParallelDescriptor::Recv(&nextFileNumberAvailable, 1, MPI_ANY_SOURCE, doneTag); - availableFileNumbers.insert(nextFileNumberAvailable); - --remainingWriters; + // ---- signal we are finished + ParallelDescriptor::Send(&fileNumber, 1, coordinatorProc, doneTag); } - } - unreadMessages.push_back(std::make_pair(doneTag, setZeroProcs.size() - 1)); - } else { // ---- tell the coordinatorProc we are done writing - ParallelDescriptor::Send(&fileNumber, 1, coordinatorProc, doneTag); } - } + } - if( ! finishedWriting) { // ---- the deciderProc drops through to here +#else + if(isReading) { + fileStream.close(); + finishedReading = true; + } else { // ---- writing fileStream.flush(); fileStream.close(); finishedWriting = true; - - // ---- signal we are finished - ParallelDescriptor::Send(&fileNumber, 1, coordinatorProc, doneTag); - } - } - - } - -#else - if(isReading) { - fileStream.close(); - finishedReading = true; - } else { // ---- writing - fileStream.flush(); - fileStream.close(); - finishedWriting = true; - } #endif - return *this; + return *this; } -std::streampos NFilesIter::SeekPos() { - return fileStream.tellp(); +std::streampos NFilesIter::SeekPos () { + return fileStream.tellp(); } -bool NFilesIter::CheckNFiles(int nProcs, int nOutFiles, bool groupSets) +bool NFilesIter::CheckNFiles (int nProcs, int nOutFiles, bool groupSets) { - if(ParallelDescriptor::IOProcessor()) { - std::set fileNumbers; - for(int i(0); i < nProcs; ++i) { - fileNumbers.insert(FileNumber(nOutFiles, i, groupSets)); - } + if(ParallelDescriptor::IOProcessor()) { + std::set fileNumbers; + for(int i(0); i < nProcs; ++i) { + fileNumbers.insert(FileNumber(nOutFiles, i, groupSets)); + } // amrex::Print() << "nOutFiles fileNumbers.size() = " << nOutFiles // << " " << fileNumbers.size() << '\n'; - if(nOutFiles != static_cast(fileNumbers.size())) { + if(nOutFiles != static_cast(fileNumbers.size())) { // amrex::Print() << "**** Different number of files." << '\n'; - return false; + return false; + } } - } - return true; + return true; } -Vector NFilesIter::FileNumbersWritten() +Vector NFilesIter::FileNumbersWritten () { - Vector fileNumbersWritten(nProcs, -1); + Vector fileNumbersWritten(nProcs, -1); - if(myProc == coordinatorProc) { + if(myProc == coordinatorProc) { #if 0 - int total(0); - std::set procSet; - for(int f(0); f < fileNumbersWriteOrder.size(); ++f) { - total += fileNumbersWriteOrder[f].size(); - for(int r(0); r < fileNumbersWriteOrder[f].size(); ++r) { - procSet.insert(fileNumbersWriteOrder[f][r]); - } - } - if(total != nProcs || static_cast(procSet.size()) != nProcs) { - amrex::AllPrint() << "**** Error in NFilesIter::FileNumbersWritten(): " - << " coordinatorProc nProcs total procSet.size() = " - << coordinatorProc << " " << nProcs << " " - << total << " " << procSet.size() << '\n'; - } + int total(0); + std::set procSet; + for(int f(0); f < fileNumbersWriteOrder.size(); ++f) { + total += fileNumbersWriteOrder[f].size(); + for(int r(0); r < fileNumbersWriteOrder[f].size(); ++r) { + procSet.insert(fileNumbersWriteOrder[f][r]); + } + } + if(total != nProcs || static_cast(procSet.size()) != nProcs) { + amrex::AllPrint() << "**** Error in NFilesIter::FileNumbersWritten(): " + << " coordinatorProc nProcs total procSet.size() = " + << coordinatorProc << " " << nProcs << " " + << total << " " << procSet.size() << '\n'; + } #endif - for(int f(0); f < fileNumbersWriteOrder.size(); ++f) { - for(int r(0); r < fileNumbersWriteOrder[f].size(); ++r) { - fileNumbersWritten[fileNumbersWriteOrder[f][r]] = f; - } - } + for(int f(0); f < fileNumbersWriteOrder.size(); ++f) { + for(int r(0); r < fileNumbersWriteOrder[f].size(); ++r) { + fileNumbersWritten[fileNumbersWriteOrder[f][r]] = f; + } + } - } - return fileNumbersWritten; + } + return fileNumbersWritten; } -void NFilesIter::CleanUpMessages() { +void NFilesIter::CleanUpMessages () { #ifdef BL_USE_MPI - BL_PROFILE("NFI::CleanUpMessages"); - for(auto & pii : unreadMessages) { - int fromProc, tag(pii.first), nMessages(pii.second); + BL_PROFILE("NFI::CleanUpMessages"); + for(auto & pii : unreadMessages) { + int fromProc, tag(pii.first), nMessages(pii.second); #if 0 - amrex::AllPrint() << ParallelDescriptor::MyProc() << ":: cleaning up " << nMessages - << " messages for tag " << tag << '\n'; + amrex::AllPrint() << ParallelDescriptor::MyProc() << ":: cleaning up " << nMessages + << " messages for tag " << tag << '\n'; #endif - for(int n(0); n < nMessages; ++n) { - ParallelDescriptor::Recv(&fromProc, 1, MPI_ANY_SOURCE, tag); + for(int n(0); n < nMessages; ++n) { + ParallelDescriptor::Recv(&fromProc, 1, MPI_ANY_SOURCE, tag); + } } - } - unreadMessages.clear(); + unreadMessages.clear(); #endif }