Update pd_accessors.py #308

Closed
rrawatt wants to merge 3 commits into ucbepic:main from rrawatt:main

Conversation

@rrawatt (Contributor) commented Feb 9, 2025

added split and gather
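
Based on the test code later in this thread, the intended accessor usage looks roughly like this. This is a sketch only, with placeholder data; the signatures are taken from the tests below and assume the PR's accessor is installed:

import pandas as pd

df = pd.DataFrame({"text": ["Hello world", "Test sentence"]})

# Split: expand each row into multiple rows per the output schema.
chunks = df.semantic.split(
    prompt="Split the text in {{input.text}} into words.",
    output_schema={"sentence": "str"},
)

# Gather: aggregate rows into a single combined row.
summary = df.semantic.gather(
    prompt="Summarize the following texts: {{input.text}}",
    output_schema={"summary": "str"},
)
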
@shreyashankar (Collaborator)

Thank you for the contribution! By any chance do you have any testing code? Otherwise I can write some, no worries.

@rrawatt (Contributor, Author) commented Mar 6, 2025

Sorry, I had midsem exams, so I was busy with that.

@rrawatt (Contributor, Author) commented Mar 6, 2025

import sys
import types
from typing import Any, Dict, List, NamedTuple

import pandas as pd
import pytest

# Dummy Module Setup
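# Note: the real docetl package is never imported here. Each module the
# accessor touches is replaced with a minimal stub registered in sys.modules
# up front, so the imports inside the accessor resolve to these stubs and the
# tests exercise only the accessor plumbing, with no LLM calls.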
class DummyConsole:
    def log(self, message):
        pass


class DummyDSLRunner:
    def __init__(self, config, from_df_accessors=False):
        self.config = config
        self.max_threads = 1
        self.console = DummyConsole()
        self.status = None


runner_module = types.ModuleType("docetl.runner")
runner_module.DSLRunner = DummyDSLRunner
sys.modules["docetl.runner"] = runner_module

optimizer_module = types.ModuleType("docetl.optimizer")
class DummyOptimizer:
    def __init__(self, runner):
        self.runner = runner
optimizer_module.Optimizer = DummyOptimizer
sys.modules["docetl.optimizer"] = optimizer_module


join_optimizer_module = types.ModuleType("docetl.optimizers.join_optimizer")
class DummyJoinOptimizer:
    def __init__(self, runner, config, target_recall=0.95, estimated_selectivity=None):
        self.runner = runner
        self.config = config
    def optimize_equijoin(self, left_data, right_data, skip_map_gen=True, skip_containment_gen=True):
        return self.config, 0.0, None
    def optimize_resolve(self, input_data):
        return self.config, 0.0
join_optimizer_module.JoinOptimizer = DummyJoinOptimizer
sys.modules["docetl.optimizers.join_optimizer"] = join_optimizer_module

map_module = types.ModuleType("docetl.operations.map")
class MapOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, input_data):
        return input_data, 0.0
map_module.MapOperation = MapOperation
sys.modules["docetl.operations.map"] = map_module

eq_module = types.ModuleType("docetl.operations.equijoin")
class EquijoinOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, left_data, right_data):
        return left_data + right_data, 0.0
eq_module.EquijoinOperation = EquijoinOperation
sys.modules["docetl.operations.equijoin"] = eq_module

filter_module = types.ModuleType("docetl.operations.filter")
class FilterOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, input_data):
        return input_data, 0.0
filter_module.FilterOperation = FilterOperation
sys.modules["docetl.operations.filter"] = filter_module

reduce_module = types.ModuleType("docetl.operations.reduce")
class ReduceOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, input_data):
        return input_data, 0.0
reduce_module.ReduceOperation = ReduceOperation
sys.modules["docetl.operations.reduce"] = reduce_module

resolve_module = types.ModuleType("docetl.operations.resolve")
class ResolveOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, input_data):
        return input_data, 0.0
resolve_module.ResolveOperation = ResolveOperation
sys.modules["docetl.operations.resolve"] = resolve_module

split_module = types.ModuleType("docetl.operations.split")
class SplitOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, input_data):
        pass
split_module.SplitOperation = SplitOperation
sys.modules["docetl.operations.split"] = split_module

gather_module = types.ModuleType("docetl.operations.gather")
class GatherOperation:
    def __init__(self, runner, config, default_model, max_threads, console, status):
        self.runner = runner
        self.config = config
    def execute(self, input_data):
        pass
gather_module.GatherOperation = GatherOperation
sys.modules["docetl.operations.gather"] = gather_module

# Pandas Accessor Implementation

class OpHistory(NamedTuple):
    op_type: str
    config: Dict[str, Any]
    output_columns: List[str]

@pd.api.extensions.register_dataframe_accessor("semantic")
class SemanticAccessor:
    def __init__(self, df: pd.DataFrame):
        self._df = df
        self._history = df.attrs.get("_semantic_history", []).copy()
        self._costs = df.attrs.get("_semantic_costs", 0.0)
        config = df.attrs.get(
            "_semantic_config",
            {
                "default_model": "gpt-4o-mini",
                "operations": [],
                "datasets": {},
                "pipeline": {"steps": []},
            },
        )
        from docetl.runner import DSLRunner
        self.runner = DSLRunner(config, from_df_accessors=True)
        from docetl.optimizer import Optimizer
        builder = Optimizer(self.runner)
        self.runner.optimizer = builder

    def set_config(self, **config):
        self.runner.config.update(config)
        from docetl.optimizer import Optimizer
        builder = Optimizer(self.runner)
        self.runner.optimizer = builder

    def _record_operation(self, data: List[Dict], op_type: str, config: Dict[str, Any], cost: float) -> pd.DataFrame:
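        # Materialize the operation's output as a new DataFrame and thread
        # history, accumulated cost, and config through DataFrame.attrs; a
        # fresh accessor instance re-reads these attrs in __init__, so state
        # survives across chained .semantic calls.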
        result_df = pd.DataFrame(data)
        new_cols = list(set(result_df.columns) - set(self._df.columns))
        serializable_config = self._make_serializable(config)
        entry = OpHistory(op_type, serializable_config, new_cols)
        self._history.append(entry)
        self._costs += cost
        result_df.attrs["_semantic_history"] = self._history
        result_df.attrs["_semantic_costs"] = self._costs
        result_df.attrs["_semantic_config"] = self._make_serializable(self.runner.config)
        return result_df

    def _make_serializable(self, obj: Any) -> Any:
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, (list, tuple)):
            return [self._make_serializable(x) for x in obj]
        elif isinstance(obj, (type({}.keys()), type({}.values()), type({}.items()))):
            return list(obj)
        else:
            return obj

    def _get_column_history(self, column: str) -> List[OpHistory]:
        return [op for op in self._history if column in op.output_columns]

    def _synthesize_comparison_context(self, keys: List[str]) -> str:
        context_parts = []
        for key in keys:
            history = self._get_column_history(key)
            if history:
                last_op = history[-1]
                if "prompt" in last_op.config:
                    last_op_prompt = last_op.config["prompt"]
                    last_op_prompt = last_op_prompt.replace("{{", "{").replace("}}", "}")
                    context_parts.append(f"The field '{key}' was created using this prompt: {last_op_prompt}")
        if context_parts:
            return "\n\nContext about these fields:\n" + "\n".join(context_parts)
        return ""


    def split(self, prompt: str, output_schema: Dict[str, Any], **kwargs) -> pd.DataFrame:
        input_data = self._df.to_dict("records")
        split_config = {
            "type": "split",
            "name": f"semantic_split_{len(self._history)}",
            "prompt": prompt,
            "output": {"schema": output_schema},
            **kwargs,
        }
        from docetl.operations.split import SplitOperation
        split_op = SplitOperation(
            runner=self.runner,
            config=split_config,
            default_model=self.runner.config["default_model"],
            max_threads=self.runner.max_threads,
            console=self.runner.console,
            status=self.runner.status,
        )
        results, cost = split_op.execute(input_data)
        return self._record_operation(results, "split", split_config, cost)

    def gather(self, prompt: str, output_schema: Dict[str, Any], **kwargs) -> pd.DataFrame:
        input_data = self._df.to_dict("records")
        gather_config = {
            "type": "gather",
            "name": f"semantic_gather_{len(self._history)}",
            "prompt": prompt,
            "output": {"schema": output_schema},
            **kwargs,
        }
        from docetl.operations.gather import GatherOperation
        gather_op = GatherOperation(
            runner=self.runner,
            config=gather_config,
            default_model=self.runner.config["default_model"],
            max_threads=self.runner.max_threads,
            console=self.runner.console,
            status=self.runner.status,
        )
        results, cost = gather_op.execute(input_data)
        return self._record_operation(results, "gather", gather_config, cost)

    @property
    def total_cost(self) -> float:
        return self._costs

    @property
    def history(self) -> List[OpHistory]:
        return self._history.copy()

# Dummy Implementations

def dummy_split_execute(self, input_data):
    """Simulate a split operation by splitting the 'text' column into words.
       Each word becomes a new row with a 'sentence' key."""
    new_data = []
    for row in input_data:
        text = row.get("text", "")
        for word in text.split():
            new_data.append({"sentence": word})
    return new_data, 0.01

def dummy_gather_execute(self, input_data):
    """Simulate a gather operation by concatenating all 'text' values into a single summary."""
    combined_text = " ".join(row.get("text", "") for row in input_data)
    return [{"summary": combined_text}], 0.02

# Test Cases

def test_split(monkeypatch):
    from docetl.operations.split import SplitOperation
    monkeypatch.setattr(SplitOperation, "execute", dummy_split_execute)
    df = pd.DataFrame({"text": ["Hello world", "Test sentence"]})
    result = df.semantic.split(
        prompt="Split the text in {{input.text}} into words.",
        output_schema={"sentence": "str"}
    )
    assert "sentence" in result.columns, "Output DataFrame must have a 'sentence' column."
    expected_rows = sum(len(text.split()) for text in df["text"])
    assert len(result) == expected_rows, "Row count should equal the total words from input."
    history = result.attrs.get("_semantic_history", [])
    assert any(op.op_type == "split" for op in history), "History should record a 'split' operation."
    total_cost = result.attrs.get("_semantic_costs", 0.0)
    assert total_cost == 0.01, "Total cost should match the dummy cost for split."

def test_gather(monkeypatch):
    from docetl.operations.gather import GatherOperation
    monkeypatch.setattr(GatherOperation, "execute", dummy_gather_execute)
    df = pd.DataFrame({"text": ["Hello world", "Test sentence"]})
    result = df.semantic.gather(
        prompt="Summarize the following texts: {{input.text}}",
        output_schema={"summary": "str"}
    )
    assert "summary" in result.columns, "Output DataFrame must have a 'summary' column."
    assert len(result) == 1, "Gather operation should return a single summarized row."
    expected_summary = "Hello world Test sentence"
    assert result.iloc[0]["summary"] == expected_summary, "Summary output does not match expected."
    history = result.attrs.get("_semantic_history", [])
    assert any(op.op_type == "gather" for op in history), "History should record a 'gather' operation."
    total_cost = result.attrs.get("_semantic_costs", 0.0)
    assert total_cost == 0.02, "Total cost should match the dummy cost for gather."

if __name__ == '__main__':
    pytest.main([__file__])
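
The monkeypatch calls replace execute on the stub SplitOperation and GatherOperation classes themselves, so the accessor picks up the dummies when it instantiates the operations. Saved as a standalone file (the filename here is a placeholder), the tests should also run directly under pytest:

pytest test_semantic_split_gather.py -q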

@shreyashankar (Collaborator)

Closing as the operation output schemas don't match up with what is in the documentation for split and gather.
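
For context on the mismatch: per the docetl documentation, split and gather are deterministic chunking and context-assembly operations configured with key names rather than a prompt and output schema. Roughly, and treating the field names as a paraphrase of the docs rather than an exact spec:

# Approximate shape of the documented configs (paraphrased; may be inexact).
split_config = {
    "type": "split",
    "split_key": "content",              # column whose text gets chunked
    "method": "token_count",
    "method_kwargs": {"num_tokens": 500},
}
gather_config = {
    "type": "gather",
    "content_key": "content_chunk",      # chunk text produced by split
    "doc_id_key": "split_doc_id",        # groups chunks by source document
    "order_key": "split_doc_chunk_num",  # chunk order within a document
    "peripheral_chunks": {"previous": {"tail": {"count": 2}}},
}

The prompt/output_schema pattern in this PR matches docetl's map-style operations instead, which appears to be why the schemas didn't line up.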
