diff --git a/tests/ignite/metrics/regression/test_canberra_metric.py b/tests/ignite/metrics/regression/test_canberra_metric.py index eaaee884768a..f99d1532ca6b 100644 --- a/tests/ignite/metrics/regression/test_canberra_metric.py +++ b/tests/ignite/metrics/regression/test_canberra_metric.py @@ -20,83 +20,73 @@ def test_wrong_input_shapes(): m.update((torch.rand(4, 1), torch.rand(4))) -def test_compute(): - a = np.random.randn(4) - b = np.random.randn(4) - c = np.random.randn(4) - d = np.random.randn(4) - ground_truth = np.random.randn(4) +def test_compute(available_device): + ground_truth = torch.randn(4) + preds = [torch.randn(4) for _ in range(4)] - m = CanberraMetric() + m = CanberraMetric(device=available_device) + assert m._device == torch.device(available_device) canberra = DistanceMetric.get_metric("canberra") - m.update((torch.from_numpy(a), torch.from_numpy(ground_truth))) - np_sum = (np.abs(ground_truth - a) / (np.abs(a) + np.abs(ground_truth))).sum() - assert m.compute() == pytest.approx(np_sum) - assert canberra.pairwise([a, ground_truth])[0][1] == pytest.approx(np_sum) - - m.update((torch.from_numpy(b), torch.from_numpy(ground_truth))) - np_sum += ((np.abs(ground_truth - b)) / (np.abs(b) + np.abs(ground_truth))).sum() - assert m.compute() == pytest.approx(np_sum) - v1 = np.hstack([a, b]) - v2 = np.hstack([ground_truth, ground_truth]) - assert canberra.pairwise([v1, v2])[0][1] == pytest.approx(np_sum) - - m.update((torch.from_numpy(c), torch.from_numpy(ground_truth))) - np_sum += ((np.abs(ground_truth - c)) / (np.abs(c) + np.abs(ground_truth))).sum() - assert m.compute() == pytest.approx(np_sum) - v1 = np.hstack([v1, c]) - v2 = np.hstack([v2, ground_truth]) - assert canberra.pairwise([v1, v2])[0][1] == pytest.approx(np_sum) - - m.update((torch.from_numpy(d), torch.from_numpy(ground_truth))) - np_sum += (np.abs(ground_truth - d) / (np.abs(d) + np.abs(ground_truth))).sum() - assert m.compute() == pytest.approx(np_sum) - v1 = np.hstack([v1, d]) - v2 = np.hstack([v2, ground_truth]) - assert canberra.pairwise([v1, v2])[0][1] == pytest.approx(np_sum) - - -def test_integration(): - def _test(y_pred, y, batch_size): - def update_fn(engine, batch): - idx = (engine.state.iteration - 1) * batch_size - y_true_batch = np_y[idx : idx + batch_size] - y_pred_batch = np_y_pred[idx : idx + batch_size] - return torch.from_numpy(y_pred_batch), torch.from_numpy(y_true_batch) - - engine = Engine(update_fn) - - m = CanberraMetric() - m.attach(engine, "cm") + total_sum = 0.0 + v1 = [] + v2 = [] - np_y = y.numpy().ravel() - np_y_pred = y_pred.numpy().ravel() + for pred in preds: + m.update((pred, ground_truth)) + diff = torch.abs(ground_truth - pred) + denom = torch.abs(pred) + torch.abs(ground_truth) + batch_sum = (diff / denom).sum() + total_sum += batch_sum - canberra = DistanceMetric.get_metric("canberra") + assert m.compute() == pytest.approx(total_sum) - data = list(range(y_pred.shape[0] // batch_size)) - cm = engine.run(data, max_epochs=1).metrics["cm"] + v1.append(pred.cpu().numpy()) + v2.append(ground_truth.cpu().numpy()) + v1_cat = np.hstack(v1) + v2_cat = np.hstack(v2) + assert canberra.pairwise([v1_cat, v2_cat])[0][1] == pytest.approx(total_sum) - assert canberra.pairwise([np_y_pred, np_y])[0][1] == pytest.approx(cm) - def get_test_cases(): - test_cases = [ - (torch.rand(size=(100,)), torch.rand(size=(100,)), 10), - (torch.rand(size=(100, 1)), torch.rand(size=(100, 1)), 20), - ] - return test_cases +@pytest.mark.parametrize("n_times", range(3)) +@pytest.mark.parametrize( + "test_cases", + [ + (torch.rand(size=(100,)), torch.rand(size=(100,)), 10), + (torch.rand(size=(100, 1)), torch.rand(size=(100, 1)), 20), + ], +) +def test_integration(n_times, test_cases, available_device): + y_pred, y, batch_size = test_cases - for _ in range(5): - # check multiple random inputs as random exact occurencies are rare - test_cases = get_test_cases() - for y_pred, y, batch_size in test_cases: - _test(y_pred, y, batch_size) + def update_fn(engine, batch): + idx = (engine.state.iteration - 1) * batch_size + y_true_batch = y[idx : idx + batch_size] + y_pred_batch = y_pred[idx : idx + batch_size] + return y_pred_batch, y_true_batch + engine = Engine(update_fn) -def test_error_is_not_nan(): - m = CanberraMetric() + m = CanberraMetric(device=available_device) + assert m._device == torch.device(available_device) + + m.attach(engine, "cm") + + canberra = DistanceMetric.get_metric("canberra") + + data = list(range(y_pred.shape[0] // batch_size)) + cm = engine.run(data, max_epochs=1).metrics["cm"] + + pred_np = y_pred.cpu().numpy().reshape(len(y_pred), -1) + true_np = y.cpu().numpy().reshape(len(y), -1) + expected = np.sum(canberra.pairwise(pred_np, true_np).diagonal()) + assert expected == pytest.approx(cm) + + +def test_error_is_not_nan(available_device): + m = CanberraMetric(device=available_device) + assert m._device == torch.device(available_device) m.update((torch.zeros(4), torch.zeros(4))) assert not (torch.isnan(m._sum_of_errors).any() or torch.isinf(m._sum_of_errors).any()), m._sum_of_errors diff --git a/tests/ignite/metrics/regression/test_fractional_absolute_error.py b/tests/ignite/metrics/regression/test_fractional_absolute_error.py index c1c3b080576f..36de17058d73 100644 --- a/tests/ignite/metrics/regression/test_fractional_absolute_error.py +++ b/tests/ignite/metrics/regression/test_fractional_absolute_error.py @@ -28,77 +28,63 @@ def test_wrong_input_shapes(): m.update((torch.rand(4, 1), torch.rand(4))) -def test_compute(): - a = np.random.randn(4) - b = np.random.randn(4) - c = np.random.randn(4) - d = np.random.randn(4) - ground_truth = np.random.randn(4) +def test_compute(available_device): + a = torch.randn(4) + b = torch.randn(4) + c = torch.randn(4) + d = torch.randn(4) + ground_truth = torch.randn(4) - m = FractionalAbsoluteError() - - m.update((torch.from_numpy(a), torch.from_numpy(ground_truth))) - np_sum = (2 * np.abs((a - ground_truth)) / (np.abs(a) + np.abs(ground_truth))).sum() - np_len = len(a) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) + m = FractionalAbsoluteError(device=available_device) + assert m._device == torch.device(available_device) - m.update((torch.from_numpy(b), torch.from_numpy(ground_truth))) - np_sum += (2 * np.abs((b - ground_truth)) / (np.abs(b) + np.abs(ground_truth))).sum() - np_len += len(b) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) + total_error = 0.0 + total_len = 0 - m.update((torch.from_numpy(c), torch.from_numpy(ground_truth))) - np_sum += (2 * np.abs((c - ground_truth)) / (np.abs(c) + np.abs(ground_truth))).sum() - np_len += len(c) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) + for pred in [a, b, c, d]: + m.update((pred, ground_truth)) - m.update((torch.from_numpy(d), torch.from_numpy(ground_truth))) - np_sum += (2 * np.abs((d - ground_truth)) / (np.abs(d) + np.abs(ground_truth))).sum() - np_len += len(d) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) + # Compute fractional absolute error in PyTorch + error = 2 * torch.abs(pred - ground_truth) / (torch.abs(pred) + torch.abs(ground_truth)) + total_error += error.sum().item() + total_len += len(pred) + expected = total_error / total_len + assert m.compute() == pytest.approx(expected) -def test_integration(): - def _test(y_pred, y, batch_size): - def update_fn(engine, batch): - idx = (engine.state.iteration - 1) * batch_size - y_true_batch = np_y[idx : idx + batch_size] - y_pred_batch = np_y_pred[idx : idx + batch_size] - return torch.from_numpy(y_pred_batch), torch.from_numpy(y_true_batch) - engine = Engine(update_fn) +@pytest.mark.parametrize("n_times", range(5)) +@pytest.mark.parametrize( + "test_cases", + [ + (torch.rand(size=(100,)), torch.rand(size=(100,)), 10), + (torch.rand(size=(100, 1)), torch.rand(size=(100, 1)), 20), + ], +) +def test_integration_fractional_absolute_error(n_times, test_cases, available_device): + y_pred, y, batch_size = test_cases - m = FractionalAbsoluteError() - m.attach(engine, "fab") + def update_fn(engine, batch): + idx = (engine.state.iteration - 1) * batch_size + y_true_batch = y[idx : idx + batch_size] + y_pred_batch = y_pred[idx : idx + batch_size] + return y_pred_batch, y_true_batch - np_y = y.numpy().ravel() - np_y_pred = y_pred.numpy().ravel() + engine = Engine(update_fn) - data = list(range(y_pred.shape[0] // batch_size)) - fab = engine.run(data, max_epochs=1).metrics["fab"] + metric = FractionalAbsoluteError(device=available_device) + assert metric._device == torch.device(available_device) - np_sum = (2 * np.abs((np_y_pred - np_y)) / (np.abs(np_y_pred) + np.abs(np_y))).sum() - np_len = len(y_pred) - np_ans = np_sum / np_len + metric.attach(engine, "fab") - assert np_ans == pytest.approx(fab) + data = list(range(y_pred.shape[0] // batch_size)) + fab = engine.run(data, max_epochs=1).metrics["fab"] - def get_test_cases(): - test_cases = [ - (torch.rand(size=(100,)), torch.rand(size=(100,)), 10), - (torch.rand(size=(100, 1)), torch.rand(size=(100, 1)), 20), - ] - return test_cases + abs_diff = torch.abs(y_pred - y) + denom = torch.abs(y_pred) + torch.abs(y) + expected = (2 * abs_diff / denom).sum().item() / y.numel() - for _ in range(5): - # check multiple random inputs as random exact occurencies are rare - test_cases = get_test_cases() - for y_pred, y, batch_size in test_cases: - _test(y_pred, y, batch_size) + assert pytest.approx(expected) == fab def _test_distrib_compute(device): diff --git a/tests/ignite/metrics/regression/test_fractional_bias.py b/tests/ignite/metrics/regression/test_fractional_bias.py index bf78d4870d5b..20bbf27f2186 100644 --- a/tests/ignite/metrics/regression/test_fractional_bias.py +++ b/tests/ignite/metrics/regression/test_fractional_bias.py @@ -1,6 +1,5 @@ import os -import numpy as np import pytest import torch @@ -28,81 +27,83 @@ def test_wrong_input_shapes(): m.update((torch.rand(4, 1), torch.rand(4))) -def test_fractional_bias(): - a = np.random.randn(4) - b = np.random.randn(4) - c = np.random.randn(4) - d = np.random.randn(4) - ground_truth = np.random.randn(4) +def test_fractional_bias(available_device): + a = torch.randn(4) + b = torch.randn(4) + c = torch.randn(4) + d = torch.randn(4) + ground_truth = torch.randn(4) - m = FractionalBias() + m = FractionalBias(device=available_device) + assert m._device == torch.device(available_device) - m.update((torch.from_numpy(a), torch.from_numpy(ground_truth))) - np_sum = (2 * (ground_truth - a) / (a + ground_truth)).sum() - np_len = len(a) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) - - m.update((torch.from_numpy(b), torch.from_numpy(ground_truth))) - np_sum += (2 * (ground_truth - b) / (b + ground_truth)).sum() - np_len += len(b) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) - - m.update((torch.from_numpy(c), torch.from_numpy(ground_truth))) - np_sum += (2 * (ground_truth - c) / (c + ground_truth)).sum() - np_len += len(c) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) - - m.update((torch.from_numpy(d), torch.from_numpy(ground_truth))) - np_sum += (2 * (ground_truth - d) / (d + ground_truth)).sum() - np_len += len(d) - np_ans = np_sum / np_len - assert m.compute() == pytest.approx(np_ans) - - -def test_integration(): - def _test(y_pred, y, batch_size): - def update_fn(engine, batch): - idx = (engine.state.iteration - 1) * batch_size - y_true_batch = np_y[idx : idx + batch_size] - y_pred_batch = np_y_pred[idx : idx + batch_size] - return torch.from_numpy(y_pred_batch), torch.from_numpy(y_true_batch) - - engine = Engine(update_fn) - - m = FractionalBias() - m.attach(engine, "fb") + total_error = 0.0 + total_len = 0 - np_y = y.double().numpy().ravel() - np_y_pred = y_pred.double().numpy().ravel() + for pred in [a, b, c, d]: + m.update((pred, ground_truth)) - data = list(range(y_pred.shape[0] // batch_size)) - fb = engine.run(data, max_epochs=1).metrics["fb"] + error = 2 * (ground_truth - pred) / (pred + ground_truth) + total_error += error.sum().item() + total_len += len(pred) - np_sum = (2 * (np_y - np_y_pred) / (np_y_pred + np_y)).sum() - np_len = len(y_pred) - np_ans = np_sum / np_len + expected = total_error / total_len + assert m.compute() == pytest.approx(expected) - assert np_ans == pytest.approx(fb) - def get_test_cases(): - test_cases = [ - (torch.rand(size=(100,)), torch.rand(size=(100,)), 10), - (torch.rand(size=(100, 1)), torch.rand(size=(100, 1)), 20), - ] - return test_cases +@pytest.mark.parametrize("n_times", range(5)) +@pytest.mark.parametrize( + "test_case", + [ + (torch.rand(size=(100,)), torch.rand(size=(100,)), 10), + (torch.rand(size=(100, 1)), torch.rand(size=(100, 1)), 20), + ], +) +def test_integration_fractional_bias(n_times, test_case, available_device): + y_pred, y, batch_size = test_case - for _ in range(5): - # check multiple random inputs as random exact occurencies are rare - test_cases = get_test_cases() - for y_pred, y, batch_size in test_cases: - _test(y_pred, y, batch_size) + np_y = y.double().numpy().ravel() + np_y_pred = y_pred.double().numpy().ravel() + def update_fn(engine, batch): + idx = (engine.state.iteration - 1) * batch_size + y_true_batch = np_y[idx : idx + batch_size] + y_pred_batch = np_y_pred[idx : idx + batch_size] -def test_error_is_not_nan(): - m = FractionalBias() + torch_y_pred_batch = ( + torch.from_numpy(y_pred_batch).to(dtype=torch.float32) + if available_device == "mps" + else torch.from_numpy(y_pred_batch) + ) + torch_y_true_batch = ( + torch.from_numpy(y_true_batch).to(dtype=torch.float32) + if available_device == "mps" + else torch.from_numpy(y_true_batch) + ) + + return torch_y_pred_batch, torch_y_true_batch + + engine = Engine(update_fn) + + metric = FractionalBias(device=available_device) + assert metric._device == torch.device(available_device) + + metric.attach(engine, "fb") + + data = list(range(y_pred.shape[0] // batch_size)) + fb = engine.run(data, max_epochs=1).metrics["fb"] + + expected = (2 * (np_y - np_y_pred) / (np_y_pred + np_y)).sum() / len(np_y) + + if available_device == "mps": + assert expected == pytest.approx(fb, rel=1e-5) + else: + assert expected == pytest.approx(fb) + + +def test_error_is_not_nan(available_device): + m = FractionalBias(device=available_device) + assert m._device == torch.device(available_device) m.update((torch.zeros(4), torch.zeros(4))) assert not (torch.isnan(m._sum_of_errors).any() or torch.isinf(m._sum_of_errors).any()), m._sum_of_errors diff --git a/tests/ignite/metrics/regression/test_geometric_mean_absolute_error.py b/tests/ignite/metrics/regression/test_geometric_mean_absolute_error.py index 05f023691a54..607f838d2ac9 100644 --- a/tests/ignite/metrics/regression/test_geometric_mean_absolute_error.py +++ b/tests/ignite/metrics/regression/test_geometric_mean_absolute_error.py @@ -28,47 +28,27 @@ def test_wrong_input_shapes(): m.update((torch.rand(4, 1), torch.rand(4))) -def test_compute(): - a = np.random.randn(4) - b = np.random.randn(4) - c = np.random.randn(4) - d = np.random.randn(4) - ground_truth = np.random.randn(4) - np_prod = 1.0 +def test_compute(available_device): + inputs = [torch.randn(4) for _ in range(4)] + ground_truth = torch.randn(4) - m = GeometricMeanAbsoluteError() - m.update((torch.from_numpy(a), torch.from_numpy(ground_truth))) - - errors = np.abs(ground_truth - a) - np_prod = np.multiply.reduce(errors) * np_prod - np_len = len(a) - np_ans = np.power(np_prod, 1.0 / np_len) - assert m.compute() == pytest.approx(np_ans) - - m.update((torch.from_numpy(b), torch.from_numpy(ground_truth))) - errors = np.abs(ground_truth - b) - np_prod = np.multiply.reduce(errors) * np_prod - np_len += len(b) - np_ans = np.power(np_prod, 1.0 / np_len) - assert m.compute() == pytest.approx(np_ans) - - m.update((torch.from_numpy(c), torch.from_numpy(ground_truth))) - errors = np.abs(ground_truth - c) - np_prod = np.multiply.reduce(errors) * np_prod - np_len += len(c) - np_ans = np.power(np_prod, 1.0 / np_len) - assert m.compute() == pytest.approx(np_ans) - - m.update((torch.from_numpy(d), torch.from_numpy(ground_truth))) - errors = np.abs(ground_truth - d) - np_prod = np.multiply.reduce(errors) * np_prod - np_len += len(d) - np_ans = np.power(np_prod, 1.0 / np_len) - assert m.compute() == pytest.approx(np_ans) - - -def test_integration(): - def _test(y_pred, y, batch_size): + m = GeometricMeanAbsoluteError(device=available_device) + assert m._device == torch.device(available_device) + + total_prod = 1.0 + total_len = 0 + + for pred in inputs: + m.update((pred, ground_truth)) + errors = torch.abs(ground_truth - pred) + total_prod *= torch.prod(errors).item() + total_len += pred.numel() + expected = torch.pow(torch.tensor(total_prod), 1.0 / total_len) + assert m.compute() == pytest.approx(expected.item()) + + +def test_integration(available_device): + def _test(y_pred, y, batch_size, device="cpu"): def update_fn(engine, batch): idx = (engine.state.iteration - 1) * batch_size y_true_batch = np_y[idx : idx + batch_size] @@ -77,7 +57,9 @@ def update_fn(engine, batch): engine = Engine(update_fn) - m = GeometricMeanAbsoluteError() + m = GeometricMeanAbsoluteError(device=device) + assert m._device == torch.device(device) + m.attach(engine, "gmae") np_y = y.numpy().ravel()