Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Added `warn_once` option to logging configuration (`td.config.logging.warn_once`) that causes each unique warning message to be shown only once per process, reducing noise from repeated validation warnings.
- Adjoint gradients for `CustomCurrentSource.current_dataset` and `CustomFieldSource.field_dataset`.

### Changed
- Unified inside/outside permittivity handling for all geometries when computing shape gradients.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
from __future__ import annotations

import sys
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Callable

import autograd as ag
import autograd.numpy as anp
import numpy as np
import pytest

import tidy3d as td
import tidy3d.web as web


@pytest.fixture(autouse=True)
def _enable_local_cache(monkeypatch):
monkeypatch.setattr(td.config.local_cache, "enabled", True)


WVL0 = 1
FREQ0 = td.C_0 / WVL0
FWIDTH = FREQ0 / 10
PULSE = td.GaussianPulse(freq0=FREQ0, fwidth=FWIDTH)
SIM_SIZE = (3 * WVL0, 3 * WVL0, 3 * WVL0)
MONITOR_CENTER = (-0.3, 0.1, 0.2)
MONITOR_SIZE = (0.5, 0.5, 0)
SOURCE_SIZE = (1, 1, 0.1)
SOURCE_CENTER = (0.1, 0.4, -0.2)
DATASET_SPACING = 0.1


def _axis_coords(size: float, spacing: float) -> np.ndarray:
"""Return 1D coords for an axis; if size==0, return a single point at 0."""
if size <= 0:
return np.array([0.0])
# Prefer rounding to avoid silent off-by-one due to float -> int truncation
n = max(2, int(np.round(size / spacing)))
return np.linspace(-size / 2, size / 2, n)


def _make_coords(
size_xyz: tuple[float, float, float], spacing: float, freq0: float
) -> dict[str, object]:
x = _axis_coords(size_xyz[0], spacing)
y = _axis_coords(size_xyz[1], spacing)
z = _axis_coords(size_xyz[2], spacing)
return {"x": x, "y": y, "z": z, "f": [freq0]}


def _make_field_data(
amp: float,
shape: tuple[int, int, int, int],
*,
add_noise: bool,
noise_scale: float = 1.0,
seed: int = 12345,
) -> np.ndarray:
"""Uniform field with optional deterministic Gaussian noise."""
base = amp * np.ones(shape, dtype=float)
if not add_noise:
return base
rng = np.random.RandomState(seed)
noise = rng.normal(size=shape)
return base + noise_scale * noise


def _make_field_dataset_from_amplitudes(
amplitudes: Sequence[float],
field_prefix: str,
coords: Mapping[str, object],
*,
add_noise: bool,
noise_scale: float = 1.0,
noise_seed: int = 12345,
) -> td.FieldDataset:
x = np.asarray(coords["x"])
y = np.asarray(coords["y"])
z = np.asarray(coords["z"])
shape = (len(x), len(y), len(z), 1)

field_components = {}
for amp, axis in zip(amplitudes, "xyz"):
data = _make_field_data(
amp,
shape,
add_noise=add_noise,
noise_scale=noise_scale,
seed=noise_seed,
)
field_components[f"{field_prefix}{axis}"] = td.ScalarFieldDataArray(data, coords=coords)

return td.FieldDataset(**field_components)


def _make_custom_field_source_components(
amplitudes: Sequence[float],
field_prefix: str,
*,
add_noise: bool,
) -> td.CustomFieldSource:
coords = _make_coords((SOURCE_SIZE[0], SOURCE_SIZE[1], 0.0), DATASET_SPACING, FREQ0)
field_dataset = _make_field_dataset_from_amplitudes(
amplitudes,
field_prefix,
coords,
add_noise=add_noise,
noise_scale=1.0,
noise_seed=12345,
)
return td.CustomFieldSource(
center=SOURCE_CENTER,
size=(SOURCE_SIZE[0], SOURCE_SIZE[1], 0.0),
source_time=PULSE,
field_dataset=field_dataset,
)


def _make_custom_current_source_components(
amplitudes: Sequence[float],
field_prefix: str,
*,
add_noise: bool,
) -> td.CustomCurrentSource:
coords = _make_coords(SOURCE_SIZE, DATASET_SPACING, FREQ0)
field_dataset = _make_field_dataset_from_amplitudes(
amplitudes,
field_prefix,
coords,
add_noise=add_noise,
noise_scale=1.0,
noise_seed=12345,
)
return td.CustomCurrentSource(
center=SOURCE_CENTER,
size=SOURCE_SIZE,
source_time=PULSE,
current_dataset=field_dataset,
)


def angled_overlap_deg(v1, v2):
norm_v1 = np.linalg.norm(v1)
norm_v2 = np.linalg.norm(v2)

if np.isclose(norm_v1, 0.0) or np.isclose(norm_v2, 0.0):
if not (np.isclose(norm_v1, 0.0) and np.isclose(norm_v2, 0.0)):
return np.inf

return 0.0

dot = np.minimum(1.0, np.sum((v1 / np.linalg.norm(v1)) * (v2 / np.linalg.norm(v2))))
angle_deg = np.arccos(dot) * 180.0 / np.pi

return angle_deg


def _make_sim(source: td.Source) -> td.Simulation:
monitor = td.FieldMonitor(
name="field_monitor",
center=MONITOR_CENTER,
size=MONITOR_SIZE,
freqs=[FREQ0],
)
return td.Simulation(
size=SIM_SIZE,
run_time=1e-12,
grid_spec=td.GridSpec.auto(min_steps_per_wvl=40, wavelength=WVL0),
sources=[source],
monitors=[monitor],
boundary_spec=td.BoundarySpec.all_sides(boundary=td.PML()),
)


def _eval_objective(sim_data: td.SimulationData, field_component: str) -> float:
field_data = sim_data.load_field_monitor("field_monitor")
component = getattr(field_data, field_component, None)
if component is None:
return 0.0
indexers = {}
for dim in ("x", "y", "z", "f"):
if dim in component.dims:
indexers[dim] = component.sizes[dim] // 2
field_value = component.isel(**indexers).values
return anp.abs(field_value) ** 2


def _eval_objective_components(
sim_data: td.SimulationData, field_components: Sequence[str]
) -> float:
total = 0.0
for component in field_components:
total += _eval_objective(sim_data, component)
return total


@dataclass(frozen=True)
class SourceCase:
name: str
monitor_components: tuple[str, str, str]
delta: float
make_source: Callable


VECTOR_SOURCE_CASES = [
# ------------------
# custom field source
# ------------------
SourceCase(
name="custom_field_vec_e",
monitor_components=("Ex", "Ey", "Ez"),
delta=1e-4,
make_source=lambda amps, *, add_noise: _make_custom_field_source_components(
amps, "E", add_noise=add_noise
),
),
SourceCase(
name="custom_field_vec_h",
monitor_components=("Hx", "Hy", "Hz"),
delta=1e-4,
make_source=lambda amps, *, add_noise: _make_custom_field_source_components(
amps, "H", add_noise=add_noise
),
),
# ------------------
# custom current source
# ------------------
SourceCase(
name="custom_current_vec_e",
monitor_components=("Ex", "Ey", "Ez"),
delta=1e-4,
make_source=lambda amps, *, add_noise: _make_custom_current_source_components(
amps, "E", add_noise=add_noise
),
),
SourceCase(
name="custom_current_vec_h",
monitor_components=("Hx", "Hy", "Hz"),
delta=1e-4,
make_source=lambda amps, *, add_noise: _make_custom_current_source_components(
amps, "H", add_noise=add_noise
),
),
]
PARAM_VECTORS = [
pytest.param((1.0, -0.5, 0.25), id="p1"),
pytest.param((0.2, 0.7, -0.9), id="p2"),
]

NOISE_CASES = [
pytest.param(False, id="no_noise"),
pytest.param(True, id="noise"),
]


@pytest.mark.numerical
@pytest.mark.parametrize("params", PARAM_VECTORS)
@pytest.mark.parametrize("add_noise", NOISE_CASES)
@pytest.mark.parametrize("case", VECTOR_SOURCE_CASES, ids=lambda case: case.name)
def test_custom_source_gradients(
_enable_local_cache,
tmp_path,
case,
params,
add_noise,
):
delta = case.delta
monitor_components = case.monitor_components
label = f"{case.name}_{'noise' if add_noise else 'clean'}_{params!r}"

make_source = lambda amps: case.make_source(amps, add_noise=add_noise)

def objective_adj(ax, ay, az):
sim = _make_sim(make_source((ax, ay, az)))
sim_data = web.run(
sim,
task_name=f"{label}_adj",
path=tmp_path / f"{label}_adj.hdf5",
local_gradient=True,
verbose=False,
)
return _eval_objective_components(sim_data, monitor_components)

grad_adjoint = np.array(
[
ag.grad(objective_adj, 0)(*params),
ag.grad(objective_adj, 1)(*params),
ag.grad(objective_adj, 2)(*params),
],
dtype=float,
)

sims = {}
for idx, axis in enumerate("xyz"):
params_plus = list(params)
params_plus[idx] += delta
params_minus = list(params)
params_minus[idx] -= delta
sims[f"{label}_fd_{axis}_plus"] = _make_sim(make_source(tuple(params_plus)))
sims[f"{label}_fd_{axis}_minus"] = _make_sim(make_source(tuple(params_minus)))

sim_data_map = web.run_async(
sims,
path_dir=tmp_path,
local_gradient=False,
verbose=False,
)

grad_fd = np.zeros(3, dtype=float)
for idx, axis in enumerate("xyz"):
obj_plus = _eval_objective_components(
sim_data_map[f"{label}_fd_{axis}_plus"], monitor_components
)
obj_minus = _eval_objective_components(
sim_data_map[f"{label}_fd_{axis}_minus"], monitor_components
)
grad_fd[idx] = (obj_plus - obj_minus) / (2 * delta)

angle = angled_overlap_deg(grad_adjoint, grad_fd)
print(f"[{label}] grad_adjoint = {grad_adjoint}", file=sys.stderr)
print(f"[{label}] grad_fd = {grad_fd}", file=sys.stderr)
print(f"[{label}] angle_deg = {angle}", file=sys.stderr)

assert angle < 5.0
2 changes: 1 addition & 1 deletion tests/test_components/autograd/test_adjoint_monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import tidy3d as td

SIM_FIELDS_KEYS = [("dummy", 0, "geometry")]
SIM_FIELDS_KEYS = [("structures", 0, "geometry")]

POLY_VERTS_2D: np.ndarray = np.array(
[
Expand Down
Loading
Loading