diff --git a/CHANGELOG.md b/CHANGELOG.md index 3881af3b37..86773d2d86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added `warn_once` option to logging configuration (`td.config.logging.warn_once`) that causes each unique warning message to be shown only once per process, reducing noise from repeated validation warnings. +- Adjoint gradients for `CustomCurrentSource.current_dataset` and `CustomFieldSource.field_dataset`. ### Changed - Unified inside/outside permittivity handling for all geometries when computing shape gradients. diff --git a/tests/test_components/autograd/numerical/test_autograd_source_numerical.py b/tests/test_components/autograd/numerical/test_autograd_source_numerical.py new file mode 100644 index 0000000000..53490ece49 --- /dev/null +++ b/tests/test_components/autograd/numerical/test_autograd_source_numerical.py @@ -0,0 +1,325 @@ +from __future__ import annotations + +import sys +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from typing import Callable + +import autograd as ag +import autograd.numpy as anp +import numpy as np +import pytest + +import tidy3d as td +import tidy3d.web as web + + +@pytest.fixture(autouse=True) +def _enable_local_cache(monkeypatch): + monkeypatch.setattr(td.config.local_cache, "enabled", True) + + +WVL0 = 1 +FREQ0 = td.C_0 / WVL0 +FWIDTH = FREQ0 / 10 +PULSE = td.GaussianPulse(freq0=FREQ0, fwidth=FWIDTH) +SIM_SIZE = (3 * WVL0, 3 * WVL0, 3 * WVL0) +MONITOR_CENTER = (-0.3, 0.1, 0.2) +MONITOR_SIZE = (0.5, 0.5, 0) +SOURCE_SIZE = (1, 1, 0.1) +SOURCE_CENTER = (0.1, 0.4, -0.2) +DATASET_SPACING = 0.1 + + +def _axis_coords(size: float, spacing: float) -> np.ndarray: + """Return 1D coords for an axis; if size==0, return a single point at 0.""" + if size <= 0: + return np.array([0.0]) + # Prefer rounding to avoid silent off-by-one due to float -> int truncation + n = max(2, int(np.round(size / spacing))) + return np.linspace(-size / 2, size / 2, n) + + +def _make_coords( + size_xyz: tuple[float, float, float], spacing: float, freq0: float +) -> dict[str, object]: + x = _axis_coords(size_xyz[0], spacing) + y = _axis_coords(size_xyz[1], spacing) + z = _axis_coords(size_xyz[2], spacing) + return {"x": x, "y": y, "z": z, "f": [freq0]} + + +def _make_field_data( + amp: float, + shape: tuple[int, int, int, int], + *, + add_noise: bool, + noise_scale: float = 1.0, + seed: int = 12345, +) -> np.ndarray: + """Uniform field with optional deterministic Gaussian noise.""" + base = amp * np.ones(shape, dtype=float) + if not add_noise: + return base + rng = np.random.RandomState(seed) + noise = rng.normal(size=shape) + return base + noise_scale * noise + + +def _make_field_dataset_from_amplitudes( + amplitudes: Sequence[float], + field_prefix: str, + coords: Mapping[str, object], + *, + add_noise: bool, + noise_scale: float = 1.0, + noise_seed: int = 12345, +) -> td.FieldDataset: + x = np.asarray(coords["x"]) + y = np.asarray(coords["y"]) + z = np.asarray(coords["z"]) + shape = (len(x), len(y), len(z), 1) + + field_components = {} + for amp, axis in zip(amplitudes, "xyz"): + data = _make_field_data( + amp, + shape, + add_noise=add_noise, + noise_scale=noise_scale, + seed=noise_seed, + ) + field_components[f"{field_prefix}{axis}"] = td.ScalarFieldDataArray(data, coords=coords) + + return td.FieldDataset(**field_components) + + +def _make_custom_field_source_components( + amplitudes: Sequence[float], + field_prefix: str, + *, + add_noise: bool, +) -> td.CustomFieldSource: + coords = _make_coords((SOURCE_SIZE[0], SOURCE_SIZE[1], 0.0), DATASET_SPACING, FREQ0) + field_dataset = _make_field_dataset_from_amplitudes( + amplitudes, + field_prefix, + coords, + add_noise=add_noise, + noise_scale=1.0, + noise_seed=12345, + ) + return td.CustomFieldSource( + center=SOURCE_CENTER, + size=(SOURCE_SIZE[0], SOURCE_SIZE[1], 0.0), + source_time=PULSE, + field_dataset=field_dataset, + ) + + +def _make_custom_current_source_components( + amplitudes: Sequence[float], + field_prefix: str, + *, + add_noise: bool, +) -> td.CustomCurrentSource: + coords = _make_coords(SOURCE_SIZE, DATASET_SPACING, FREQ0) + field_dataset = _make_field_dataset_from_amplitudes( + amplitudes, + field_prefix, + coords, + add_noise=add_noise, + noise_scale=1.0, + noise_seed=12345, + ) + return td.CustomCurrentSource( + center=SOURCE_CENTER, + size=SOURCE_SIZE, + source_time=PULSE, + current_dataset=field_dataset, + ) + + +def angled_overlap_deg(v1, v2): + norm_v1 = np.linalg.norm(v1) + norm_v2 = np.linalg.norm(v2) + + if np.isclose(norm_v1, 0.0) or np.isclose(norm_v2, 0.0): + if not (np.isclose(norm_v1, 0.0) and np.isclose(norm_v2, 0.0)): + return np.inf + + return 0.0 + + dot = np.minimum(1.0, np.sum((v1 / np.linalg.norm(v1)) * (v2 / np.linalg.norm(v2)))) + angle_deg = np.arccos(dot) * 180.0 / np.pi + + return angle_deg + + +def _make_sim(source: td.Source) -> td.Simulation: + monitor = td.FieldMonitor( + name="field_monitor", + center=MONITOR_CENTER, + size=MONITOR_SIZE, + freqs=[FREQ0], + ) + return td.Simulation( + size=SIM_SIZE, + run_time=1e-12, + grid_spec=td.GridSpec.auto(min_steps_per_wvl=40, wavelength=WVL0), + sources=[source], + monitors=[monitor], + boundary_spec=td.BoundarySpec.all_sides(boundary=td.PML()), + ) + + +def _eval_objective(sim_data: td.SimulationData, field_component: str) -> float: + field_data = sim_data.load_field_monitor("field_monitor") + component = getattr(field_data, field_component, None) + if component is None: + return 0.0 + indexers = {} + for dim in ("x", "y", "z", "f"): + if dim in component.dims: + indexers[dim] = component.sizes[dim] // 2 + field_value = component.isel(**indexers).values + return anp.abs(field_value) ** 2 + + +def _eval_objective_components( + sim_data: td.SimulationData, field_components: Sequence[str] +) -> float: + total = 0.0 + for component in field_components: + total += _eval_objective(sim_data, component) + return total + + +@dataclass(frozen=True) +class SourceCase: + name: str + monitor_components: tuple[str, str, str] + delta: float + make_source: Callable + + +VECTOR_SOURCE_CASES = [ + # ------------------ + # custom field source + # ------------------ + SourceCase( + name="custom_field_vec_e", + monitor_components=("Ex", "Ey", "Ez"), + delta=1e-4, + make_source=lambda amps, *, add_noise: _make_custom_field_source_components( + amps, "E", add_noise=add_noise + ), + ), + SourceCase( + name="custom_field_vec_h", + monitor_components=("Hx", "Hy", "Hz"), + delta=1e-4, + make_source=lambda amps, *, add_noise: _make_custom_field_source_components( + amps, "H", add_noise=add_noise + ), + ), + # ------------------ + # custom current source + # ------------------ + SourceCase( + name="custom_current_vec_e", + monitor_components=("Ex", "Ey", "Ez"), + delta=1e-4, + make_source=lambda amps, *, add_noise: _make_custom_current_source_components( + amps, "E", add_noise=add_noise + ), + ), + SourceCase( + name="custom_current_vec_h", + monitor_components=("Hx", "Hy", "Hz"), + delta=1e-4, + make_source=lambda amps, *, add_noise: _make_custom_current_source_components( + amps, "H", add_noise=add_noise + ), + ), +] +PARAM_VECTORS = [ + pytest.param((1.0, -0.5, 0.25), id="p1"), + pytest.param((0.2, 0.7, -0.9), id="p2"), +] + +NOISE_CASES = [ + pytest.param(False, id="no_noise"), + pytest.param(True, id="noise"), +] + + +@pytest.mark.numerical +@pytest.mark.parametrize("params", PARAM_VECTORS) +@pytest.mark.parametrize("add_noise", NOISE_CASES) +@pytest.mark.parametrize("case", VECTOR_SOURCE_CASES, ids=lambda case: case.name) +def test_custom_source_gradients( + _enable_local_cache, + tmp_path, + case, + params, + add_noise, +): + delta = case.delta + monitor_components = case.monitor_components + label = f"{case.name}_{'noise' if add_noise else 'clean'}_{params!r}" + + make_source = lambda amps: case.make_source(amps, add_noise=add_noise) + + def objective_adj(ax, ay, az): + sim = _make_sim(make_source((ax, ay, az))) + sim_data = web.run( + sim, + task_name=f"{label}_adj", + path=tmp_path / f"{label}_adj.hdf5", + local_gradient=True, + verbose=False, + ) + return _eval_objective_components(sim_data, monitor_components) + + grad_adjoint = np.array( + [ + ag.grad(objective_adj, 0)(*params), + ag.grad(objective_adj, 1)(*params), + ag.grad(objective_adj, 2)(*params), + ], + dtype=float, + ) + + sims = {} + for idx, axis in enumerate("xyz"): + params_plus = list(params) + params_plus[idx] += delta + params_minus = list(params) + params_minus[idx] -= delta + sims[f"{label}_fd_{axis}_plus"] = _make_sim(make_source(tuple(params_plus))) + sims[f"{label}_fd_{axis}_minus"] = _make_sim(make_source(tuple(params_minus))) + + sim_data_map = web.run_async( + sims, + path_dir=tmp_path, + local_gradient=False, + verbose=False, + ) + + grad_fd = np.zeros(3, dtype=float) + for idx, axis in enumerate("xyz"): + obj_plus = _eval_objective_components( + sim_data_map[f"{label}_fd_{axis}_plus"], monitor_components + ) + obj_minus = _eval_objective_components( + sim_data_map[f"{label}_fd_{axis}_minus"], monitor_components + ) + grad_fd[idx] = (obj_plus - obj_minus) / (2 * delta) + + angle = angled_overlap_deg(grad_adjoint, grad_fd) + print(f"[{label}] grad_adjoint = {grad_adjoint}", file=sys.stderr) + print(f"[{label}] grad_fd = {grad_fd}", file=sys.stderr) + print(f"[{label}] angle_deg = {angle}", file=sys.stderr) + + assert angle < 5.0 diff --git a/tests/test_components/autograd/test_adjoint_monitors.py b/tests/test_components/autograd/test_adjoint_monitors.py index 7994d9d47e..d4ce0fd4c8 100644 --- a/tests/test_components/autograd/test_adjoint_monitors.py +++ b/tests/test_components/autograd/test_adjoint_monitors.py @@ -7,7 +7,7 @@ import tidy3d as td -SIM_FIELDS_KEYS = [("dummy", 0, "geometry")] +SIM_FIELDS_KEYS = [("structures", 0, "geometry")] POLY_VERTS_2D: np.ndarray = np.array( [ diff --git a/tests/test_components/autograd/test_autograd_sources.py b/tests/test_components/autograd/test_autograd_sources.py new file mode 100644 index 0000000000..61ece9a173 --- /dev/null +++ b/tests/test_components/autograd/test_autograd_sources.py @@ -0,0 +1,973 @@ +""" +Analytical tests for source VJP computation. + +Tests for ``td.CustomCurrentSource._compute_derivatives`` and +``td.CustomFieldSource._compute_derivatives`` using analytical solutions +for simple geometries and field distributions. + +Test coverage: + - Rectangular sources with uniform field distributions + - Gaussian field distributions + - Different source orientations and sizes + - Edge cases with zero fields and boundary conditions +""" + +from __future__ import annotations + +import autograd as ag +import autograd.numpy as anp +import numpy as np +import numpy.testing as npt +import pytest + +import tidy3d as td +from tidy3d.web import run + +from .test_autograd import use_emulated_run # noqa: F401 + + +class DummySourceDI: + """Stand-in for DerivativeInfo for source testing.""" + + def __init__( + self, + *, + paths, + E_adj: dict, + H_adj: dict | None = None, + frequencies: np.ndarray, + bounds: tuple[tuple[float, float, float], tuple[float, float, float]], + source_time_scaling: complex = 1.0, + background_medium: td.Medium | None = None, + ) -> None: + self.paths = paths + self.E_adj = E_adj + self.H_adj = H_adj or {} + self.frequencies = frequencies + self.bounds = bounds + self.E_fwd = {} # Not used for sources + self.D_adj = {} + self.D_fwd = {} + self.eps_data = None + self.eps_in = None + self.eps_out = None + self.eps_background = None + self.eps_no_structure = None + self.eps_inf_structure = None + self.bounds_intersect = bounds + self.source_time_scaling = source_time_scaling + self.background_medium = background_medium + + +def create_uniform_field_data( + center: tuple[float, float, float], + size: tuple[float, float, float], + field_value: float = 1.0, + num_points: int = 10, +) -> td.FieldDataset: + """Create uniform field data for testing.""" + + # Create grid coordinates + x_min, x_max = center[0] - size[0] / 2, center[0] + size[0] / 2 + y_min, y_max = center[1] - size[1] / 2, center[1] + size[1] / 2 + z_min, z_max = center[2] - size[2] / 2, center[2] + size[2] / 2 + + x = np.linspace(x_min, x_max, num_points) + y = np.linspace(y_min, y_max, num_points) + z = np.linspace(z_min, z_max, max(1, num_points // 10)) # Fewer z points for 2D sources + f = [2e14] # Single frequency + + coords = {"x": x, "y": y, "z": z, "f": f} + + # Create uniform field data + data_shape = (len(x), len(y), len(z), len(f)) + field_data = field_value * np.ones(data_shape) + + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + return td.FieldDataset(Ex=scalar_field, Ey=scalar_field, Ez=scalar_field) + + +def create_adjoint_field_dataarray( + field_value: float, + shape: tuple[int, int, int, int] = (10, 10, 5, 1), +) -> td.ScalarFieldDataArray: + """Create adjoint field DataArray for testing.""" + + # Create grid coordinates + x = np.linspace(-0.5, 0.5, shape[0]) + y = np.linspace(-0.5, 0.5, shape[1]) + z = np.linspace(-0.05, 0.05, shape[2]) + f = [2e14] + + coords = {"x": x, "y": y, "z": z, "f": f} + + # Create uniform field data + field_data = field_value * np.ones(shape) + + return td.ScalarFieldDataArray(field_data, coords=coords) + + +def create_gaussian_field_data( + center: tuple[float, float, float], + size: tuple[float, float, float], + amplitude: float = 1.0, + sigma: float = 0.1, + num_points: int = 20, +) -> td.FieldDataset: + """Create Gaussian field data for testing.""" + + # Create grid coordinates + x_min, x_max = center[0] - size[0] / 2, center[0] + size[0] / 2 + y_min, y_max = center[1] - size[1] / 2, center[1] + size[1] / 2 + z_min, z_max = center[2] - size[2] / 2, center[2] + size[2] / 2 + + x = np.linspace(x_min, x_max, num_points) + y = np.linspace(y_min, y_max, num_points) + z = np.linspace(z_min, z_max, max(1, num_points // 10)) + f = [2e14] + + # Create Gaussian field data + X, Y, Z = np.meshgrid(x, y, z, indexing="ij") + + # Gaussian centered at source center + r_sq = ((X - center[0]) ** 2 + (Y - center[1]) ** 2 + (Z - center[2]) ** 2) / (2 * sigma**2) + field_data = amplitude * np.exp(-r_sq) + + # Add frequency dimension to match coordinates + field_data = field_data[..., np.newaxis] + + coords = {"x": x, "y": y, "z": z, "f": f} + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords, dims=("x", "y", "z", "f")) + return td.FieldDataset(Ex=scalar_field) + + +class TestSpatialWeights: + """Unit tests for spatial weight helpers.""" + + def test_compute_spatial_weights_cell_sizes(self): + """Cell-size weights should match averaged coordinate spacing.""" + from tidy3d.components.autograd.derivative_utils import compute_spatial_weights + + coords = {"x": np.array([0.0, 1.0, 2.0]), "y": np.array([0.0, 2.0]), "z": np.array([0.0])} + values = np.zeros((3, 2, 1)) + arr = td.ScalarFieldDataArray(values, coords=coords, dims=("x", "y", "z")) + + weights = compute_spatial_weights(arr, dims=("x", "y", "z")) + + expected = np.array([[2.0, 2.0], [2.0, 2.0], [2.0, 2.0]]) + npt.assert_allclose(weights.values, expected) + + def test_compute_source_weights_collapsed_axis(self): + """Collapsed axis should use adjoint spacing when size is zero.""" + from tidy3d.components.autograd.derivative_utils import compute_source_weights + + coords = {"x": np.array([0.0, 1.0]), "y": np.array([0.0, 1.0]), "z": np.array([0.0])} + field_data = td.ScalarFieldDataArray( + np.ones((2, 2, 1)), coords=coords, dims=("x", "y", "z") + ) + + adjoint_coords = { + "x": np.array([0.0, 1.0]), + "y": np.array([0.0, 1.0]), + "z": np.array([0.0, 0.1, 0.2]), + } + adjoint_field = td.ScalarFieldDataArray( + np.ones((2, 2, 3)), coords=adjoint_coords, dims=("x", "y", "z") + ) + + weights = compute_source_weights(field_data, adjoint_field, source_size=(1.0, 1.0, 0.0)) + expected_sum = 2.0 * 2.0 * 0.1 + npt.assert_allclose(np.sum(weights.values), expected_sum) + + def test_transpose_interp_identity(self): + """Adjoint interpolation should preserve weighted values on identical grids.""" + from tidy3d.components.autograd.derivative_utils import ( + compute_spatial_weights, + transpose_interp_field_to_dataset, + ) + + coords = { + "x": np.array([0.0, 1.0]), + "y": np.array([0.0, 2.0]), + "z": np.array([0.0]), + "f": np.array([2e14]), + } + values = np.ones((2, 2, 1, 1), dtype=complex) + adjoint_field = td.ScalarFieldDataArray(values, coords=coords, dims=("x", "y", "z", "f")) + dataset_field = td.ScalarFieldDataArray(values, coords=coords, dims=("x", "y", "z", "f")) + + result = transpose_interp_field_to_dataset( + adjoint_field, dataset_field, center=(0.0, 0.0, 0.0) + ) + weights = compute_spatial_weights(adjoint_field, dims=("x", "y", "z")) + expected = (adjoint_field * weights).transpose(*dataset_field.dims) + + npt.assert_allclose(result.values, expected.values) + + def test_transpose_interp_collapsed_axis(self): + """Collapsed dataset axis should sum weighted adjoint samples.""" + from tidy3d.components.autograd.derivative_utils import transpose_interp_field_to_dataset + + adjoint_coords = { + "x": np.array([0.0, 1.0]), + "y": np.array([0.0]), + "z": np.array([0.0]), + "f": np.array([2e14]), + } + adjoint_values = np.ones((2, 1, 1, 1), dtype=complex) + adjoint_field = td.ScalarFieldDataArray( + adjoint_values, coords=adjoint_coords, dims=("x", "y", "z", "f") + ) + + dataset_coords = { + "x": np.array([0.0]), + "y": np.array([0.0]), + "z": np.array([0.0]), + "f": np.array([2e14]), + } + dataset_field = td.ScalarFieldDataArray( + np.ones((1, 1, 1, 1)), coords=dataset_coords, dims=("x", "y", "z", "f") + ) + + result = transpose_interp_field_to_dataset( + adjoint_field, dataset_field, center=(0.0, 0.0, 0.0) + ) + npt.assert_allclose(result.values, 2.0) + + +def analytical_uniform_source_gradient( + source_bounds: tuple[tuple[float, float, float], tuple[float, float, float]], + adjoint_field_value: complex, + source_scale: complex, + field_component: str = "Ex", +) -> float: + """ + Analytical gradient for uniform source with uniform adjoint field. + + The gradient is the volume integral of the scaled adjoint field. + """ + (x_min, y_min, z_min), (x_max, y_max, z_max) = source_bounds + volume = (x_max - x_min) * (y_max - y_min) * (z_max - z_min) + return np.real(1j * source_scale * adjoint_field_value) * volume + + +def analytical_gaussian_source_gradient( + source_bounds: tuple[tuple[float, float, float], tuple[float, float, float]], + adjoint_amplitude: complex, + source_scale: complex, + sigma: float = 0.1, + field_component: str = "Ex", +) -> float: + """ + Analytical gradient for uniform source with Gaussian adjoint field. + + The gradient is the volume integral of the Gaussian adjoint field. + For a Gaussian centered at the source center, we compute the actual + integral over the source bounds. + """ + (x_min, y_min, z_min), (x_max, y_max, z_max) = source_bounds + + # For a Gaussian field exp(-(x^2 + y^2 + z^2)/(2*sigma^2)), the integral + # over a rectangular domain can be approximated by the sum of field values + # times the volume element, which is what the numerical integration does. + + # Since the test uses a 10x10x1 grid over the bounds, we can compute + # the expected value by sampling the Gaussian at those points + x = np.linspace(x_min, x_max, 10) + y = np.linspace(y_min, y_max, 10) + z = np.array([0.0]) # Single z point as in the test + + X, Y, Z = np.meshgrid(x, y, z, indexing="ij") + r_sq = (X**2 + Y**2 + Z**2) / (2 * sigma**2) + field_data = adjoint_amplitude * np.exp(-r_sq) + + # Compute the volume element + dx = x[1] - x[0] + dy = y[1] - y[0] + + # The integral is the sum of field values times the area element (2D integral) + # since z has only one point, integrate_within_bounds only integrates over x and y + integral = np.sum(np.real(1j * source_scale * field_data)) * dx * dy + + return integral + + +class TestCustomCurrentSourceUniform: + """Test CustomCurrentSource with uniform field distributions.""" + + @pytest.fixture + def source(self): + """Create a CustomCurrentSource with uniform field data.""" + center = (0.0, 0.0, 0.0) + size = (1.0, 1.0, 0.1) # 2D source + field_dataset = create_uniform_field_data(center, size, field_value=1.0) + + return td.CustomCurrentSource( + center=center, + size=size, + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + current_dataset=field_dataset, + ) + + @pytest.fixture + def source_bounds(self, source): + """Get the source bounds.""" + return source.geometry.bounds + + def test_uniform_adjoint_field(self, source, source_bounds): + """Test with uniform adjoint field.""" + # Create uniform adjoint field + adjoint_field_value = -1j * 2.0 + E_adj = {"Ex": create_adjoint_field_dataarray(adjoint_field_value)} + source_scale = 2.0 + + di = DummySourceDI( + paths=[("current_dataset", "Ex")], + E_adj=E_adj, + frequencies=np.array([2e14]), + bounds=source_bounds, + source_time_scaling=source_scale, + ) + + results = source._compute_derivatives(di) + + field_data = source.current_dataset.Ex + from tidy3d.components.autograd.derivative_utils import transpose_interp_field_to_dataset + + adjoint_on_dataset = transpose_interp_field_to_dataset( + E_adj["Ex"], field_data, center=source.center + ) + expected_gradient = 0.5 * np.sum(np.real(source_scale * adjoint_on_dataset).values) + + grad = results[("current_dataset", "Ex")] + assert grad.shape == source.current_dataset.Ex.shape + npt.assert_allclose(np.sum(grad), expected_gradient, rtol=1e-2) + + def test_zero_adjoint_field(self, source, source_bounds): + """Test with zero adjoint field.""" + E_adj = {"Ex": create_adjoint_field_dataarray(0.0)} + + di = DummySourceDI( + paths=[("current_dataset", "Ex")], + E_adj=E_adj, + frequencies=np.array([2e14]), + bounds=source_bounds, + ) + + results = source._compute_derivatives(di) + + # Should be zero + npt.assert_allclose(results[("current_dataset", "Ex")], 0.0, rtol=1e-10) + + def test_multiple_field_components(self, source, source_bounds): + """Test with multiple field components.""" + adjoint_field_value = -1j * 1.5 + E_adj = { + "Ex": create_adjoint_field_dataarray(adjoint_field_value), + "Ey": create_adjoint_field_dataarray(0.5 * adjoint_field_value), + "Ez": create_adjoint_field_dataarray(0.0), + } + source_scale = 1.0 + + di = DummySourceDI( + paths=[("current_dataset", "Ex"), ("current_dataset", "Ey"), ("current_dataset", "Ez")], + E_adj=E_adj, + frequencies=np.array([2e14]), + bounds=source_bounds, + source_time_scaling=source_scale, + ) + + results = source._compute_derivatives(di) + + # Check each component + field_data = source.current_dataset.Ex + from tidy3d.components.autograd.derivative_utils import transpose_interp_field_to_dataset + + adjoint_on_dataset_ex = transpose_interp_field_to_dataset( + E_adj["Ex"], field_data, center=source.center + ) + adjoint_on_dataset_ey = transpose_interp_field_to_dataset( + E_adj["Ey"], field_data, center=source.center + ) + + expected_ex = 0.5 * np.sum(np.real(source_scale * adjoint_on_dataset_ex).values) + expected_ey = 0.5 * np.sum(np.real(source_scale * adjoint_on_dataset_ey).values) + expected_ez = 0.0 + + npt.assert_allclose(np.sum(results[("current_dataset", "Ex")]), expected_ex, rtol=1e-2) + npt.assert_allclose(np.sum(results[("current_dataset", "Ey")]), expected_ey, rtol=1e-2) + npt.assert_allclose(np.sum(results[("current_dataset", "Ez")]), expected_ez, rtol=1e-10) + + +class TestCustomFieldSourceUniform: + """Test CustomFieldSource with uniform field distributions.""" + + @pytest.fixture + def source(self): + """Create a CustomFieldSource with uniform field data.""" + center = (0.0, 0.0, 0.0) + size = (1.0, 1.0, 0.0) # Planar source (z=0) + field_dataset = create_uniform_field_data(center, size, field_value=1.0) + + return td.CustomFieldSource( + center=center, + size=size, + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + field_dataset=field_dataset, + ) + + @pytest.fixture + def source_bounds(self, source): + """Get the source bounds.""" + return source.geometry.bounds + + def test_uniform_adjoint_field(self, source, source_bounds): + """Test with uniform adjoint field.""" + # Create uniform adjoint field + adjoint_field_value = 1j * 2.0 + E_adj = {} + H_adj = {"Hy": create_adjoint_field_dataarray(adjoint_field_value)} + + di = DummySourceDI( + paths=[("field_dataset", "Ex")], + E_adj=E_adj, + H_adj=H_adj, + frequencies=np.array([2e14]), + bounds=source_bounds, + ) + + results = source._compute_derivatives(di) + + # Analytical solution + from tidy3d.components.autograd.derivative_utils import ( + compute_spatial_weights, + transpose_interp_field_to_dataset, + ) + from tidy3d.constants import EPSILON_0 + + omega = 2 * np.pi * 2e14 + field_data = source.field_dataset.Ex + adjoint_on_dataset = transpose_interp_field_to_dataset( + H_adj["Hy"], field_data, center=source.center + ) + size_element = compute_spatial_weights(field_data, dims=("x", "y", "z")) + current_scale = omega * EPSILON_0 / size_element + tangential_spacings = [] + for dim in ("x", "y"): + coords = field_data.coords.get(dim) + if coords is None: + continue + vals = np.asarray(coords.data) + if vals.size > 1: + diffs = np.diff(vals) + diffs = diffs[np.isfinite(diffs) & (diffs > 0)] + if diffs.size: + tangential_spacings.append(float(np.min(diffs))) + normal_scale = 1.0 + if tangential_spacings: + normal_scale = 0.5 / float(np.min(tangential_spacings)) + + expected_gradient = normal_scale * np.sum( + np.real(current_scale * adjoint_on_dataset).values + ) + + npt.assert_allclose(np.sum(results[("field_dataset", "Ex")]), expected_gradient, rtol=1e-2) + + +class TestCustomCurrentSourceGaussian: + """Test CustomCurrentSource with Gaussian field distributions.""" + + @pytest.fixture + def source(self): + """Create a CustomCurrentSource with Gaussian field data.""" + center = (0.0, 0.0, 0.0) + size = (0.5, 0.5, 0.1) # Smaller source for Gaussian test + field_dataset = create_gaussian_field_data(center, size, amplitude=1.0, sigma=0.1) + + return td.CustomCurrentSource( + center=center, + size=size, + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + current_dataset=field_dataset, + ) + + @pytest.fixture + def source_bounds(self, source): + """Get the source bounds.""" + return source.geometry.bounds + + def test_gaussian_adjoint_field(self, source, source_bounds): + """Test with Gaussian adjoint field.""" + # Create Gaussian adjoint field + adjoint_amplitude = -1j * 1.0 + sigma = 0.1 + x = np.linspace(-0.25, 0.25, 10) + y = np.linspace(-0.25, 0.25, 10) + z = np.array([0.0]) + f = [2e14] + + X, Y, Z = np.meshgrid(x, y, z, indexing="ij") + r_sq = (X**2 + Y**2 + Z**2) / (2 * sigma**2) + adjoint_field_data = adjoint_amplitude * np.exp(-r_sq) + + # Add frequency dimension to match coordinates + adjoint_field_data = adjoint_field_data[..., np.newaxis] + coords = {"x": x, "y": y, "z": z, "f": f} + adjoint_field = td.ScalarFieldDataArray( + adjoint_field_data, coords=coords, dims=("x", "y", "z", "f") + ) + + E_adj = {"Ex": adjoint_field} + source_scale = 1.0 + + di = DummySourceDI( + paths=[("current_dataset", "Ex")], + E_adj=E_adj, + frequencies=np.array([2e14]), + bounds=source_bounds, + source_time_scaling=source_scale, + ) + + results = source._compute_derivatives(di) + + from tidy3d.components.autograd.derivative_utils import transpose_interp_field_to_dataset + + adjoint_on_dataset = transpose_interp_field_to_dataset( + adjoint_field, + source.current_dataset.Ex, + center=source.center, + ) + expected_gradient = 0.5 * np.sum(np.real(source_scale * adjoint_on_dataset).values) + + npt.assert_allclose( + np.sum(results[("current_dataset", "Ex")]), expected_gradient, rtol=5e-1 + ) + + +def test_source_autograd(use_emulated_run): # noqa: F811 + """Test autograd differentiation with respect to CustomCurrentSource parameters.""" + + def make_sim_with_traced_source(val): + """Create a simulation with a traced CustomCurrentSource.""" + + # Create a simple simulation + sim = td.Simulation( + size=(2.0, 2.0, 2.0), + run_time=1e-12, + grid_spec=td.GridSpec.uniform(dl=0.1), + sources=[], + monitors=[ + td.FieldMonitor( + size=(1.0, 1.0, 0.0), center=(0, 0, 0), freqs=[2e14], name="field_monitor" + ) + ], + ) + + data_shape = (10, 10, 1, 1) + + # Create a traced CustomCurrentSource + x = np.linspace(-0.5, 0.5, data_shape[0]) + y = np.linspace(-0.5, 0.5, data_shape[1]) + z = np.array([0]) + f = [2e14] + coords = {"x": x, "y": y, "z": z, "f": f} + + # Create traced field data + field_data = val * np.ones(data_shape) + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + + # Create field dataset with traced data + field_dataset = td.FieldDataset(Ex=scalar_field) + + # Create CustomCurrentSource with traced dataset + custom_source = td.CustomCurrentSource( + center=(0, 0, 0), + size=(1.0, 1.0, 0.0), + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + current_dataset=field_dataset, + ) + + # Add source to simulation + sim = sim.updated_copy(sources=[custom_source]) + + return sim + + def objective(val): + """Objective function that depends on source parameters.""" + + sim = make_sim_with_traced_source(val) + + # Run simulation + sim_data = run(sim, task_name="test_source_autograd") + + # Extract field data from monitor + field_data = sim_data.load_field_monitor("field_monitor") + Ex_field = field_data.Ex + + # Compute objective (e.g., field intensity at a point) + objective_value = anp.abs(Ex_field.isel(x=5, y=5, z=0, f=0).values) ** 2 + + return objective_value + + # Compute gradient + grad = ag.grad(objective)(1.0) + + assert anp.all(grad != 0.0), "some gradients are 0" + + +def test_field_source_autograd(use_emulated_run): # noqa: F811 + """Test autograd differentiation with respect to CustomFieldSource parameters.""" + + def make_sim_with_traced_field_source(val): + """Create a simulation with a traced CustomFieldSource.""" + + # Create a simple simulation + sim = td.Simulation( + size=(2.0, 2.0, 2.0), + run_time=1e-12, + grid_spec=td.GridSpec.uniform(dl=0.1), + sources=[], + monitors=[ + td.FieldMonitor( + size=(1.0, 1.0, 0.0), center=(0, 0, 0), freqs=[2e14], name="field_monitor" + ) + ], + ) + + data_shape = (10, 10, 1, 1) + + # Create a traced CustomFieldSource + x = np.linspace(-0.5, 0.5, data_shape[0]) + y = np.linspace(-0.5, 0.5, data_shape[1]) + z = np.array([0]) + f = [2e14] + coords = {"x": x, "y": y, "z": z, "f": f} + + # Create traced field data + field_data = val * np.ones(data_shape) + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + + # Create field dataset with traced data + field_dataset = td.FieldDataset(Ex=scalar_field) + + # Create CustomFieldSource with traced dataset + custom_source = td.CustomFieldSource( + center=(0, 0, 0), + size=(1.0, 1.0, 0.0), + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + field_dataset=field_dataset, + ) + + # Add source to simulation + sim = sim.updated_copy(sources=[custom_source]) + + return sim + + def objective(val): + """Objective function that depends on source parameters.""" + + sim = make_sim_with_traced_field_source(val) + + # Run simulation + sim_data = run(sim, task_name="test_field_source_autograd") + + # Extract field data from monitor + field_data = sim_data.load_field_monitor("field_monitor") + Ex_field = field_data.Ex + + # Compute objective (e.g., field intensity at a point) + objective_value = anp.abs(Ex_field.isel(x=5, y=5, z=0, f=0).values) ** 2 + + return objective_value + + # Compute gradient + grad = ag.grad(objective)(1.0) + + assert anp.all(grad != 0.0), "some gradients are 0" + + +def test_source_adjoint_monitors(): + """Test that adjoint monitors are properly created for sources.""" + + # Create a simulation with a traced source + sim = td.Simulation( + size=(2.0, 2.0, 2.0), + run_time=1e-12, + grid_spec=td.GridSpec.uniform(dl=0.1), + sources=[], + monitors=[ + td.FieldMonitor( + size=(1.0, 1.0, 0.0), center=(0, 0, 0), freqs=[2e14], name="field_monitor" + ) + ], + ) + + # Create traced field data + data_shape = (10, 10, 1, 1) + x = np.linspace(-0.5, 0.5, data_shape[0]) + y = np.linspace(-0.5, 0.5, data_shape[1]) + z = np.array([0]) + f = [2e14] + coords = {"x": x, "y": y, "z": z, "f": f} + + field_data = 1.0 * np.ones(data_shape) + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + field_dataset = td.FieldDataset(Ex=scalar_field) + + # Create CustomCurrentSource with traced dataset + custom_source = td.CustomCurrentSource( + center=(0, 0, 0), + size=(1.0, 1.0, 0.0), + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + current_dataset=field_dataset, + ) + + # Add source to simulation + sim = sim.updated_copy(sources=[custom_source]) + + # Create sim_fields_keys for source + sim_fields_keys = [("sources", 0, "current_dataset", "Ex")] + + # Test that adjoint monitors are created + adjoint_monitors_fld, adjoint_monitors_eps = sim._make_adjoint_monitors(sim_fields_keys) + + # Check that field monitors were created for sources, but no for eps + assert len(adjoint_monitors_fld) == 1 + assert len(adjoint_monitors_eps) == 0 + + # Check that the field monitor covers the source region + field_monitor = adjoint_monitors_fld[0] + assert isinstance(field_monitor, td.FieldMonitor) + assert field_monitor.center == custom_source.center + assert field_monitor.size == custom_source.size + # Check that frequencies are not empty (they should come from _freqs_adjoint) + assert len(field_monitor.freqs) > 0 + # Just check that they have the same length and are not empty + assert len(field_monitor.freqs) == len(sim._freqs_adjoint) + assert len(field_monitor.freqs) > 0 + + +def test_source_field_adjoint_monitors(): + """Test that adjoint monitors are properly created for CustomFieldSource.""" + + # Create a simulation with a traced field source + sim = td.Simulation( + size=(2.0, 2.0, 2.0), + run_time=1e-12, + grid_spec=td.GridSpec.uniform(dl=0.1), + sources=[], + monitors=[ + td.FieldMonitor( + size=(1.0, 1.0, 0.0), center=(0, 0, 0), freqs=[2e14], name="field_monitor" + ) + ], + ) + + # Create traced field data + data_shape = (10, 10, 1, 1) + x = np.linspace(-0.5, 0.5, data_shape[0]) + y = np.linspace(-0.5, 0.5, data_shape[1]) + z = np.array([0]) + f = [2e14] + coords = {"x": x, "y": y, "z": z, "f": f} + + field_data = 1.0 * np.ones(data_shape) + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + field_dataset = td.FieldDataset(Ex=scalar_field) + + # Create CustomFieldSource with traced dataset + custom_field_source = td.CustomFieldSource( + center=(0, 0, 0), + size=(1.0, 1.0, 0.0), + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + field_dataset=field_dataset, + ) + + # Add source to simulation + sim = sim.updated_copy(sources=[custom_field_source]) + + # Create sim_fields_keys for source + sim_fields_keys = [("sources", 0, "field_dataset", "Ex")] + + # Test that adjoint monitors are created + adjoint_monitors_fld, adjoint_monitors_eps = sim._make_adjoint_monitors(sim_fields_keys) + + # Check that field monitors were created for sources, but no for eps + assert len(adjoint_monitors_fld) == 1 + assert len(adjoint_monitors_eps) == 0 + + # Check that the field monitor covers the source region + field_monitor = adjoint_monitors_fld[0] + assert isinstance(field_monitor, td.FieldMonitor) + assert field_monitor.center == custom_field_source.center + assert field_monitor.size == custom_field_source.size + # Check that frequencies are not empty (they should come from _freqs_adjoint) + assert len(field_monitor.freqs) > 0 + # Just check that they have the same length and are not empty + assert len(field_monitor.freqs) == len(sim._freqs_adjoint) + assert len(field_monitor.freqs) > 0 + + +def test_mixed_structure_source_adjoint_monitors(): + """Test that adjoint monitors work correctly when both structures and sources are traced.""" + + # Create a simulation with both structures and sources + sim = td.Simulation( + size=(2.0, 2.0, 2.0), + run_time=1e-12, + grid_spec=td.GridSpec.uniform(dl=0.1), + sources=[], + structures=[ + td.Structure( + geometry=td.Box(center=(0.5, 0, 0), size=(0.5, 0.5, 0.5)), + medium=td.Medium(permittivity=2.0), + ) + ], + monitors=[ + td.FieldMonitor( + size=(1.0, 1.0, 0.0), center=(0, 0, 0), freqs=[2e14], name="field_monitor" + ) + ], + ) + + # Create traced field data for source + data_shape = (10, 10, 1, 1) + x = np.linspace(-0.5, 0.5, data_shape[0]) + y = np.linspace(-0.5, 0.5, data_shape[1]) + z = np.array([0]) + f = [2e14] + coords = {"x": x, "y": y, "z": z, "f": f} + + field_data = 1.0 * np.ones(data_shape) + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + field_dataset = td.FieldDataset(Ex=scalar_field) + + # Create CustomCurrentSource with traced dataset + custom_source = td.CustomCurrentSource( + center=(-0.5, 0, 0), + size=(0.5, 0.5, 0.0), + source_time=td.GaussianPulse(freq0=2e14, fwidth=1e13), + current_dataset=field_dataset, + ) + + # Add source to simulation + sim = sim.updated_copy(sources=[custom_source]) + + # Create sim_fields_keys for both structure and source + sim_fields_keys = [ + ("structures", 0, "medium", "permittivity"), + ("sources", 0, "current_dataset", "Ex"), + ] + + # Test that adjoint monitors are created for both + adjoint_monitors_fld, adjoint_monitors_eps = sim._make_adjoint_monitors(sim_fields_keys) + + # Should have monitors for both structure and source + # Note: The structure might not create monitors if it doesn't have the right field keys + # Let's be more flexible about the expected number + assert len(adjoint_monitors_fld) == 2 # two field monitors (one for structure, one for source) + assert len(adjoint_monitors_eps) == 1 # only one eps monitor for structure + + # Check that we have at least one source monitor + source_monitor_found = False + for _i, field_monitor_item in enumerate(adjoint_monitors_fld): + # Handle both direct FieldMonitor and list of FieldMonitor + if isinstance(field_monitor_item, td.FieldMonitor): + # Direct FieldMonitor (could be structure or source) + field_monitor = field_monitor_item + # Check if this is our source monitor + if ( + field_monitor.center == custom_source.center + and field_monitor.size == custom_source.size + ): + assert len(field_monitor.freqs) > 0 + source_monitor_found = True + break + elif isinstance(field_monitor_item, list): + # List of FieldMonitor (source monitors are wrapped in lists) + for field_monitor in field_monitor_item: + if isinstance(field_monitor, td.FieldMonitor): + # Check if this is our source monitor + if ( + field_monitor.center == custom_source.center + and field_monitor.size == custom_source.size + ): + assert len(field_monitor.freqs) > 0 + source_monitor_found = True + break + if source_monitor_found: + break + + assert source_monitor_found, "No source monitor found in adjoint monitors" + + +def _make_uniform_field_dataset(val, data_shape=(10, 10, 1, 1), freq=2e14): + x = np.linspace(-0.5, 0.5, data_shape[0]) + y = np.linspace(-0.5, 0.5, data_shape[1]) + z = np.array([0]) + f = [freq] + coords = {"x": x, "y": y, "z": z, "f": f} + + field_data = val * np.ones(data_shape) + scalar_field = td.ScalarFieldDataArray(field_data, coords=coords) + return td.FieldDataset(Ex=scalar_field) + + +SOURCE_CASES = [ + pytest.param( + "custom_current_source", + lambda val, freq: td.CustomCurrentSource( + center=(0, 0, 0), + size=(1.0, 1.0, 0.0), + source_time=td.GaussianPulse(freq0=freq, fwidth=1e13), + current_dataset=_make_uniform_field_dataset(val, freq=freq), + ), + id="CustomCurrentSource", + ), + pytest.param( + "custom_field_source", + lambda val, freq: td.CustomFieldSource( + center=(0, 0, 0), + size=(1.0, 1.0, 0.0), + source_time=td.GaussianPulse(freq0=freq, fwidth=1e13), + field_dataset=_make_uniform_field_dataset(val, freq=freq), + ), + id="CustomFieldSource", + ), +] + + +@pytest.mark.parametrize("kind, make_source", SOURCE_CASES) +def test_traced_source_derivative_computation(use_emulated_run, kind, make_source): # noqa: F811 + """Test that traced source derivative computation works for different source types.""" + freq = 2e14 + + def make_sim(val): + sim = td.Simulation( + size=(2.0, 2.0, 2.0), + run_time=1e-12, + grid_spec=td.GridSpec.uniform(dl=0.1), + sources=[], + monitors=[ + td.FieldMonitor( + size=(1.0, 1.0, 0.0), + center=(0, 0, 0), + freqs=[freq], + name="field_monitor", + ) + ], + ) + src = make_source(val, freq) + return sim.updated_copy(sources=[src]) + + def objective(val): + sim = make_sim(val) + sim_data = run(sim, task_name=f"test_derivative_{kind}") + field_data = sim_data.load_field_monitor("field_monitor") + Ex_field = field_data.Ex + return anp.abs(Ex_field.isel(x=5, y=5, z=0, f=0).values) ** 2 + + grad = ag.grad(objective)(1.0) + + assert grad is not None + assert isinstance(grad, (float, np.ndarray)) diff --git a/tidy3d/components/autograd/derivative_utils.py b/tidy3d/components/autograd/derivative_utils.py index f26c75c588..d45be7025a 100644 --- a/tidy3d/components/autograd/derivative_utils.py +++ b/tidy3d/components/autograd/derivative_utils.py @@ -9,7 +9,7 @@ import numpy as np import xarray as xr -from tidy3d.components.data.data_array import FreqDataArray, ScalarFieldDataArray +from tidy3d.components.data.data_array import FreqDataArray, ScalarFieldDataArray, SpatialDataArray from tidy3d.components.data.utils import _zeros_like from tidy3d.components.types import ArrayLike, Bound, xyz from tidy3d.config import config @@ -119,6 +119,9 @@ class DerivativeInfo: # Optional fields with defaults + source_time_scaling: Optional[FreqDataArray | float] = None + """Optional scaling from source time dependence for source gradients.""" + H_der_map: Optional[FieldData] = None """Magnetic field gradient map. Dataset where the field components ("Hx", "Hy", "Hz") store the multiplication @@ -1041,7 +1044,188 @@ def integrate_within_bounds(arr: xr.DataArray, dims: list[str], bounds: Bound) - return _arr.integrate(coord=dims_integrate) +def compute_spatial_weights( + arr: SpatialDataArray, dims: tuple[str, ...] = ("x", "y", "z") +) -> SpatialDataArray: + """Compute cell-size weights for spatial coordinates. + + Parameters + ---------- + arr : SpatialDataArray + Data array providing spatial coordinates. + dims : tuple[str, ...] + Spatial dimension names to include in the weights. + + Returns + ------- + SpatialDataArray + DataArray of weights broadcastable to ``arr``. + """ + + def _cell_size_weights(coord: np.ndarray) -> np.ndarray: + if coord.size <= 1: + return np.array([1.0], dtype=float) + deltas = np.diff(coord) + diff_left = np.pad(deltas, (1, 0), mode="edge") + diff_right = np.pad(deltas, (0, 1), mode="edge") + return 0.5 * (diff_left + diff_right) + + weight_dims = [] + weight_arrays = [] + for dim in dims: + if dim not in arr.coords: + continue + coord = np.asarray(arr.coords[dim].data) + if coord.size <= 1: + continue + weight_dims.append(dim) + weight_arrays.append(_cell_size_weights(coord)) + + if not weight_dims: + return SpatialDataArray(1.0) + + weights = np.ix_(*weight_arrays) + weights_data = weights[0] + for weight_array in weights[1:]: + weights_data = weights_data * weight_array + + coords = {dim: np.asarray(arr.coords[dim].data) for dim in weight_dims} + return SpatialDataArray(weights_data, coords=coords, dims=tuple(weight_dims)) + + +def compute_source_weights( + field_data: SpatialDataArray, + adjoint_field: SpatialDataArray, + source_size: tuple[float, float, float], +) -> SpatialDataArray: + """Compute spatial integration weights for source gradients.""" + dims_to_integrate = tuple( + dim for dim in "xyz" if dim in field_data.coords and adjoint_field.sizes.get(dim, 0) > 1 + ) + weights = compute_spatial_weights(field_data, dims=dims_to_integrate) + scale = 1.0 + for axis, dim in enumerate("xyz"): + if dim not in field_data.coords: + continue + if dim in dims_to_integrate and field_data.sizes.get(dim, 0) == 1: + axis_size = float(source_size[axis]) + if axis_size > 0.0: + scale = scale * axis_size + elif axis_size == 0.0 and dim in adjoint_field.coords: + coord_vals = np.asarray(adjoint_field.coords[dim].data) + if coord_vals.size > 1: + step = np.min(np.abs(np.diff(coord_vals))) + if np.isfinite(step) and step > 0.0: + scale = scale * step + if dim not in dims_to_integrate and field_data.sizes.get(dim, 0) > 1: + scale = scale / field_data.sizes[dim] + return weights * scale + + +def transpose_interp_field_to_dataset( + adjoint_field: SpatialDataArray, + dataset_field: SpatialDataArray, + *, + center: tuple[float, float, float], +) -> SpatialDataArray: + """Accumulate adjoint fields onto dataset coordinates using adjoint interpolation.""" + + def _align_freq(field: SpatialDataArray, target: SpatialDataArray) -> SpatialDataArray: + target_freqs = np.asarray(target.coords["f"].data) + source_freqs = np.asarray(field.coords["f"].data) + if target_freqs.size == source_freqs.size and np.allclose( + target_freqs, source_freqs, rtol=1e-12, atol=0.0 + ): + return field + method = "nearest" if target_freqs.size <= 1 or source_freqs.size <= 1 else "linear" + return field.interp( + {"f": target_freqs}, + method=method, + kwargs={"bounds_error": False, "fill_value": 0.0}, + ).fillna(0.0) + + def _transpose_interp_axis( + field_values: np.ndarray, field_coords_1d: np.ndarray, param_coords_1d: np.ndarray + ) -> np.ndarray: + if param_coords_1d.size == 1: + return field_values.sum(axis=0, keepdims=True) + if np.any(param_coords_1d[1:] < param_coords_1d[:-1]): + raise ValueError("Spatial coordinates must be sorted before computing derivatives.") + + n_param = param_coords_1d.size + n_field = field_values.shape[0] + field_values_2d = field_values.reshape(n_field, -1) + + param_index_upper = np.searchsorted(param_coords_1d, field_coords_1d, side="right") + param_index_upper = np.clip(param_index_upper, 1, n_param - 1) + param_index_lower = param_index_upper - 1 + + segment_width = param_coords_1d[param_index_upper] - param_coords_1d[param_index_lower] + segment_width = np.where(segment_width == 0, 1.0, segment_width) + frac_upper = (field_coords_1d - param_coords_1d[param_index_lower]) / segment_width + frac_upper = np.clip(frac_upper, 0.0, 1.0) + + w_lower = (1.0 - frac_upper)[:, None] + w_upper = frac_upper[:, None] + + param_values_2d = np.zeros((n_param, field_values_2d.shape[1]), dtype=field_values.dtype) + np.add.at(param_values_2d, param_index_lower, field_values_2d * w_lower) + np.add.at(param_values_2d, param_index_upper, field_values_2d * w_upper) + + return param_values_2d.reshape((n_param,) + field_values.shape[1:]) + + def _interp_axis( + arr: np.ndarray, axis: int, field_axis: np.ndarray, param_axis: np.ndarray + ) -> np.ndarray: + moved = np.moveaxis(arr, axis, 0) + moved = _transpose_interp_axis(moved, field_axis, param_axis) + return np.moveaxis(moved, 0, axis) + + center = tuple(get_static(val) for val in center) + aligned = _align_freq(adjoint_field, dataset_field) + weights = compute_spatial_weights(aligned, dims=tuple("xyz")) + if weights.size > 1: + weights = weights.transpose(*weights.dims) + weighted = aligned * weights + + field_coords = { + dim: np.asarray(weighted.coords[dim].data) for dim in weighted.dims if dim in "xyz" + } + param_coords = {} + for axis, dim in enumerate("xyz"): + if dim in dataset_field.coords: + param_coords[dim] = np.asarray(dataset_field.coords[dim].data) + center[axis] + + values = np.asarray(weighted.data) + dims = list(weighted.dims) + for dim in "xyz": + if dim not in field_coords or dim not in param_coords: + continue + axis_index = dims.index(dim) + values = _interp_axis(values, axis_index, field_coords[dim], param_coords[dim]) + + out_coords = {dim: np.asarray(dataset_field.coords[dim].data) for dim in dataset_field.dims} + result = SpatialDataArray(values, coords=out_coords, dims=tuple(dims)) + if tuple(dims) != tuple(dataset_field.dims): + result = result.transpose(*dataset_field.dims) + return result + + +def get_frequency_omega( + field_data: SpatialDataArray, frequencies: ArrayLike +) -> FreqDataArray | float: + """Return angular frequency aligned with field_data frequencies.""" + if "f" in field_data.dims: + omega = 2 * np.pi * np.asarray(field_data.coords["f"].data) + return FreqDataArray(omega, coords={"f": np.asarray(field_data.coords["f"].data)}) + return 2 * np.pi * float(np.asarray(frequencies).squeeze()) + + __all__ = [ "DerivativeInfo", + "compute_source_weights", + "compute_spatial_weights", + "get_frequency_omega", "integrate_within_bounds", + "transpose_interp_field_to_dataset", ] diff --git a/tidy3d/components/base.py b/tidy3d/components/base.py index 7cfbbc5a5a..8eee237a27 100644 --- a/tidy3d/components/base.py +++ b/tidy3d/components/base.py @@ -1167,14 +1167,17 @@ def _json(self, indent=INDENT, exclude_unset=False, **kwargs: Any) -> str: return json_string def _strip_traced_fields( - self, starting_path: tuple[str, ...] = (), include_untraced_data_arrays: bool = False + self, + starting_paths: tuple[tuple[str, ...], ...] = (), + include_untraced_data_arrays: bool = False, ) -> AutogradFieldMap: """Extract a dictionary mapping paths in the model to the data traced by ``autograd``. Parameters ---------- - starting_path : tuple[str, ...] = () - If provided, starts recursing in self.dict() from this path of field names + starting_paths : tuple[tuple[str, ...], ...] = () + If provided, starts recursing in self.dict() from these paths of field names. + Can be a single path tuple or multiple path tuples. include_untraced_data_arrays : bool = False Whether to include ``DataArray`` objects without tracers. We need to include these when returning data, but are unnecessary for structures. @@ -1186,7 +1189,7 @@ def _strip_traced_fields( """ - path = tuple(starting_path) + paths = tuple(starting_paths) if self._has_tracers is False and not include_untraced_data_arrays: return dict_ag() @@ -1216,19 +1219,31 @@ def handle_value(x: Any, path: tuple[str, ...]) -> None: # recursively parse the dictionary of this object self_dict = self.dict() - # if an include_only string was provided, only look at that subset of the dict - if path: - for key in path: - self_dict = self_dict[key] - - handle_value(self_dict, path=path) + # Handle multiple starting paths + if paths: + # If paths is a single tuple, convert to tuple of tuples + if isinstance(paths[0], str): + paths = (paths,) + + # Process each starting path + for starting_path in paths: + # Navigate to the starting path in the dictionary + current_dict = self_dict + for key in starting_path: + current_dict = current_dict[key] + + # Handle the subtree starting from this path + handle_value(current_dict, path=starting_path) + else: + # No starting paths specified, process entire dictionary + handle_value(self_dict, path=()) if field_mapping: if not include_untraced_data_arrays: self._has_tracers = True return dict_ag(field_mapping) - if not include_untraced_data_arrays and not path: + if not include_untraced_data_arrays and not paths: self._has_tracers = False return dict_ag() diff --git a/tidy3d/components/simulation.py b/tidy3d/components/simulation.py index b8b9600aed..bb4f4a880c 100644 --- a/tidy3d/components/simulation.py +++ b/tidy3d/components/simulation.py @@ -4885,10 +4885,21 @@ def _with_adjoint_monitors(self, sim_fields_keys: list) -> Simulation: def _make_adjoint_monitors(self, sim_fields_keys: list) -> tuple[list, list]: """Get lists of field and permittivity monitors for this simulation.""" - index_to_keys = defaultdict(list) - - for _, index, *fields in sim_fields_keys: - index_to_keys[index].append(fields) + # Separate structures and sources into different dictionaries + structure_index_to_keys = defaultdict(list) + source_index_to_keys = defaultdict(list) + + for component_type, index, *fields in sim_fields_keys: + if component_type == "structures": + structure_index_to_keys[index].append(fields) + elif component_type == "sources": + source_index_to_keys[index].append(fields) + else: + raise ValueError( + f"Unknown component type '{component_type}' encountered while " + "constructing adjoint monitors. " + "Expected one of: 'structures', 'sources'." + ) freqs = self._freqs_adjoint sim_plane = self if self.size.count(0.0) == 1 else None @@ -4896,24 +4907,43 @@ def _make_adjoint_monitors(self, sim_fields_keys: list) -> tuple[list, list]: adjoint_monitors_fld = [] adjoint_monitors_eps = [] - # make a field and permittivity monitor for every structure needing one - for i, field_keys in index_to_keys.items(): + # Handle structures first + for i, field_keys in structure_index_to_keys.items(): structure = self.structures[i] - mnt_fld, mnt_eps = structure._make_adjoint_monitors( freqs=freqs, index=i, field_keys=field_keys, plane=sim_plane ) - adjoint_monitors_fld.append(mnt_fld) adjoint_monitors_eps.append(mnt_eps) + # Handle sources + for i, _field_keys in source_index_to_keys.items(): + source = self.sources[i] + + # For sources, we only need field monitors (no permittivity monitors) + # Create a field monitor that covers the source region + source_center = source.center + source_size = source.size + + # Create field monitor for the source + field_monitor = FieldMonitor( + center=source_center, + size=source_size, + freqs=freqs, + name=f"source_adjoint_{i}", + ) + + # For sources, we only return field monitors (no permittivity monitors) + adjoint_monitors_fld.append(field_monitor) + return adjoint_monitors_fld, adjoint_monitors_eps def _check_custom_medium_geometry_overlap(self, sim_fields_keys: AutogradFieldMap) -> None: index_to_keys = defaultdict(list) - for _, index, *fields in sim_fields_keys: - index_to_keys[index].append(fields) + for path_type, index, *fields in sim_fields_keys: + if path_type == "structures": + index_to_keys[index].append(fields) for structure_index, gradient_paths in index_to_keys.items(): if self.structures[structure_index].medium.is_custom: diff --git a/tidy3d/components/source/base.py b/tidy3d/components/source/base.py index 0b76239d08..85cd8e473b 100644 --- a/tidy3d/components/source/base.py +++ b/tidy3d/components/source/base.py @@ -3,7 +3,7 @@ from __future__ import annotations from abc import ABC -from typing import Any, Optional +from typing import TYPE_CHECKING, Any, Optional import pydantic.v1 as pydantic @@ -22,6 +22,10 @@ from .time import SourceTimeType +if TYPE_CHECKING: + from tidy3d.components.autograd import AutogradFieldMap + from tidy3d.components.autograd.derivative_utils import DerivativeInfo + class Source(Box, AbstractSource, ABC): """Abstract base class for all sources.""" @@ -62,6 +66,10 @@ def _pol_vector(self) -> tuple[float, float, float]: _warn_traced_center = _warn_unsupported_traced_argument("center") _warn_traced_size = _warn_unsupported_traced_argument("size") + def _compute_derivatives(self, derivative_info: DerivativeInfo) -> AutogradFieldMap: + """Compute adjoint derivatives for source parameters.""" + raise NotImplementedError(f"Can't compute derivative for 'Source': '{type(self)}'.") + @pydantic.validator("source_time", always=True) def _freqs_lower_bound(cls, val): """Raise validation error if central frequency is too low.""" diff --git a/tidy3d/components/source/current.py b/tidy3d/components/source/current.py index 994ef1eff3..a4996ee932 100644 --- a/tidy3d/components/source/current.py +++ b/tidy3d/components/source/current.py @@ -6,15 +6,19 @@ from math import cos, isclose, sin from typing import Any, Optional +import numpy as np import pydantic.v1 as pydantic from typing_extensions import Literal +from tidy3d.components.autograd import AutogradFieldMap +from tidy3d.components.autograd.derivative_utils import DerivativeInfo from tidy3d.components.base import cached_property from tidy3d.components.data.dataset import FieldDataset from tidy3d.components.data.validators import validate_can_interpolate, validate_no_nans from tidy3d.components.types import Polarization from tidy3d.components.validators import assert_single_freq_in_range, warn_if_dataset_none from tidy3d.constants import MICROMETER +from tidy3d.log import log from .base import Source from .time import SourceTimeType @@ -219,3 +223,58 @@ class CustomCurrentSource(ReverseInterpolatedSource): _current_dataset_none_warning = warn_if_dataset_none("current_dataset") _current_dataset_single_freq = assert_single_freq_in_range("current_dataset") _can_interpolate = validate_can_interpolate("current_dataset") + + def _compute_derivatives(self, derivative_info: DerivativeInfo) -> AutogradFieldMap: + """Compute derivatives with respect to CustomCurrentSource parameters.""" + from tidy3d.components.autograd.derivative_utils import ( + transpose_interp_field_to_dataset, + ) + + if self.current_dataset is None: + return {tuple(path): 0.0 for path in derivative_info.paths} + + derivative_map = {} + center = tuple(self.center) + h_adj = derivative_info.H_adj or {} + e_adj = derivative_info.E_adj or {} + + for field_path in derivative_info.paths: + field_path = tuple(field_path) + if len(field_path) < 2 or field_path[0] != "current_dataset": + log.warning( + f"Unsupported traced source path '{field_path}' for CustomCurrentSource." + ) + derivative_map[field_path] = 0.0 + continue + + field_name = field_path[1] + if ( + len(field_name) != 2 + or field_name[0] not in ("E", "H") + or field_name[1] not in ("x", "y", "z") + ): + log.warning(f"Unsupported field component '{field_name}' in CustomCurrentSource.") + derivative_map[field_path] = 0.0 + continue + + field_data = getattr(self.current_dataset, field_name, None) + if field_data is None: + raise ValueError(f"Cannot find field '{field_name}' in current dataset.") + + if field_name.startswith("H"): + adjoint_field = h_adj.get(field_name) + component_sign = -1.0 + else: # "E" case + adjoint_field = e_adj.get(field_name) + component_sign = 1.0 + + adjoint_on_dataset = transpose_interp_field_to_dataset( + adjoint_field, field_data, center=center + ) + vjp_field = 0.5 * np.real( + derivative_info.source_time_scaling * adjoint_on_dataset * component_sign + ) + + derivative_map[field_path] = vjp_field.transpose(*field_data.dims).values + + return derivative_map diff --git a/tidy3d/components/source/field.py b/tidy3d/components/source/field.py index 4255561698..2185582045 100644 --- a/tidy3d/components/source/field.py +++ b/tidy3d/components/source/field.py @@ -8,6 +8,8 @@ import numpy as np import pydantic.v1 as pydantic +from tidy3d.components.autograd import AutogradFieldMap +from tidy3d.components.autograd.derivative_utils import DerivativeInfo from tidy3d.components.base import Tidy3dBaseModel, cached_property, skip_if_fields_missing from tidy3d.components.data.dataset import FieldDataset from tidy3d.components.data.validators import validate_can_interpolate, validate_no_nans @@ -22,7 +24,7 @@ warn_backward_waist_distance, warn_if_dataset_none, ) -from tidy3d.constants import GLANCING_CUTOFF, MICROMETER, RADIAN, inf +from tidy3d.constants import EPSILON_0, GLANCING_CUTOFF, MICROMETER, RADIAN, inf from tidy3d.exceptions import SetupError from tidy3d.log import log @@ -244,6 +246,113 @@ def _tangential_component_defined(cls, val: FieldDataset, values: dict) -> Field return val raise SetupError("No tangential field found in the suppled 'field_dataset'.") + def _compute_derivatives(self, derivative_info: DerivativeInfo) -> AutogradFieldMap: + """Compute derivatives with respect to CustomFieldSource parameters.""" + from tidy3d.components.autograd.derivative_utils import ( + compute_spatial_weights, + get_frequency_omega, + transpose_interp_field_to_dataset, + ) + + if self.field_dataset is None: + return {tuple(path): 0.0 for path in derivative_info.paths} + + derivative_map = {} + center = tuple(self.center) + e_adj = derivative_info.E_adj or {} + h_adj = derivative_info.H_adj or {} + if self.injection_axis is None: + return {tuple(path): 0.0 for path in derivative_info.paths} + + for field_path in derivative_info.paths: + field_path = tuple(field_path) + if len(field_path) < 2 or field_path[0] != "field_dataset": + log.warning(f"Unsupported traced source path '{field_path}' for CustomFieldSource.") + derivative_map[field_path] = 0.0 + continue + + field_name = field_path[1] + field_data = getattr(self.field_dataset, field_name, None) + if field_data is None: + derivative_map[field_path] = 0.0 + continue + + if ( + len(field_name) != 2 + or field_name[0] not in ("E", "H") + or field_name[1] not in ("x", "y", "z") + ): + log.warning(f"Unsupported field component '{field_name}' in CustomFieldSource.") + derivative_map[field_path] = 0.0 + continue + + component_axis = "xyz".index(field_name[1]) + if component_axis == self.injection_axis: + derivative_map[field_path] = np.zeros_like(field_data.data) + continue + + def _get_adjoint_and_sign( + *, + field_name: str, + injection_axis: int, + component_axis: int, + e_adj: dict, + h_adj: dict, + ): + # n x e determines which orthogonal component couples (and its sign) + n_vec = np.eye(3)[injection_axis] + e_vec = np.eye(3)[component_axis] + cross = np.cross(n_vec, e_vec) + + if not np.any(cross): + return None, 0.0 # indicates "no gradient" + + target_axis = int(np.flatnonzero(cross)[0]) + component_sign = float(cross[target_axis]) + + if field_name.startswith("E"): + target_component = f"H{'xyz'[target_axis]}" + adjoint_field = h_adj.get(target_component) + else: + target_component = f"E{'xyz'[target_axis]}" + adjoint_field = e_adj.get(target_component) + + return adjoint_field, component_sign + + adjoint_field, component_sign = _get_adjoint_and_sign( + field_name=field_name, + injection_axis=self.injection_axis, + component_axis=component_axis, + e_adj=e_adj, + h_adj=h_adj, + ) + + if component_sign == 0.0: + # no gradient for injection_axis == component_axis + derivative_map[field_path] = np.zeros_like(field_data.data) + continue + + adjoint_on_dataset = transpose_interp_field_to_dataset( + adjoint_field, field_data, center=center + ) + size_element = compute_spatial_weights(field_data, dims=tuple("xyz")) + if size_element.size > 1: + size_element = size_element.transpose(*size_element.dims) + + omega_da = get_frequency_omega(field_data, derivative_info.frequencies) + + current_scale = omega_da * EPSILON_0 / size_element + + vjp_field = 0.5 * np.real( + component_sign + * current_scale + * derivative_info.source_time_scaling + * adjoint_on_dataset + ) + derivative_map[field_path] = vjp_field.transpose(*field_data.dims).values + + return derivative_map + """ Source current profiles defined by (1) angle or (2) desired mode. Sets theta and phi angles.""" diff --git a/tidy3d/plugins/autograd/README.md b/tidy3d/plugins/autograd/README.md index 1dc8280968..0857172996 100644 --- a/tidy3d/plugins/autograd/README.md +++ b/tidy3d/plugins/autograd/README.md @@ -190,6 +190,7 @@ The following components are traceable as inputs to the `td.Simulation` | dispersive materials | `PoleResidue.eps_inf`, `PoleResidue.poles` | | spatially dependent dispersive materials | `CustomPoleResidue.eps_inf`, `CustomPoleResidue.poles` | | cylinders | `Cylinder.radius`, `Cylinder.center` | +| sources | `CustomCurrentSource.current_dataset`, `CustomFieldSource.field_dataset`| The following components are traceable as outputs of the `td.SimulationData` @@ -223,12 +224,6 @@ We currently have the following restrictions: This can cause unnecessary data usage during the forward pass, especially if the monitors contain many frequencies that are not relevant for the objective function (i.e., they are not being differentiated w.r.t.). To avoid this, restrict the frequencies in the monitors only to the ones that are relevant for differentiation during optimization. -### To be supported soon - -Next on our roadmap (targeting 2.8 and 2.9, 2025) is to support: - -- `TriangleMesh`. -- `GUI` integration of invdes plugin. ### Finally diff --git a/tidy3d/web/api/autograd/autograd.py b/tidy3d/web/api/autograd/autograd.py index 9d38f3411b..9bc540a1bb 100644 --- a/tidy3d/web/api/autograd/autograd.py +++ b/tidy3d/web/api/autograd/autograd.py @@ -65,7 +65,7 @@ def is_valid_for_autograd(simulation: td.Simulation) -> bool: # if no tracers just use regular web.run() traced_fields = simulation._strip_traced_fields( - include_untraced_data_arrays=False, starting_path=("structures",) + include_untraced_data_arrays=False, starting_paths=(("structures",), ("sources",)) ) if not traced_fields: return False @@ -513,7 +513,7 @@ def setup_run(simulation: td.Simulation) -> AutogradFieldMap: # get a mapping of all the traced fields in the provided simulation return simulation._strip_traced_fields( - include_untraced_data_arrays=False, starting_path=("structures",) + include_untraced_data_arrays=False, starting_paths=(("structures",), ("sources",)) ) @@ -591,7 +591,7 @@ def _run_primitive( aux_data[AUX_KEY_FWD_TASK_ID] = task_id_fwd aux_data[AUX_KEY_SIM_DATA_ORIGINAL] = sim_data_orig field_map = sim_data_orig._strip_traced_fields( - include_untraced_data_arrays=True, starting_path=("data",) + include_untraced_data_arrays=True, starting_paths=(("data",),) ) return field_map @@ -655,7 +655,7 @@ def _run_async_primitive( aux_data_dict[task_name][AUX_KEY_FWD_TASK_ID] = task_id_fwd aux_data_dict[task_name][AUX_KEY_SIM_DATA_ORIGINAL] = sim_data_orig field_map = sim_data_orig._strip_traced_fields( - include_untraced_data_arrays=True, starting_path=("data",) + include_untraced_data_arrays=True, starting_paths=(("data",),) ) field_map_fwd_dict[task_name] = field_map diff --git a/tidy3d/web/api/autograd/backward.py b/tidy3d/web/api/autograd/backward.py index d1481f4013..e923fba20e 100644 --- a/tidy3d/web/api/autograd/backward.py +++ b/tidy3d/web/api/autograd/backward.py @@ -9,7 +9,7 @@ from tidy3d import Medium from tidy3d.components.autograd import AutogradFieldMap, get_static from tidy3d.components.autograd.derivative_utils import DerivativeInfo -from tidy3d.components.data.data_array import DataArray +from tidy3d.components.data.data_array import DataArray, FreqDataArray from tidy3d.config import config from tidy3d.exceptions import AdjointError from tidy3d.packaging import disable_local_subpixel @@ -51,7 +51,7 @@ def setup_adj( # start with the full simulation data structure and either zero out the fields # that have no tracer data for them or insert the tracer data full_sim_data_dict = sim_data_orig._strip_traced_fields( - include_untraced_data_arrays=True, starting_path=("data",) + include_untraced_data_arrays=True, starting_paths=(("data",),) ) for path in full_sim_data_dict.keys(): if path in data_fields_vjp: @@ -112,240 +112,337 @@ def postprocess_adj( ) -> AutogradFieldMap: """Postprocess some data from the adjoint simulation into the VJP for the original sim flds.""" - # map of index into 'structures' to the list of paths we need vjps for + # group the paths by component type and index sim_vjp_map = defaultdict(list) - for _, structure_index, *structure_path in sim_fields_keys: - structure_path = tuple(structure_path) - sim_vjp_map[structure_index].append(structure_path) + for component_type, component_index, *component_path in sim_fields_keys: + sim_vjp_map[(component_type, component_index)].append(component_path) - # store the derivative values given the forward and adjoint data + # compute the VJP for each component sim_fields_vjp = {} - for structure_index, structure_paths in sim_vjp_map.items(): - # grab the forward and adjoint data - fld_fwd = sim_data_fwd._get_adjoint_data(structure_index, data_type="fld") - eps_fwd = sim_data_fwd._get_adjoint_data(structure_index, data_type="eps") - fld_adj = sim_data_adj._get_adjoint_data(structure_index, data_type="fld") - eps_adj = sim_data_adj._get_adjoint_data(structure_index, data_type="eps") - - # post normalize the adjoint fields if a single, broadband source - fwd_flds_adj_normed = {} - for key, val in fld_adj.field_components.items(): - fwd_flds_adj_normed[key] = val * sim_data_adj.simulation.post_norm - - fld_adj = fld_adj.updated_copy(**fwd_flds_adj_normed) - - # maps of the E_fwd * E_adj and D_fwd * D_adj, each as as td.FieldData & 'Ex', 'Ey', 'Ez' - der_maps = get_derivative_maps( - fld_fwd=fld_fwd, - eps_fwd=eps_fwd, - fld_adj=fld_adj, - eps_adj=eps_adj, - ) - E_der_map = der_maps["E"] - D_der_map = der_maps["D"] - H_der_map = der_maps["H"] + for (component_type, component_index), component_paths in sim_vjp_map.items(): + if component_type == "structures": + sim_fields_vjp.update( + _process_structure_gradients( + sim_data_adj, sim_data_orig, sim_data_fwd, component_index, component_paths + ) + ) + elif component_type == "sources": + sim_fields_vjp.update( + _process_source_gradients( + sim_data_adj, sim_data_orig, sim_data_fwd, component_index, component_paths + ) + ) + else: + raise ValueError( + f"Unexpected component_type='{component_type}' for component_index={component_index}. " + "Expected 'structures' or 'sources'." + ) - H_info_exists = H_der_map is not None + return sim_fields_vjp - D_fwd = E_to_D(fld_fwd, eps_fwd) - D_adj = E_to_D(fld_adj, eps_fwd) - structure = sim_data_fwd.simulation.structures[structure_index] +def _process_source_gradients( + sim_data_adj: td.SimulationData, + sim_data_orig: td.SimulationData, + sim_data_fwd: td.SimulationData, + source_index: int, + source_paths: list[tuple], +) -> AutogradFieldMap: + """Process gradients for a specific source.""" - # compute epsilon arrays for all frequencies - # use frequencies from the actual computed derivative map to ensure they exist - # in both forward and adjoint data (E_der_map = fld_fwd * fld_adj) - first_field_component = next(iter(E_der_map.field_components.values())) - adjoint_frequencies = np.array(first_field_component.coords["f"].values) + source = sim_data_fwd.simulation.sources[source_index] + monitor_name = f"source_adjoint_{source_index}" - monitor_freqs = np.array(fld_adj.monitor.freqs) - if len(adjoint_frequencies) != len(monitor_freqs) or not np.allclose( - np.sort(adjoint_frequencies), np.sort(monitor_freqs), rtol=1e-10, atol=0 - ): - raise ValueError( - f"Frequency mismatch in adjoint postprocessing for structure {structure_index}. " - f"Expected frequencies from monitor: {monitor_freqs}, " - f"but derivative map has: {adjoint_frequencies}. " - ) + fld_adj = sim_data_adj[monitor_name] + fld_adj = fld_adj.grid_corrected_copy + + fwd_flds_adj_normed = {} + for key, val in fld_adj.field_components.items(): + fwd_flds_adj_normed[key] = val * sim_data_adj.simulation.post_norm + fld_adj = fld_adj.updated_copy(**fwd_flds_adj_normed) + + e_adj = {k: v for k, v in fld_adj.field_components.items() if k.startswith("E")} + h_adj = {k: v for k, v in fld_adj.field_components.items() if k.startswith("H")} - # auto permittivity detection - sim_orig = sim_data_orig.simulation - plane_eps = eps_fwd.monitor.geometry - sim_orig_grid_spec = td.components.grid.grid_spec.GridSpec.from_grid(sim_orig.grid) + first_field_component = next(iter(e_adj.values())) + adjoint_frequencies = np.array(first_field_component.coords["f"].values) - # permittivity without this structure - structs_no_struct = list(sim_orig.structures) - structs_no_struct.pop(structure_index) - sim_no_structure = sim_orig.updated_copy( - structures=structs_no_struct, monitors=[], sources=[], grid_spec=sim_orig_grid_spec + monitor_freqs = np.array(fld_adj.monitor.freqs) + if len(adjoint_frequencies) != len(monitor_freqs) or not np.allclose( + np.sort(adjoint_frequencies), np.sort(monitor_freqs), rtol=1e-10, atol=0 + ): + raise ValueError( + f"Frequency mismatch in adjoint postprocessing for source {source_index}. " + f"Expected frequencies from monitor: {monitor_freqs}, " + f"but derivative map has: {adjoint_frequencies}. " ) - # for the outside permittivity of the structure, resize the bounds of the permittivity region - # to make sure we capture data outside the structure bounds - low_coords = [center - 0.5 * size for center, size in zip(plane_eps.center, plane_eps.size)] - high_coords = [ - center + 0.5 * size for center, size in zip(plane_eps.center, plane_eps.size) - ] + spectrum = source.source_time.spectrum( + sim_data_orig.simulation.tmesh, + adjoint_frequencies, + sim_data_orig.simulation.dt, + ) + spectrum = np.asarray(spectrum) + scale = 2.0 * spectrum / sim_data_orig.simulation.dt * np.sqrt(2 / np.pi) + source_time_scaling = FreqDataArray(scale, coords={"f": adjoint_frequencies}) + + bounds = source.geometry.bounds + derivative_info = DerivativeInfo( + paths=source_paths, + E_der_map={}, + D_der_map={}, + E_fwd={}, + E_adj=e_adj, + D_fwd={}, + D_adj={}, + H_fwd={}, + H_adj=h_adj, + eps_data={}, + eps_in=1.0, + eps_out=1.0, + frequencies=adjoint_frequencies, + source_time_scaling=source_time_scaling, + bounds=bounds, + bounds_intersect=bounds, + simulation_bounds=sim_data_orig.simulation.bounds, + ) - low_bounds = sim_orig.grid.boundaries.get_bounding_values(low_coords, "left", buffer=1) - high_bounds = sim_orig.grid.boundaries.get_bounding_values(high_coords, "right", buffer=1) + source_vjp = source._compute_derivatives(derivative_info) - resized_center = [0.5 * (low + high) for low, high in zip(low_bounds, high_bounds)] - resized_size = [(high - low) for low, high in zip(low_bounds, high_bounds)] + sim_fields_vjp = {} + for source_path, vjp_value in source_vjp.items(): + sim_path = ("sources", source_index, *list(source_path)) + sim_fields_vjp[sim_path] = vjp_value - resize_plane_eps = plane_eps.updated_copy(center=resized_center, size=resized_size) + return sim_fields_vjp - eps_no_structure_data = [ - sim_no_structure.epsilon(box=resize_plane_eps, coord_key="centers", freq=f) - for f in adjoint_frequencies - ] - eps_no_structure = xr.concat(eps_no_structure_data, dim="f").assign_coords( - f=adjoint_frequencies +def _process_structure_gradients( + sim_data_adj: td.SimulationData, + sim_data_orig: td.SimulationData, + sim_data_fwd: td.SimulationData, + structure_index: int, + structure_paths: list[tuple], +) -> AutogradFieldMap: + """Process gradients for a specific structure.""" + + # grab the forward and adjoint data + fld_fwd = sim_data_fwd._get_adjoint_data(structure_index, data_type="fld") + eps_fwd = sim_data_fwd._get_adjoint_data(structure_index, data_type="eps") + fld_adj = sim_data_adj._get_adjoint_data(structure_index, data_type="fld") + eps_adj = sim_data_adj._get_adjoint_data(structure_index, data_type="eps") + + # post normalize the adjoint fields if a single, broadband source + fwd_flds_adj_normed = {} + for key, val in fld_adj.field_components.items(): + fwd_flds_adj_normed[key] = val * sim_data_adj.simulation.post_norm + + fld_adj = fld_adj.updated_copy(**fwd_flds_adj_normed) + + # maps of the E_fwd * E_adj and D_fwd * D_adj, each as as td.FieldData & 'Ex', 'Ey', 'Ez' + der_maps = get_derivative_maps( + fld_fwd=fld_fwd, + eps_fwd=eps_fwd, + fld_adj=fld_adj, + eps_adj=eps_adj, + ) + E_der_map = der_maps["E"] + D_der_map = der_maps["D"] + H_der_map = der_maps["H"] + + H_info_exists = H_der_map is not None + + D_fwd = E_to_D(fld_fwd, eps_fwd) + D_adj = E_to_D(fld_adj, eps_fwd) + + structure = sim_data_fwd.simulation.structures[structure_index] + + # compute epsilon arrays for all frequencies + # use frequencies from the actual computed derivative map to ensure they exist + # in both forward and adjoint data (E_der_map = fld_fwd * fld_adj) + first_field_component = next(iter(E_der_map.field_components.values())) + adjoint_frequencies = np.array(first_field_component.coords["f"].values) + + monitor_freqs = np.array(fld_adj.monitor.freqs) + if len(adjoint_frequencies) != len(monitor_freqs) or not np.allclose( + np.sort(adjoint_frequencies), np.sort(monitor_freqs), rtol=1e-10, atol=0 + ): + raise ValueError( + f"Frequency mismatch in adjoint postprocessing for structure {structure_index}. " + f"Expected frequencies from monitor: {monitor_freqs}, " + f"but derivative map has: {adjoint_frequencies}. " ) - if structure.medium.is_custom: - # we can't make an infinite structure from a custom medium permittivity - eps_inf_structure = None - else: - geometry_box = structure.geometry.bounding_box - background_structures_2d = [] - sim_inf_background_medium = sim_orig.medium - if np.any(np.array(geometry_box.size) == 0.0): - zero_coordinate = tuple(geometry_box.size).index(0.0) - new_size = [td.inf, td.inf, td.inf] - new_size[zero_coordinate] = 0.0 - - background_structures_2d = [ - structure.updated_copy(geometry=geometry_box.updated_copy(size=new_size)) - ] - else: - sim_inf_background_medium = structure.medium - - # permittivity with infinite structure - structs_inf_struct = list(sim_orig.structures)[structure_index + 1 :] - sim_inf_structure = sim_orig.updated_copy( - structures=background_structures_2d + structs_inf_struct, - medium=sim_inf_background_medium, - monitors=[], - sources=[], - grid_spec=sim_orig_grid_spec, - ) + # auto permittivity detection + sim_orig = sim_data_orig.simulation + plane_eps = eps_fwd.monitor.geometry + sim_orig_grid_spec = td.components.grid.grid_spec.GridSpec.from_grid(sim_orig.grid) - eps_inf_structure_data = [ - sim_inf_structure.epsilon(box=plane_eps, coord_key="centers", freq=f) - for f in adjoint_frequencies - ] + # permittivity without this structure + structs_no_struct = list(sim_orig.structures) + structs_no_struct.pop(structure_index) + sim_no_structure = sim_orig.updated_copy( + structures=structs_no_struct, monitors=[], sources=[], grid_spec=sim_orig_grid_spec + ) - eps_inf_structure = xr.concat(eps_inf_structure_data, dim="f").assign_coords( - f=adjoint_frequencies - ) + # for the outside permittivity of the structure, resize the bounds of the permittivity region + # to make sure we capture data outside the structure bounds + low_coords = [center - 0.5 * size for center, size in zip(plane_eps.center, plane_eps.size)] + high_coords = [center + 0.5 * size for center, size in zip(plane_eps.center, plane_eps.size)] + + low_bounds = sim_orig.grid.boundaries.get_bounding_values(low_coords, "left", buffer=1) + high_bounds = sim_orig.grid.boundaries.get_bounding_values(high_coords, "right", buffer=1) + + resized_center = [0.5 * (low + high) for low, high in zip(low_bounds, high_bounds)] + resized_size = [(high - low) for low, high in zip(low_bounds, high_bounds)] + + resize_plane_eps = plane_eps.updated_copy(center=resized_center, size=resized_size) + + eps_no_structure_data = [ + sim_no_structure.epsilon(box=resize_plane_eps, coord_key="centers", freq=f) + for f in adjoint_frequencies + ] + + eps_no_structure = xr.concat(eps_no_structure_data, dim="f").assign_coords( + f=adjoint_frequencies + ) - # compute bounds intersection - struct_bounds = rmin_struct, rmax_struct = structure.geometry.bounds - rmin_sim, rmax_sim = sim_orig.bounds - rmin_intersect = tuple([max(a, b) for a, b in zip(rmin_sim, rmin_struct)]) - rmax_intersect = tuple([min(a, b) for a, b in zip(rmax_sim, rmax_struct)]) - bounds_intersect = (rmin_intersect, rmax_intersect) - - # get chunk size - if None, process all frequencies as one chunk - freq_chunk_size = config.adjoint.solver_freq_chunk_size - n_freqs = len(adjoint_frequencies) - if not freq_chunk_size or freq_chunk_size <= 0: - freq_chunk_size = n_freqs + if structure.medium.is_custom: + # we can't make an infinite structure from a custom medium permittivity + eps_inf_structure = None + else: + geometry_box = structure.geometry.bounding_box + background_structures_2d = [] + sim_inf_background_medium = sim_orig.medium + if np.any(np.array(geometry_box.size) == 0.0): + zero_coordinate = tuple(geometry_box.size).index(0.0) + new_size = [td.inf, td.inf, td.inf] + new_size[zero_coordinate] = 0.0 + + background_structures_2d = [ + structure.updated_copy(geometry=geometry_box.updated_copy(size=new_size)) + ] else: - freq_chunk_size = min(freq_chunk_size, n_freqs) + sim_inf_background_medium = structure.medium + + # permittivity with infinite structure + structs_inf_struct = list(sim_orig.structures)[structure_index + 1 :] + sim_inf_structure = sim_orig.updated_copy( + structures=background_structures_2d + structs_inf_struct, + medium=sim_inf_background_medium, + monitors=[], + sources=[], + grid_spec=sim_orig_grid_spec, + ) - # process in chunks - vjp_value_map = {} + eps_inf_structure_data = [ + sim_inf_structure.epsilon(box=plane_eps, coord_key="centers", freq=f) + for f in adjoint_frequencies + ] - for chunk_start in range(0, n_freqs, freq_chunk_size): - chunk_end = min(chunk_start + freq_chunk_size, n_freqs) - freq_slice = slice(chunk_start, chunk_end) + eps_inf_structure = xr.concat(eps_inf_structure_data, dim="f").assign_coords( + f=adjoint_frequencies + ) - select_adjoint_freqs = adjoint_frequencies[freq_slice] + # compute bounds intersection + struct_bounds = rmin_struct, rmax_struct = structure.geometry.bounds + rmin_sim, rmax_sim = sim_orig.bounds + rmin_intersect = tuple([max(a, b) for a, b in zip(rmin_sim, rmin_struct)]) + rmax_intersect = tuple([min(a, b) for a, b in zip(rmax_sim, rmax_struct)]) + bounds_intersect = (rmin_intersect, rmax_intersect) + + # get chunk size - if None, process all frequencies as one chunk + freq_chunk_size = config.adjoint.solver_freq_chunk_size + n_freqs = len(adjoint_frequencies) + if not freq_chunk_size or freq_chunk_size <= 0: + freq_chunk_size = n_freqs + else: + freq_chunk_size = min(freq_chunk_size, n_freqs) - # slice field data for current chunk - E_der_map_chunk = _slice_field_data(E_der_map.field_components, select_adjoint_freqs) - D_der_map_chunk = _slice_field_data(D_der_map.field_components, select_adjoint_freqs) - E_fwd_chunk = _slice_field_data( - fld_fwd.field_components, select_adjoint_freqs, component_indicator="E" - ) - E_adj_chunk = _slice_field_data( - fld_adj.field_components, select_adjoint_freqs, component_indicator="E" - ) - D_fwd_chunk = _slice_field_data(D_fwd.field_components, select_adjoint_freqs) - D_adj_chunk = _slice_field_data(D_adj.field_components, select_adjoint_freqs) - eps_data_chunk = _slice_field_data(eps_fwd.field_components, select_adjoint_freqs) + # process in chunks + vjp_value_map = {} - H_der_map_chunk = None - H_fwd_chunk = None - H_adj_chunk = None + for chunk_start in range(0, n_freqs, freq_chunk_size): + chunk_end = min(chunk_start + freq_chunk_size, n_freqs) + freq_slice = slice(chunk_start, chunk_end) - if H_info_exists: - H_der_map_chunk = _slice_field_data( - H_der_map.field_components, select_adjoint_freqs - ) - H_fwd_chunk = _slice_field_data( - fld_fwd.field_components, select_adjoint_freqs, component_indicator="H" - ) - H_adj_chunk = _slice_field_data( - fld_adj.field_components, select_adjoint_freqs, component_indicator="H" - ) + select_adjoint_freqs = adjoint_frequencies[freq_slice] - # slice epsilon arrays - eps_no_structure_chunk = ( - eps_no_structure.sel(f=select_adjoint_freqs) - if eps_no_structure is not None - else None + # slice field data for current chunk + E_der_map_chunk = _slice_field_data(E_der_map.field_components, select_adjoint_freqs) + D_der_map_chunk = _slice_field_data(D_der_map.field_components, select_adjoint_freqs) + E_fwd_chunk = _slice_field_data( + fld_fwd.field_components, select_adjoint_freqs, component_indicator="E" + ) + E_adj_chunk = _slice_field_data( + fld_adj.field_components, select_adjoint_freqs, component_indicator="E" + ) + D_fwd_chunk = _slice_field_data(D_fwd.field_components, select_adjoint_freqs) + D_adj_chunk = _slice_field_data(D_adj.field_components, select_adjoint_freqs) + eps_data_chunk = _slice_field_data(eps_fwd.field_components, select_adjoint_freqs) + + H_der_map_chunk = None + H_fwd_chunk = None + H_adj_chunk = None + + if H_info_exists: + H_der_map_chunk = _slice_field_data(H_der_map.field_components, select_adjoint_freqs) + H_fwd_chunk = _slice_field_data( + fld_fwd.field_components, select_adjoint_freqs, component_indicator="H" ) - eps_inf_structure_chunk = ( - eps_inf_structure.sel(f=select_adjoint_freqs) - if eps_inf_structure is not None - else None + H_adj_chunk = _slice_field_data( + fld_adj.field_components, select_adjoint_freqs, component_indicator="H" ) - # create derivative info with sliced data - derivative_info = DerivativeInfo( - paths=structure_paths, - E_der_map=E_der_map_chunk, - D_der_map=D_der_map_chunk, - H_der_map=H_der_map_chunk, - E_fwd=E_fwd_chunk, - E_adj=E_adj_chunk, - D_fwd=D_fwd_chunk, - D_adj=D_adj_chunk, - H_fwd=H_fwd_chunk, - H_adj=H_adj_chunk, - eps_data=eps_data_chunk, - eps_in=eps_inf_structure_chunk, - eps_out=eps_no_structure_chunk, - frequencies=select_adjoint_freqs, # only chunk frequencies - bounds=struct_bounds, - bounds_intersect=bounds_intersect, - simulation_bounds=sim_data_orig.simulation.bounds, - is_medium_pec=structure.medium.is_pec, - background_medium_is_pec=structure.background_medium - and structure.background_medium.is_pec, - ) + # slice epsilon arrays + eps_no_structure_chunk = ( + eps_no_structure.sel(f=select_adjoint_freqs) if eps_no_structure is not None else None + ) + eps_inf_structure_chunk = ( + eps_inf_structure.sel(f=select_adjoint_freqs) if eps_inf_structure is not None else None + ) - # compute derivatives for chunk - vjp_chunk = structure._compute_derivatives(derivative_info) - - # accumulate results - for path, value in vjp_chunk.items(): - if path in vjp_value_map: - val = vjp_value_map[path] - if isinstance(val, (list, tuple)) and isinstance(value, (list, tuple)): - vjp_value_map[path] = type(val)(x + y for x, y in zip(val, value)) - else: - vjp_value_map[path] += value - else: - vjp_value_map[path] = value + # create derivative info with sliced data + derivative_info = DerivativeInfo( + paths=structure_paths, + E_der_map=E_der_map_chunk, + D_der_map=D_der_map_chunk, + H_der_map=H_der_map_chunk, + E_fwd=E_fwd_chunk, + E_adj=E_adj_chunk, + D_fwd=D_fwd_chunk, + D_adj=D_adj_chunk, + H_fwd=H_fwd_chunk, + H_adj=H_adj_chunk, + eps_data=eps_data_chunk, + eps_in=eps_inf_structure_chunk, + eps_out=eps_no_structure_chunk, + frequencies=select_adjoint_freqs, # only chunk frequencies + bounds=struct_bounds, + bounds_intersect=bounds_intersect, + simulation_bounds=sim_data_orig.simulation.bounds, + is_medium_pec=structure.medium.is_pec, + background_medium_is_pec=structure.background_medium + and structure.background_medium.is_pec, + ) - # store vjps in output map - for structure_path, vjp_value in vjp_value_map.items(): - sim_path = ("structures", structure_index, *list(structure_path)) - sim_fields_vjp[sim_path] = vjp_value + # compute derivatives for chunk + vjp_chunk = structure._compute_derivatives(derivative_info) + + # accumulate results + for path, value in vjp_chunk.items(): + if path in vjp_value_map: + val = vjp_value_map[path] + if isinstance(val, (list, tuple)) and isinstance(value, (list, tuple)): + vjp_value_map[path] = type(val)(x + y for x, y in zip(val, value)) + else: + vjp_value_map[path] += value + else: + vjp_value_map[path] = value + sim_fields_vjp = {} + # store vjps in output map + for structure_path, vjp_value in vjp_value_map.items(): + sim_path = ("structures", structure_index, *list(structure_path)) + sim_fields_vjp[sim_path] = vjp_value return sim_fields_vjp diff --git a/tidy3d/web/api/autograd/forward.py b/tidy3d/web/api/autograd/forward.py index 69ddaade19..d1e737049a 100644 --- a/tidy3d/web/api/autograd/forward.py +++ b/tidy3d/web/api/autograd/forward.py @@ -37,7 +37,7 @@ def postprocess_fwd( # strip out the tracer AutogradFieldMap for the .data from the original sim data_traced = sim_data_original._strip_traced_fields( - include_untraced_data_arrays=True, starting_path=("data",) + include_untraced_data_arrays=True, starting_paths=(("data",),) ) # return the AutogradFieldMap that autograd registers as the "output" of the primitive