Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [0.3.0] Unreleased
- Support for OMOP extension tables with extension classes and builders

## [0.2.0] - 2026-02-25

Expand Down
21 changes: 18 additions & 3 deletions circe/execution/builders/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,25 @@ def decorator(func: Callable[[Criteria, BuildContext], ir.Table]):

def get_builder(criteria: Criteria):
name = criteria.__class__.__name__
builder = _REGISTRY.get(name)
if builder is not None:
return builder

# Fall back to the global extension registry so that dynamically
# registered ibis builders (via circe.extensions.ibis_builder or
# ExtensionRegistry.register_ibis_builder) are discovered without
# any hard-coded imports of extension modules.
try:
return _REGISTRY[name]
except KeyError as exc:
raise ValueError(f"No builder registered for criteria {name}") from exc
from ...extensions import get_registry

builder = get_registry().get_ibis_builder(name)
except ImportError:
builder = None

if builder is not None:
return builder

raise ValueError(f"No builder registered for criteria {name}")


def build_events(criteria: Criteria, ctx: BuildContext) -> ir.Table:
Expand Down
72 changes: 70 additions & 2 deletions circe/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

This module provides the central registry for managing extensions to circe-py,
allowing external projects to register custom criteria classes, SQL builders,
and markdown renderers.
ibis execution builders, and markdown renderers.

Decorator Usage
---------------
Extension authors can use the provided decorator functions to register their
classes automatically, rather than calling the registry methods directly::

from circe.extensions import criteria_class, sql_builder, markdown_template
from circe.extensions import criteria_class, sql_builder, markdown_template, ibis_builder

@criteria_class("WaveformOccurrence")
class WaveformOccurrence(Criteria):
Expand All @@ -23,6 +23,10 @@ class WaveformOccurrenceSqlBuilder(CriteriaSqlBuilder):
@markdown_template(WaveformOccurrence, "waveform_occurrence.j2")
class WaveformOccurrenceMarkdownRenderer:
...

@ibis_builder("WaveformOccurrence")
def build_waveform_occurrence(criteria, ctx):
...
"""

from pathlib import Path
Expand All @@ -49,6 +53,9 @@ def __init__(self):
# Maps criteria types to markdown template names
self._markdown_templates: dict[type[Criteria], str] = {}

# Maps criteria names to ibis execution builders
self._ibis_builders: dict[str, Callable] = {}

# List of paths to search for Jinja2 templates
self._template_paths: list[Path] = []

Expand Down Expand Up @@ -83,6 +90,29 @@ def register_markdown_template(self, criteria_cls: type["Criteria"], template_na
"""
self._markdown_templates[criteria_cls] = template_name

def register_ibis_builder(self, criteria_name: str, func: Callable) -> None:
"""Register an ibis execution builder for a criteria type.

The callable must accept ``(criteria, build_context)`` and return an
``ibis.expr.types.Table``.

Args:
criteria_name: The criteria class name (e.g. ``"WaveformOccurrence"``).
func: A callable ``(Criteria, BuildContext) -> ibis.Table``.
"""
self._ibis_builders[criteria_name] = func

def get_ibis_builder(self, criteria_name: str) -> Optional[Callable]:
"""Look up a registered ibis execution builder by criteria class name.

Args:
criteria_name: The criteria class name.

Returns:
The registered callable, or ``None`` if not found.
"""
return self._ibis_builders.get(criteria_name)

def add_template_path(self, path: Path) -> None:
"""Add a path to search for Jinja2 templates.

Expand Down Expand Up @@ -225,3 +255,41 @@ def template_path(path: Union[str, Path]) -> None:
template_path(Path(__file__).parent / "templates")
"""
_registry.add_template_path(Path(path))


def ibis_builder(criteria_name: str) -> Callable:
"""Function decorator that registers an ibis execution builder for a criteria type.

The decorated function must accept ``(criteria, build_context)`` and return
an ``ibis.expr.types.Table`` following the standard pipeline contract
(person_id, event_id, start_date, end_date, visit_occurrence_id).

This also inserts the builder into the low-level execution registry
(``circe.execution.builders.registry``) so that ``build_events`` can
discover it without any hard-coded imports.

Args:
criteria_name: The criteria class name (e.g. ``"WaveformOccurrence"``).

Example::

from circe.extensions import ibis_builder

@ibis_builder("WaveformOccurrence")
def build_waveform_occurrence(criteria, ctx):
...
"""

def decorator(func: Callable) -> Callable:
_registry.register_ibis_builder(criteria_name, func)
# Also push into the low-level execution registry so build_events
# can resolve the builder without the fallback path.
try:
from circe.execution.builders.registry import register as _register_exec

_register_exec(criteria_name)(func)
except ImportError:
pass
return func

return decorator
20 changes: 19 additions & 1 deletion circe/extensions/waveform/builders/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,19 @@
"""builders sub-package for the waveform extension."""
"""builders sub-package for the waveform extension.

Importing this package registers both the SQL builders (via @sql_builder /
@markdown_template decorators) and the ibis execution builders (via @register)
for all four waveform criteria types.
"""

# SQL / markdown builders — decorators fire on import
# Ibis execution builders — @register decorators fire on import
from . import (
ibis_waveform_channel_metadata, # noqa: F401
ibis_waveform_feature, # noqa: F401
ibis_waveform_occurrence, # noqa: F401
ibis_waveform_registry, # noqa: F401
waveform_channel_metadata, # noqa: F401
waveform_feature, # noqa: F401
waveform_occurrence, # noqa: F401
waveform_registry, # noqa: F401
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Ibis execution builder for WaveformChannelMetadata criteria."""

from __future__ import annotations

from circe.execution.build_context import BuildContext
from circe.execution.builders.common import (
apply_concept_filters,
apply_numeric_range,
apply_text_filter,
standardize_output,
)
from circe.execution.builders.groups import apply_criteria_group
from circe.extensions import ibis_builder

from ..criteria import WaveformChannelMetadata


@ibis_builder("WaveformChannelMetadata")
def build_waveform_channel_metadata(criteria: WaveformChannelMetadata, ctx: BuildContext):
"""Build an ibis event table from waveform_channel_metadata.

Channel metadata rows have no timestamps of their own; start_date/end_date
are sourced from the parent waveform_registry file bounds via a join.
person_id is resolved through the registry → occurrence chain.
"""
table = ctx.table("waveform_channel_metadata")

# Join waveform_registry to get file dates and occurrence link
registry = ctx.table("waveform_registry").select(
"waveform_registry_id",
"waveform_occurrence_id",
"file_start_datetime",
"file_end_datetime",
)
table = table.join(
registry,
table.waveform_registry_id == registry.waveform_registry_id,
)

# Join waveform_occurrence to get person_id and visit context
occurrence = ctx.table("waveform_occurrence").select(
"waveform_occurrence_id",
"person_id",
"visit_occurrence_id",
)
table = table.join(
occurrence,
table.waveform_occurrence_id == occurrence.waveform_occurrence_id,
)

# Registry link filter
table = apply_numeric_range(table, "waveform_registry_id", criteria.waveform_registry_id)

# Channel identification
if criteria.channel_concept_id:
table = apply_concept_filters(table, "channel_concept_id", criteria.channel_concept_id)
table = apply_text_filter(table, "waveform_channel_source_value", criteria.waveform_channel_source_value)

# Metadata type
if criteria.metadata_concept_id:
table = apply_concept_filters(table, "metadata_concept_id", criteria.metadata_concept_id)
table = apply_text_filter(table, "metadata_source_value", criteria.metadata_source_value)

# Metadata values
table = apply_numeric_range(table, "value_as_number", criteria.value_as_number)
if criteria.value_as_concept_id:
table = apply_concept_filters(table, "value_as_concept_id", criteria.value_as_concept_id)

# Units
if criteria.unit_concept_id:
table = apply_concept_filters(table, "unit_concept_id", criteria.unit_concept_id)

# Device / procedure linkage
table = apply_numeric_range(table, "device_exposure_id", criteria.device_exposure_id)
table = apply_numeric_range(table, "procedure_occurrence_id", criteria.procedure_occurrence_id)

events = standardize_output(
table,
primary_key=criteria.get_primary_key_column(),
start_column=criteria.get_start_date_column(),
end_column=criteria.get_end_date_column(),
)
return apply_criteria_group(events, criteria.correlated_criteria, ctx)
77 changes: 77 additions & 0 deletions circe/extensions/waveform/builders/ibis_waveform_feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Ibis execution builder for WaveformFeature criteria."""

from __future__ import annotations

from circe.execution.build_context import BuildContext
from circe.execution.builders.common import (
apply_concept_filters,
apply_date_range,
apply_numeric_range,
apply_text_filter,
standardize_output,
)
from circe.execution.builders.groups import apply_criteria_group
from circe.extensions import ibis_builder

from ..criteria import WaveformFeature


@ibis_builder("WaveformFeature")
def build_waveform_feature(criteria: WaveformFeature, ctx: BuildContext):
"""Build an ibis event table from waveform_feature.

waveform_feature stores derived measurements (heart rate, SpO2, arrhythmia
detections, AI embeddings, etc.). person_id and visit_occurrence_id are
resolved by joining to waveform_occurrence.
"""
table = ctx.table("waveform_feature")

# Join waveform_occurrence to obtain person_id and visit context
occurrence = ctx.table("waveform_occurrence").select(
"waveform_occurrence_id",
"person_id",
"visit_occurrence_id",
)
table = table.join(
occurrence,
table.waveform_occurrence_id == occurrence.waveform_occurrence_id,
)

# Parent link filters
table = apply_numeric_range(table, "waveform_occurrence_id", criteria.waveform_occurrence_id)
table = apply_numeric_range(table, "waveform_registry_id", criteria.waveform_registry_id)
table = apply_numeric_range(table, "waveform_channel_metadata_id", criteria.waveform_channel_metadata_id)

# Feature type (e.g., heart rate, SpO2, QRS)
if criteria.feature_concept_id:
table = apply_concept_filters(table, "feature_concept_id", criteria.feature_concept_id)

# Algorithm used to derive feature
if criteria.algorithm_concept_id:
table = apply_concept_filters(table, "algorithm_concept_id", criteria.algorithm_concept_id)
table = apply_text_filter(table, "algorithm_source_value", criteria.algorithm_source_value)

# Temporal window for feature
table = apply_date_range(table, "waveform_feature_start_timestamp", criteria.feature_start_timestamp)
table = apply_date_range(table, "waveform_feature_end_timestamp", criteria.feature_end_timestamp)

# Feature values
table = apply_numeric_range(table, "value_as_number", criteria.value_as_number)
if criteria.value_as_concept_id:
table = apply_concept_filters(table, "value_as_concept_id", criteria.value_as_concept_id)

# Units
if criteria.unit_concept_id:
table = apply_concept_filters(table, "unit_concept_id", criteria.unit_concept_id)

# Links to standard OMOP tables
table = apply_numeric_range(table, "measurement_id", criteria.measurement_id)
table = apply_numeric_range(table, "observation_id", criteria.observation_id)

events = standardize_output(
table,
primary_key=criteria.get_primary_key_column(),
start_column=criteria.get_start_date_column(),
end_column=criteria.get_end_date_column(),
)
return apply_criteria_group(events, criteria.correlated_criteria, ctx)
Loading
Loading