Skip to content

Commit 56f0fa2

Browse files
committed
[RDF] Refactor progress bar and shorten its output.
This removes redundant information from the line printed to the terminal and shortens the output. Otherwise, the progress bar frequently overflows the terminal. When the progress bar prints to a file, reduce the frequency to every 10 seconds, and limit its width to 60 chars. Furthermore, significantly improve how completion is estimated. The following two heuristics are employed: - Files that have been opened count as fractionOfFilesAlreadyOpened * eventsProcessed/totalEvents. This tracks the progress of all files for which the number of events has been seen. - Files that have not been opened count as 1/totalFiles until they have been opened. This means, for example, that the progress bar can't reach 50% if half of the files haven't been opened yet. This change significantly reduces the jumps of the progress bar when a new file is opened. Finally, significantly refactor the code: - Handle locking logic for updates with a single RAII lock, update locks to C++17. - Remove a lock and mutex that didn't have any effect. - Reduce repeated function calls to functions that hold locks. - Simplify computation of average number of events. - Relax memory order of the atomics to what's necessary. - Outline as many functions as possible. Since RDFHelpers.hxx goes through JITting and pcms, it gets compiled frequently, so outlining is probably beneficial. - Make a lot of members const to avoid unintentional modifications. This might be interesting because it's a multi-threaded context. - Collect helper functions in one anonymous namespace. - Remove a constructor argument that was ignored. - Don't unconditionally clear cout at the end. The bar might be going to a different stream.
1 parent 401339f commit 56f0fa2

File tree

2 files changed

+190
-176
lines changed

2 files changed

+190
-176
lines changed

tree/dataframe/inc/ROOT/RDFHelpers.hxx

Lines changed: 26 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -333,66 +333,49 @@ class ProgressBarAction;
333333
/// ~~~
334334
class ProgressHelper {
335335
private:
336+
std::size_t ComputeTotalEvents() const;
336337
double EvtPerSec() const;
337338
std::pair<std::size_t, std::chrono::seconds> RecordEvtCountAndTime();
338-
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const;
339+
void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount,
340+
std::chrono::seconds totalElapsedSeconds) const;
339341
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const;
340-
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const;
341342

342-
std::chrono::time_point<std::chrono::system_clock> fBeginTime = std::chrono::system_clock::now();
343-
std::chrono::time_point<std::chrono::system_clock> fLastPrintTime = fBeginTime;
344-
std::chrono::seconds fPrintInterval{1};
343+
bool const fIsTTY;
344+
bool const fUseShellColours;
345345

346346
std::atomic<std::size_t> fProcessedEvents{0};
347347
std::size_t fLastProcessedEvents{0};
348-
std::size_t fIncrement;
349-
350-
mutable std::mutex fSampleNameToEventEntriesMutex;
351-
std::map<std::string, ULong64_t> fSampleNameToEventEntries; // Filename, events in the file
348+
std::size_t const fIncrement;
349+
unsigned int const fNColumns;
350+
unsigned int const fTotalFiles;
352351

353-
std::array<double, 20> fEventsPerSecondStatistics;
354-
std::size_t fEventsPerSecondStatisticsIndex{0};
352+
std::array<double, 10> fEventsPerSecondStatistics;
353+
unsigned int fEventsPerSecondStatisticsCounter{0};
355354

356-
unsigned int fBarWidth;
357-
unsigned int fTotalFiles;
355+
std::chrono::time_point<std::chrono::system_clock> const fBeginTime = std::chrono::system_clock::now();
356+
std::chrono::time_point<std::chrono::system_clock> fLastPrintTime = fBeginTime;
357+
std::chrono::seconds const fPrintInterval;
358358

359-
std::mutex fPrintMutex;
360-
bool fIsTTY;
361-
bool fUseShellColours;
359+
std::mutex fUpdateMutex; // Mutex to ensure that only one thread updates the progress bar
362360

363-
std::shared_ptr<TTree> fTree{nullptr};
361+
mutable std::mutex fSampleNameToEventEntriesMutex; // Mutex to protect access to the below map
362+
std::map<std::string, ULong64_t> fSampleNameToEventEntries; // Filename, events in the file
364363

365364
public:
366365
/// Create a progress helper.
367366
/// \param increment RDF callbacks are called every `n` events. Pass this `n` here.
368-
/// \param totalFiles read total number of files in the RDF.
369-
/// \param progressBarWidth Number of characters the progress bar will occupy.
370-
/// \param printInterval Update every stats every `n` seconds.
367+
/// \param totalFiles number of files read in the RDF.
368+
/// \param printInterval Update stats every `n` seconds.
371369
/// \param useColors Use shell colour codes to colour the output. Automatically disabled when
372370
/// we are not writing to a tty.
373-
ProgressHelper(std::size_t increment, unsigned int totalFiles = 1, unsigned int progressBarWidth = 40,
374-
unsigned int printInterval = 1, bool useColors = true);
371+
ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval = 0,
372+
bool useColors = true);
375373

376374
~ProgressHelper() = default;
377375

378376
friend class ProgressBarAction;
379377

380-
/// Register a new sample for completion statistics.
381-
/// \see ROOT::RDF::RInterface::DefinePerSample().
382-
/// The *id.AsString()* refers to the name of the currently processed file.
383-
/// The idea is to populate the event entries in the *fSampleNameToEventEntries* map
384-
/// by selecting the greater of the two values:
385-
/// *id.EntryRange().second* which is the upper event entry range of the processed sample
386-
/// and the current value of the event entries in the *fSampleNameToEventEntries* map.
387-
/// In the single threaded case, the two numbers are the same as the entry range corresponds
388-
/// to the number of events in an individual file (each sample is simply a single file).
389-
/// In the multithreaded case, the idea is to accumulate the higher event entry value until
390-
/// the total number of events in a given file is reached.
391-
void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id)
392-
{
393-
std::lock_guard<std::mutex> lock(fSampleNameToEventEntriesMutex);
394-
fSampleNameToEventEntries[id.AsString()] = std::max(id.NEntriesTotal(), fSampleNameToEventEntries[id.AsString()]);
395-
}
378+
void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id);
396379

397380
/// Thread-safe callback for RDataFrame.
398381
/// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the
@@ -414,7 +397,7 @@ public:
414397
// ***************************************************
415398
// Warning: Here, everything needs to be thread safe:
416399
// ***************************************************
417-
fProcessedEvents += fIncrement;
400+
fProcessedEvents.fetch_add(fIncrement, std::memory_order_relaxed);
418401

419402
// We only print every n seconds.
420403
if (duration_cast<seconds>(system_clock::now() - fLastPrintTime) < fPrintInterval) {
@@ -424,40 +407,22 @@ public:
424407
// ***************************************************
425408
// Protected by lock from here:
426409
// ***************************************************
427-
if (!fPrintMutex.try_lock())
410+
std::unique_lock<std::mutex> lockGuard(fUpdateMutex, std::try_to_lock);
411+
if (!lockGuard)
428412
return;
429-
std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
430413

431-
std::size_t eventCount;
432-
seconds elapsedSeconds;
433-
std::tie(eventCount, elapsedSeconds) = RecordEvtCountAndTime();
414+
auto const [eventCount, elapsedSeconds] = RecordEvtCountAndTime();
434415

435416
if (fIsTTY)
436417
std::cout << "\r";
437418

438-
PrintProgressBar(std::cout, eventCount);
439-
PrintStats(std::cout, eventCount, elapsedSeconds);
419+
PrintProgressAndStats(std::cout, eventCount, elapsedSeconds);
440420

441421
if (fIsTTY)
442422
std::cout << std::flush;
443423
else
444424
std::cout << std::endl;
445425
}
446-
447-
std::size_t ComputeNEventsSoFar() const
448-
{
449-
std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
450-
std::size_t result = 0;
451-
for (const auto &item : fSampleNameToEventEntries)
452-
result += item.second;
453-
return result;
454-
}
455-
456-
unsigned int ComputeCurrentFileIdx() const
457-
{
458-
std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
459-
return fSampleNameToEventEntries.size();
460-
}
461426
};
462427
} // namespace Experimental
463428
} // namespace RDF

0 commit comments

Comments
 (0)