|
41 | 41 | #define COORDINATE_RANK 0
|
42 | 42 | #define BLUEFOG_TIMELINE "BLUEFOG_TIMELINE"
|
43 | 43 | #define BLUEFOG_CYCLE_TIME "BLUEFOG_CYCLE_TIME"
|
| 44 | +// Stall-check warning time |
| 45 | +#define STALL_WARNING_TIME std::chrono::seconds(15) |
44 | 46 |
|
45 | 47 | namespace bluefog {
|
46 | 48 | namespace common {
|
@@ -313,6 +315,55 @@ Response ConstructResponse(MessageTable* message_table, std::string name) {
|
313 | 315 | return response;
|
314 | 316 | }
|
315 | 317 |
|
| 318 | +// Report Tensors that were submitted to be reduced, gathered or broadcasted by |
| 319 | +// some ranks but not others and are waiting for long time to get processed. |
| 320 | +void CheckForStalledTensors(BluefogGlobalState& state) { |
| 321 | + bool preamble = false; |
| 322 | + auto now = std::chrono::steady_clock::now(); |
| 323 | + for (auto& m : *state.message_table) { |
| 324 | + auto tensor_name = m.first; |
| 325 | + std::vector<Request>& messages = std::get<0>(m.second); |
| 326 | + std::chrono::steady_clock::time_point start_at = std::get<1>(m.second); |
| 327 | + |
| 328 | + if (now - start_at > STALL_WARNING_TIME) { |
| 329 | + if (!preamble) { |
| 330 | + std::cerr << "WARNING: One or more tensors were submitted to be " |
| 331 | + "reduced, gathered or broadcasted by subset of ranks and " |
| 332 | + "are waiting for remainder of ranks for more than " |
| 333 | + << std::chrono::duration_cast<std::chrono::seconds>( |
| 334 | + STALL_WARNING_TIME) |
| 335 | + .count() |
| 336 | + << " seconds. "; |
| 337 | + std::cerr << "This may indicate that different ranks are trying to " |
| 338 | + "submit different tensors or that only subset of ranks is " |
| 339 | + "submitting tensors, which will cause deadlock. " << std::endl; |
| 340 | + std::cerr << "Stalled ops:" << std::endl; |
| 341 | + preamble = true; |
| 342 | + } |
| 343 | + std::cerr << tensor_name; |
| 344 | + std::cerr << " [missing ranks:"; |
| 345 | + std::unordered_set<int32_t> ready_ranks; |
| 346 | + bool missing_preamble = false; |
| 347 | + for (auto msg_iter = messages.begin(); msg_iter != messages.end(); |
| 348 | + msg_iter++) { |
| 349 | + ready_ranks.insert(msg_iter->request_rank()); |
| 350 | + } |
| 351 | + for (int32_t rank = 0; rank < mpi_context.size_; rank++) { |
| 352 | + if (ready_ranks.find(rank) == ready_ranks.end()) { |
| 353 | + if (!missing_preamble) { |
| 354 | + std::cerr << " "; |
| 355 | + missing_preamble = true; |
| 356 | + } else { |
| 357 | + std::cerr << ", "; |
| 358 | + } |
| 359 | + std::cerr << rank; |
| 360 | + } |
| 361 | + } |
| 362 | + std::cerr << "]" << std::endl; |
| 363 | + } |
| 364 | + } |
| 365 | +} |
| 366 | + |
316 | 367 | bool RunLoopOnce(BluefogGlobalState& state);
|
317 | 368 |
|
318 | 369 | void BackgroundThreadLoop(BluefogGlobalState& state) {
|
@@ -777,7 +828,13 @@ bool RunLoopOnce(BluefogGlobalState& state) {
|
777 | 828 | state.tensor_queue.GetTensorEntriesFromResponse(response, entries);
|
778 | 829 | // TODO: tensor fusion logics?
|
779 | 830 | }
|
780 |
| - // TODO: Check for stalled tensors. |
| 831 | + |
| 832 | + // Check for stalled tensors. |
| 833 | + if (std::chrono::steady_clock::now() - state.last_stall_check > |
| 834 | + STALL_WARNING_TIME) { |
| 835 | + CheckForStalledTensors(state); |
| 836 | + state.last_stall_check = std::chrono::steady_clock::now(); |
| 837 | + } |
781 | 838 | } else {
|
782 | 839 | std::string encoded_message;
|
783 | 840 | RequestList message_list;
|
|
0 commit comments