mlperf_logger.py
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
import numpy as np
import os
from mlperf_logging import mllog
from mlperf_logging.mllog import constants as mllog_const

mllogger = mllog.get_mllogger()
mllog.config(
    filename=(os.getenv("COMPLIANCE_FILE") or "mlperf_compliance.log"),
    root_dir=os.path.normpath(os.path.dirname(os.path.realpath(__file__))))


def ssd_print(*args, sync=True, device=None, use_hpu=False, **kwargs):
    """Logs an mllog event from rank 0 only, after an optional cross-rank barrier."""
    if sync:
        barrier(device, use_hpu)
    if get_rank() == 0:
        kwargs['stack_offset'] = 2
        mllogger.event(*args, **kwargs)
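
# A typical call might look like the line below (illustrative only; the key and
# metadata follow the mlperf_logging constants, the values are placeholders):
#
#   ssd_print(key=mllog_const.EPOCH_START, metadata={"epoch_num": 1})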


def barrier(device, use_hpu):
    """
    Works as a temporary distributed barrier; PyTorch does not currently
    implement a native barrier for the NCCL/HCL backends. Calls all_reduce
    on a dummy tensor and synchronizes with the GPU/HPU.
    """
    if torch.distributed.is_initialized():
        if use_hpu:
            torch.distributed.all_reduce(torch.ones(1).to(device))
        else:
            torch.distributed.all_reduce(torch.cuda.FloatTensor(1))
            torch.cuda.synchronize()


def get_rank():
    """
    Gets the distributed rank, or zero if distributed is not initialized.
    """
    if torch.distributed.is_initialized():
        rank = torch.distributed.get_rank()
    else:
        # Fall back to launcher-provided env vars; cast to int so callers
        # can compare against integer ranks (e.g. get_rank() == 0).
        rank = int(os.getenv('RANK', os.getenv('LOCAL_RANK', 0)))
    return rank


def broadcast_seeds(seed, device, use_hpu=False):
    """
    Broadcasts the seed from rank 0 to all ranks; returns the input seed
    unchanged when torch.distributed is not initialized.
    """
    if torch.distributed.is_initialized():
        if use_hpu:
            # handle dtype overflow before tensor allocation
            seed = seed % 2**31
            seeds_tensor = torch.IntTensor([seed]).to(device)
        else:
            seeds_tensor = torch.LongTensor([seed]).to(device)
        torch.distributed.broadcast(seeds_tensor, 0)
        seed = seeds_tensor.item()
    return seed
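

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): shows how a
# training script might combine broadcast_seeds() and ssd_print(). The
# mllog_const.SEED key comes from mlperf_logging; the device selection and
# the placeholder seed value below are assumptions for a single-process run.
if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Without torch.distributed initialized, broadcast_seeds() simply returns
    # the local seed, barrier() is a no-op, and get_rank() falls back to env vars.
    seed = broadcast_seeds(12345, device)
    ssd_print(key=mllog_const.SEED, value=seed, sync=False)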