diff --git a/Docs/source/usage/parameters.rst b/Docs/source/usage/parameters.rst index e5fc4883d3a..5aef0c33801 100644 --- a/Docs/source/usage/parameters.rst +++ b/Docs/source/usage/parameters.rst @@ -2928,6 +2928,9 @@ In-situ capabilities can be used by turning on Sensei or Ascent (provided they a ``variable based`` is an `experimental feature with ADIOS2 BP5 `__ that will replace ``g``. Default: ``f`` (full diagnostics) +* ``.buffer_flush_limit_btd`` (`integer`; defaults to 5) optional, only read if ``.diag_type = BackTransformed`` + This parameter is intended for the ADIOS backend to group every N buffers (N is the value of this parameter) and then flush them to disk. + * ``.adios2_operator.type`` (``zfp``, ``blosc``) optional, `ADIOS2 I/O operator type `__ for `openPMD `_ data dumps. diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H index 5666d85bf3a..22968ae3a34 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H @@ -53,6 +53,11 @@ public: private: /** This is responsible for dumping to file */ std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter; + + /** This parameter corresponds to the input option + "buffer_flush_limit_btd" at the diagnostic level.
+ By default we set to flush every 5 buffers per snapshot */ + int m_NumAggBTDBufferToFlush = 5; }; #endif // WARPX_FLUSHFORMATOPENPMD_H_ diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp index ae54fd13e5a..85b28d9aba3 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp @@ -64,6 +64,8 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name) ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg); encoding = openPMD::IterationEncoding::groupBased; } + + pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush); } // @@ -176,6 +178,9 @@ FlushFormatOpenPMD::WriteToFile ( m_OpenPMDPlotWriter->WriteOpenPMDParticles( particle_diags, static_cast(time), use_pinned_pc, isBTD, isLastBTDFlush); + if (isBTD && (bufferID % m_NumAggBTDBufferToFlush == 0) ) + m_OpenPMDPlotWriter->FlushBTDToDisk(); + // signal that no further updates will be written to this iteration m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush); } diff --git a/Source/Diagnostics/WarpXOpenPMD.H b/Source/Diagnostics/WarpXOpenPMD.H index ed44fd8de51..128f50e287d 100644 --- a/Source/Diagnostics/WarpXOpenPMD.H +++ b/Source/Diagnostics/WarpXOpenPMD.H @@ -152,6 +152,21 @@ public: /** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/ std::string OpenPMDFileType () { return m_OpenPMDFileType; } + /** Ensure BTD buffers are written to disk + * + * This function can be called to intermediately ensure ADIOS buffered "steps" + * are written to disk, along with valid metadata if checkpointing is required. + * + * This is needed to read partial data while a simulation is running or + * to support restarting (the BTD diagnostics) in WarpX, so it + * can continue to append to a partially written labframe station + * after restart. + * + * The frequency is controlled by + * FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (defaults to 5).
+ * It can be adjusted in the input file: diag_name.buffer_flush_limit_btd + */ + void FlushBTDToDisk (); private: void Init (openPMD::Access access, bool isBTD); @@ -181,14 +196,18 @@ private: * @param[in] isBTD if the current diagnostic is BTD * * if isBTD=false, apply the default flush behaviour - * if isBTD=true, advice to use ADIOS Put() instead of PDW for better performance. + * in ADIOS, the action will be PerformDataWrite + * if isBTD=true, in ADIOS, the action will be PerformPut + * because no action is taken for the span tasks. + * This way we can aggregate buffers before + * calling FlushBTDToDisk() to write out. * * iteration.seriesFlush() is used instead of series.flush() * because the latter flushes only if data is dirty * this causes trouble when the underlying writing function is collective (like PDW) * */ - void flushCurrent (bool isBTD) const; + void seriesFlush (bool isBTD) const; /** This function does initial setup for the fields when interation is newly created * @param[in] meshes The meshes in a series diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 17054cd1290..8bb32c41e7e 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -407,13 +407,16 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot () } } -void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const +void WarpXOpenPMDPlot::seriesFlush (bool isBTD) const { - WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent"); - - openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); - - currIteration.seriesFlush(); + openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); + if (isBTD) { + WARPX_PROFILE("WarpXOpenPMDPlot::SeriesFlush()::BTD"); + currIteration.seriesFlush("adios2.engine.preferred_flush_target = \"buffer\""); + } else { + WARPX_PROFILE("WarpXOpenPMDPlot::SeriesFlush()()"); + currIteration.seriesFlush(); + } } std::string @@ -463,6 +466,7 @@ void WarpXOpenPMDPlot::SetStep (int ts, const 
std::string& dirPrefix, int file_m void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush) { + WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()"); // default close is true bool callClose = true; // close BTD file only when isLastBTDFlush is true @@ -666,19 +670,32 @@ for (const auto & particle_diag : particle_diags) { pc->getCharge(), pc->getMass(), isBTD, isLastBTDFlush); } +} +void +WarpXOpenPMDPlot::FlushBTDToDisk() +{ + bool isBTD = true; auto hasOption = m_OpenPMDoptions.find("FlattenSteps"); - const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); + const bool flattenSteps = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); if (flattenSteps) { - // forcing new step so data from each btd batch in - // preferred_flush_target="buffer" can be flushed out - openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); - currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")"); + WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()"); + // Here, for checkpointing purposes, we ask ADIOS to create a new step, which + // triggers writing both data and metadata. + openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); + currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")"); } + else + { + WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()::Disk()"); + openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); + currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")"); + } } + void WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc, const std::string& name, @@ -752,7 +769,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc, SetConstParticleRecordsEDPIC(currSpecies, positionComponents, NewParticleVectorSize, charge, mass); } - flushCurrent(isBTD); + this->seriesFlush(isBTD); // dump individual particles bool contributed_particles = false; // did the local MPI rank contribute particles?
@@ -833,7 +850,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc, } } - flushCurrent(isBTD); + this->seriesFlush(isBTD); } void @@ -1412,6 +1429,8 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, // collective open series_iteration.open(); + bool hasADIOS = (m_Series->backend() == "ADIOS2"); + auto meshes = series_iteration.meshes; if (first_write_to_iteration) { // lets see whether full_geom varies from geom[0] xgeom[1] @@ -1509,26 +1528,58 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, // GPU pointers to the I/O library #ifdef AMREX_USE_GPU if (fab.arena()->isManaged() || fab.arena()->isDevice()) { - amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); - std::shared_ptr data_pinned(foo.release()); - amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); - // intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize(); - mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size); - } else + if (hasADIOS) + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()"); + auto dynamicMemoryView = mesh_comp.storeChunk( + chunk_offset, chunk_size, + [&local_box](size_t /* size */) { + amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); + std::shared_ptr data_pinned(foo.release()); + return data_pinned; + }); + auto span = dynamicMemoryView.currentBuffer(); + amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + } else + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H()"); + amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); + std::shared_ptr data_pinned(foo.release()); + amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + // intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize(); + mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size); + } + } 
else #endif - { - amrex::Real const *local_data = fab.dataPtr(icomp); - mesh_comp.storeChunkRaw( - local_data, chunk_offset, chunk_size); - } - } + { // CPU + if (hasADIOS) + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()"); + auto dynamicMemoryView = mesh_comp.storeChunk( + chunk_offset, chunk_size, + [&local_box](size_t /* size */) { + amrex::BaseFab foo(local_box, 1); + std::shared_ptr data_pinned(foo.release()); + return data_pinned; + }); + + auto span = dynamicMemoryView.currentBuffer(); + std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + } + else + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_mesh()"); + amrex::Real const *local_data = fab.dataPtr(icomp); + mesh_comp.storeChunkRaw( local_data, chunk_offset, chunk_size); + } + } // CPU + } } // icomp store loop - #ifdef AMREX_USE_GPU amrex::Gpu::streamSynchronize(); #endif // Flush data to disk after looping over all components - flushCurrent(isBTD); + this->seriesFlush(isBTD); } // levels loop (i) } #endif // WARPX_USE_OPENPMD