98 changes: 98 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,98 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

Dagger is a configuration-driven framework that transforms YAML definitions into Apache Airflow DAGs. It uses dataset lineage (matching inputs/outputs) to automatically build dependency graphs across workflows.

## Common Commands

### Development Setup
```bash
make install-dev # Create venv, install package in editable mode with dev/test deps
source venv/bin/activate
```

### Testing
```bash
make test # Run all tests with coverage (sets AIRFLOW_HOME automatically)

# Run a single test file
AIRFLOW_HOME=$(pwd)/tests/fixtures/config_finder/root/ ENV=local pytest -s tests/path/to/test_file.py

# Run a specific test
AIRFLOW_HOME=$(pwd)/tests/fixtures/config_finder/root/ ENV=local pytest -s tests/path/to/test_file.py::test_function_name
```

### Linting
```bash
make lint # Run flake8 on dagger and tests directories
black dagger tests # Format code
```

### Local Airflow Testing
```bash
make test-airflow # Build and start Airflow in Docker (localhost:8080, user: dev_user, pass: dev_user)
make stop-airflow # Stop Airflow containers
```

### CLI
```bash
dagger --help
dagger list-tasks # Show available task types
dagger list-ios # Show available IO types
dagger init-pipeline # Create a new pipeline.yaml
dagger init-task --type=<task_type> # Add a task configuration
dagger init-io --type=<io_type> # Add an IO definition
dagger print-graph # Visualize dependency graph
```

## Architecture

### Core Flow
1. **ConfigFinder** discovers pipeline directories (each with `pipeline.yaml` + task YAML files)
2. **ConfigProcessor** loads YAML configs with environment variable support
3. **TaskFactory/IOFactory** use reflection to instantiate task/IO objects from YAML
4. **TaskGraph** builds a 3-layer graph: Pipeline → Task → Dataset nodes
5. **DagCreator** traverses the graph and generates Airflow DAGs using **OperatorFactory**
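
The lineage matching that drives steps 4–5 can be pictured with a small, self-contained sketch (illustrative only — the real logic lives in `dagger/graph/` and uses the IO `alias()` method described under Key Patterns):

```python
# Minimal sketch of alias-based dependency detection (not dagger's actual classes).
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    name: str
    inputs: set = field(default_factory=set)   # dataset aliases this task consumes
    outputs: set = field(default_factory=set)  # dataset aliases this task produces


def build_edges(tasks):
    """Return (upstream, downstream) pairs wherever an output alias matches an input alias."""
    producers = {alias: task.name for task in tasks for alias in task.outputs}
    return [
        (producers[alias], task.name)
        for task in tasks
        for alias in task.inputs
        if alias in producers
    ]


extract = SketchTask("extract_orders", outputs={"s3://bucket/orders"})
load = SketchTask("load_orders", inputs={"s3://bucket/orders"})
print(build_edges([extract, load]))  # [('extract_orders', 'load_orders')]
```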

### Key Directories
- `dagger/pipeline/tasks/` - Task type definitions (DbtTask, SparkTask, AthenaTransformTask, etc.)
- `dagger/pipeline/ios/` - IO type definitions (S3, Redshift, Athena, Databricks, etc.)
- `dagger/dag_creator/airflow/operator_creators/` - One creator per task type, translates tasks to Airflow operators
- `dagger/graph/` - Graph construction from task inputs/outputs
- `dagger/config_finder/` - YAML discovery and loading
- `tests/fixtures/config_finder/root/dags/` - Example DAG configurations for testing

### Adding a New Task Type
1. Create task definition in `dagger/pipeline/tasks/` (subclass of Task)
2. Create any needed IOs in `dagger/pipeline/ios/` (if new data sources)
3. Create operator creator in `dagger/dag_creator/airflow/operator_creators/`
4. Register in `dagger/dag_creator/airflow/operator_factory.py`
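
A minimal skeleton of steps 1, 3 and 4 might look like the following (module, class and field names are illustrative; copy the structure of an existing task such as DbtTask and its creator for the real base-class signatures):

```python
# dagger/pipeline/tasks/my_service_task.py (hypothetical)
from dagger.pipeline.task import Task  # base-class import path may differ; check existing tasks


class MyServiceTask(Task):
    ref_name = "my_service"  # the `type:` value used in the task YAML

    # Expose explicit properties (see Coding Standards below), no getattr.
    @property
    def endpoint(self) -> str:
        return self._endpoint  # parsed from the task YAML in __init__


# dagger/dag_creator/airflow/operator_creators/my_service_creator.py (hypothetical)
from dagger.dag_creator.airflow.operator_creator import OperatorCreator


class MyServiceCreator(OperatorCreator):
    ref_name = "my_service"  # must match the task's ref_name

    def _create_operator(self, **kwargs):
        # Build and return the Airflow operator for this task type,
        # mirroring the existing creators in operator_creators/.
        ...


# Step 4: add `my_service_creator` to the imports in
# dagger/dag_creator/airflow/operator_factory.py so the factory can register it.
```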

### Configuration Files
- `pipeline.yaml` - Pipeline metadata (owner, schedule, alerts, airflow_parameters)
- `[taskname].yaml` - Task configs (type, inputs, outputs, task-specific params)
- `dagger_config.yaml` - System config (Neo4j, Elasticsearch, Spark settings)

### Key Patterns
- **Factory Pattern**: TaskFactory/IOFactory auto-discover types via reflection
- **Strategy Pattern**: OperatorCreator subclasses handle task-specific operator creation
- **Dataset Aliasing**: IO `alias()` method enables automatic dependency detection across pipelines
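
The reflection-based auto-discovery behind the factories generally follows this pattern (a generic sketch, not dagger's exact implementation):

```python
# Generic sketch: import every module in a package and index the concrete
# subclasses of a base class by their ref_name. Dagger's factories do
# something along these lines; the exact details live in the factory modules.
import importlib
import inspect
import pkgutil


def discover(package, base_class):
    registry = {}
    for module_info in pkgutil.iter_modules(package.__path__):
        module = importlib.import_module(f"{package.__name__}.{module_info.name}")
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if issubclass(obj, base_class) and obj is not base_class:
                registry[obj.ref_name] = obj  # each concrete type declares ref_name
    return registry
```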

## Coding Standards

### Avoid getattr
Do not use `getattr` for accessing task or IO properties. Instead, define explicit properties on the class. This ensures:
- Type safety and IDE autocompletion
- Clear interface contracts
- Easier debugging and testing

```python
# Bad - avoid this pattern
value = getattr(self._task, 'some_property', default)

# Good - use explicit properties
value = self._task.some_property # Property defined on task class
```
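
The corresponding property definition on the task class is a one-liner (names illustrative):

```python
class SomeTask:  # illustrative; real tasks subclass Task
    def __init__(self, config: dict):
        self._some_property = config["some_property"]

    @property
    def some_property(self) -> str:
        return self._some_property
```
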
25 changes: 20 additions & 5 deletions dagger/cli/module.py
@@ -1,19 +1,34 @@
+import json
+
 import click
+import yaml
+
 from dagger.utilities.module import Module
 from dagger.utils import Printer
-import json


 def parse_key_value(ctx, param, value):
-    #print('YYY', value)
+    """Parse key=value pairs where value is a path to JSON or YAML file.
+
+    Args:
+        ctx: Click context.
+        param: Click parameter.
+        value: List of key=value pairs.
+
+    Returns:
+        Dictionary mapping variable names to parsed file contents.
+    """
     if not value:
         return {}
     key_value_dict = {}
     for pair in value:
         try:
             key, val_file_path = pair.split('=', 1)
-            #print('YYY', key, val_file_path, pair)
-            val = json.load(open(val_file_path))
+            with open(val_file_path, 'r') as f:
+                if val_file_path.endswith(('.yaml', '.yml')):
+                    val = yaml.safe_load(f)
+                else:
+                    val = json.load(f)
             key_value_dict[key] = val
         except ValueError:
             raise click.BadParameter(f"Key-value pair '{pair}' is not in the format key=value")
@@ -22,7 +37,7 @@ def parse_key_value(ctx, param, value):
 @click.command()
 @click.option("--config_file", "-c", help="Path to module config file")
 @click.option("--target_dir", "-t", help="Path to directory to generate the task configs to")
-@click.option("--jinja_parameters", "-j", callback=parse_key_value, multiple=True, default=None, help="Path to jinja parameters json file in the format: <jinja_variable_name>=<path to json file>")
+@click.option("--jinja_parameters", "-j", callback=parse_key_value, multiple=True, default=None, help="Jinja parameters file in the format: <var_name>=<path to json/yaml file>")
 def generate_tasks(config_file: str, target_dir: str, jinja_parameters: dict) -> None:
     """
     Generating tasks for a module based on config
129 changes: 129 additions & 0 deletions dagger/dag_creator/airflow/operator_creators/databricks_dlt_creator.py
@@ -0,0 +1,129 @@
"""Operator creator for Databricks DLT (Delta Live Tables) pipelines."""

import logging
from typing import Any

from airflow.models import BaseOperator, DAG

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.pipeline.tasks.databricks_dlt_task import DatabricksDLTTask

_logger = logging.getLogger(__name__)


def _cancel_databricks_run(context: dict[str, Any]) -> None:
"""Cancel a Databricks job run when task fails or is cleared.

This callback retrieves the run_id from XCom and cancels the corresponding
Databricks job run. Used as on_failure_callback to ensure jobs are cancelled
when tasks are marked as failed.

Args:
context: Airflow context dictionary containing task instance and other metadata.
"""
ti = context.get("task_instance")
if not ti:
_logger.warning("No task instance in context, cannot cancel Databricks run")
return

# Get run_id from XCom (pushed by DatabricksRunNowOperator)
run_id = ti.xcom_pull(task_ids=ti.task_id, key="run_id")
if not run_id:
_logger.warning(f"No run_id found in XCom for task {ti.task_id}")
return

# Get the databricks_conn_id from the operator (set during operator creation)
databricks_conn_id = ti.task.databricks_conn_id

# Import here to avoid import errors if databricks provider not installed
# and to only import when actually needed (after early returns)
try:
from airflow.providers.databricks.hooks.databricks import DatabricksHook

hook = DatabricksHook(databricks_conn_id=databricks_conn_id)
hook.cancel_run(run_id)
_logger.info(f"Cancelled Databricks run {run_id} for task {ti.task_id}")
except ImportError:
_logger.error(
"airflow-providers-databricks is not installed, cannot cancel run"
)
except Exception as e:
_logger.error(f"Failed to cancel Databricks run {run_id}: {e}")


class DatabricksDLTCreator(OperatorCreator):
"""Creates operators for triggering Databricks DLT pipelines via Jobs.

This creator uses DatabricksRunNowOperator to trigger a Databricks Job
that wraps the DLT pipeline. The job is identified by name and must be
defined in the Databricks Asset Bundle.

Attributes:
ref_name: Reference name used by OperatorFactory to match this creator
with DatabricksDLTTask instances.
"""

ref_name: str = "databricks_dlt"

def __init__(self, task: DatabricksDLTTask, dag: DAG) -> None:
"""Initialize the DatabricksDLTCreator.

Args:
task: The DatabricksDLTTask containing pipeline configuration.
dag: The Airflow DAG this operator will belong to.
"""
super().__init__(task, dag)

def _create_operator(self, **kwargs: Any) -> BaseOperator:
"""Create a DatabricksRunNowOperator for the DLT pipeline.

Creates an Airflow operator that triggers an existing Databricks Job
by name. The job must have a pipeline_task that references the DLT
pipeline.

Args:
**kwargs: Additional keyword arguments passed to the operator.

Returns:
A configured DatabricksRunNowOperator instance.

Raises:
ValueError: If job_name is empty or not provided.
"""
# Import here to avoid import errors if databricks provider not installed
from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
DatabricksRunNowOperator,
)

# Get task parameters - defaults are handled in DatabricksDLTTask
job_name: str = self._task.job_name
if not job_name:
raise ValueError(
f"job_name is required for DatabricksDLTTask '{self._task.name}'"
)
databricks_conn_id: str = self._task.databricks_conn_id
wait_for_completion: bool = self._task.wait_for_completion
poll_interval_seconds: int = self._task.poll_interval_seconds
timeout_seconds: int = self._task.timeout_seconds

# DatabricksRunNowOperator triggers an existing Databricks Job by name
# The job must have a pipeline_task that references the DLT pipeline
# Note: timeout is handled via Airflow's execution_timeout, not a direct parameter
# Note: on_kill() is already implemented in DatabricksRunNowOperator to cancel runs
# We add on_failure_callback to also cancel when task is marked as failed
operator: BaseOperator = DatabricksRunNowOperator(
dag=self._dag,
task_id=self._task.name,
databricks_conn_id=databricks_conn_id,
job_name=job_name,
wait_for_termination=wait_for_completion,
polling_period_seconds=poll_interval_seconds,
execution_timeout=timedelta(seconds=timeout_seconds),
do_xcom_push=True, # Required to store run_id for cancellation callback
on_failure_callback=_cancel_databricks_run,
**kwargs,
)

return operator
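
For orientation, the task-side interface this creator relies on is roughly the following (a sketch inferred from the property accesses above; the real `DatabricksDLTTask` in `dagger/pipeline/tasks/databricks_dlt_task.py` handles YAML parsing and defaults):

```python
# Sketch of the task-side interface DatabricksDLTCreator reads, inferred from the
# property accesses above (not the actual DatabricksDLTTask implementation).
class DatabricksDLTTaskSketch:
    def __init__(self, name: str, job_name: str, databricks_conn_id: str,
                 wait_for_completion: bool, poll_interval_seconds: int,
                 timeout_seconds: int):
        self._name = name
        self._job_name = job_name
        self._databricks_conn_id = databricks_conn_id
        self._wait_for_completion = wait_for_completion
        self._poll_interval_seconds = poll_interval_seconds
        self._timeout_seconds = timeout_seconds

    # Explicit properties, per the "avoid getattr" standard in CLAUDE.md.
    @property
    def job_name(self) -> str:
        return self._job_name

    @property
    def databricks_conn_id(self) -> str:
        return self._databricks_conn_id

    # name, wait_for_completion, poll_interval_seconds and timeout_seconds
    # are exposed the same way.
```
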
1 change: 1 addition & 0 deletions dagger/dag_creator/airflow/operator_factory.py
@@ -4,6 +4,7 @@
     airflow_op_creator,
     athena_transform_creator,
     batch_creator,
+    databricks_dlt_creator,
     dbt_creator,
     dummy_creator,
     python_creator,