2 changes: 2 additions & 0 deletions .gitignore
@@ -30,3 +30,5 @@ uv.lock
examples/**/eval_*
examples/**/uv.lock
**/lancedb_data/

.cocoindex_code/
1 change: 0 additions & 1 deletion .python-version

This file was deleted.

14 changes: 14 additions & 0 deletions CLAUDE.md
@@ -202,3 +202,17 @@ We distinguish between **internal modules** (under packages with `_` prefix, e.g
### Testing Guidelines

We prefer end-to-end tests on user-facing APIs over unit tests on smaller internal functions. That said, there are cases where unit tests are the better fit, e.g. internal logic with many branches and edge cases, where unit tests make it easier to cover each scenario.

### Sync vs Async

The Rust core (`rust/core`, `rust/utils`) uses an **async-first** design with Tokio. The `rust/py` crate bridges Rust async to Python, offering both sync and async APIs:

* Rust core exposes async functions
* `rust/py` provides sync wrappers that use `block_on()` to call async Rust from sync Python
* Python gets both `cocoindex` (sync) and `cocoindex.asyncio` (async) APIs

When adding new functionality that involves I/O or concurrency:

* Implement async in Rust
* Bridge to Python via `rust/py`, providing both sync and async variants if needed
* Avoid `asyncio.run()` in Python when Rust can handle the sync/async bridging
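
A minimal sketch of the intended pattern (the `core.fetch` / `core.fetch_sync` names are hypothetical, stand-ins for whatever `rust/py` actually exposes):

```python
# Hypothetical names: core.fetch / core.fetch_sync stand in for real bindings.
from cocoindex._internal import core

async def fetch(url: str) -> bytes:
    # Async variant: await the Rust future directly on the running loop.
    return await core.fetch(url)

def fetch_sync(url: str) -> bytes:
    # Sync variant: rust/py's block_on()-backed wrapper does the bridging;
    # no asyncio.run() needed on the Python side.
    return core.fetch_sync(url)
```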
72 changes: 72 additions & 0 deletions docs/docs/programming_guide/function.md
@@ -84,3 +84,75 @@ def process_chunk(chunk: Chunk) -> Embedding:
:::note
The change tracking capability is still under construction.
:::

### Batching

With `batching=True`, multiple concurrent calls to the function are automatically batched together. This is useful for operations that are more efficient when processing multiple inputs at once, such as embedding models.

When batching is enabled:

- The function implementation receives a `list[T]` and returns a `list[R]`
- The external signature becomes `T -> R` (single input, single output)
- Concurrent calls are collected and processed together

```python
from concurrent.futures import ThreadPoolExecutor

@coco.function(batching=True, max_batch_size=32)
def embed(texts: list[str]) -> list[list[float]]:
    # Called with a batch of texts, returns a batch of embeddings
    return model.encode(texts)

# External usage: single input, single output
embedding = embed("hello world")  # Returns list[float]

# Concurrent calls are automatically batched
with ThreadPoolExecutor() as pool:
    embeddings = list(pool.map(embed, ["text1", "text2", "text3"]))
```

The `max_batch_size` parameter limits how many inputs can be processed in a single batch.

:::tip When to use batching

Batching is beneficial when:

- The underlying operation has significant per-call overhead (e.g., GPU kernel launch)
- The operation can process multiple inputs more efficiently than one at a time
- You have concurrent calls from multiple threads or coroutines

Common use cases:

- **Embedding models** — most embedding APIs and models are optimized for batch processing
- **LLM inference** — batch multiple prompts together for better GPU utilization
- **Database operations** — batch inserts or lookups

:::

### Runner

The `runner` parameter lets a function execute in a specific context, such as a subprocess for GPU isolation. This is useful when GPU memory must be isolated, or when code otherwise needs to run in a separate process.

```python
@coco.function(runner=coco.GPU)
def gpu_inference(data: bytes) -> bytes:
    # This runs in a subprocess with GPU isolation
    return model.predict(data)
```

The `coco.GPU` runner:

- Executes the function in a subprocess
- Shares one queue across all functions using the same runner, so their executions are serialized (see the sketch below)
- Suits GPU workloads that need memory isolation
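
For example (a sketch with illustrative names), two functions on the same runner never execute concurrently:

```python
@coco.function(runner=coco.GPU)
def stage_a(data: bytes) -> bytes:
    # Runs in the shared GPU subprocess
    return model_a.predict(data)

@coco.function(runner=coco.GPU)
def stage_b(data: bytes) -> bytes:
    # Queued behind any in-flight stage_a call; execution is serial
    return model_b.predict(data)
```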

You can combine batching with a runner:

```python
@coco.function(batching=True, max_batch_size=16, runner=coco.GPU)
def batch_gpu_embed(texts: list[str]) -> list[list[float]]:
    # Batched execution in a subprocess with GPU isolation
    return gpu_model.encode(texts)
```

:::note
When using a runner, the function and all its arguments must be picklable since they are serialized for subprocess execution.
:::
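
A quick way to check whether a value can cross the subprocess boundary is to try pickling it (a sketch using the standard `pickle` module):

```python
import pickle

def is_subprocess_safe(arg: object) -> bool:
    try:
        pickle.dumps(arg)
        return True
    except Exception:
        return False

assert is_subprocess_safe(b"raw bytes")     # bytes pickle fine
assert not is_subprocess_safe(lambda x: x)  # lambdas cannot be pickled
```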
5 changes: 5 additions & 0 deletions python/cocoindex/_internal/api.py
@@ -20,6 +20,8 @@

from .function import function

from .runner import GPU, Runner

from .memo_key import register_memo_key_function

from .pending_marker import PendingS, ResolvedS, MaybePendingS, ResolvesTo
@@ -63,6 +65,9 @@
"lifespan",
# .fn
"function",
# .runner
"GPU",
"Runner",
# .memo_key
"register_memo_key_function",
# .pending_marker
71 changes: 67 additions & 4 deletions python/cocoindex/_internal/core.pyi
@@ -270,20 +270,83 @@ class RWLockReadGuard:
def release(self) -> None: ...
def __enter__(self) -> "RWLockReadGuard": ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None: ...
def __aenter__(self) -> Coroutine[Any, Any, "RWLockReadGuard"]: ...
def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> Coroutine[Any, Any, None]: ...

class RWLockWriteGuard:
def release(self) -> None: ...
def __enter__(self) -> "RWLockWriteGuard": ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None: ...
def __aenter__(self) -> Coroutine[Any, Any, "RWLockWriteGuard"]: ...
def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> Coroutine[Any, Any, None]: ...

########################################################
# Batching Infrastructure
########################################################

# --- BatchingOptions ---
class BatchingOptions:
    """Options for batching behavior."""

    max_batch_size: int | None

    def __new__(cls, max_batch_size: int | None = None) -> "BatchingOptions": ...

# --- BatchQueue ---
class BatchQueue:
    """A shared queue that processes batches in FIFO order.

    Multiple batchers can share the same queue. Each batcher provides its own
    runner function, and batches are processed using the runner from the batcher
    that created them.
    """

    def __new__(cls) -> "BatchQueue": ...

# --- Batcher ---
class Batcher:
    """A batcher that collects inputs and submits them to a shared queue.

    Each batcher maintains at most one non-full, non-sealed batch in the queue.
    When inputs are submitted, they are added to the current batch or a new batch is created.

    Multiple batchers can share the same queue with different runner functions.
    Each batch uses the runner function from the batcher that created it.
    """

    @staticmethod
    def new_sync(
        queue: BatchQueue,
        options: BatchingOptions,
        runner_fn: Callable[[list[Any]], list[Any]],
        async_ctx: AsyncContext,
    ) -> "Batcher": ...
    @staticmethod
    def new_async(
        queue: BatchQueue,
        options: BatchingOptions,
        runner_fn: Callable[[list[Any]], Coroutine[Any, Any, list[Any]]],
        async_ctx: AsyncContext,
    ) -> "Batcher": ...
    def run(self, input: Any) -> Coroutine[Any, Any, Any]: ...
    def run_sync(self, input: Any) -> Any: ...
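
A rough sketch of how these pieces compose (construction details are assumptions read off the stubs above; `async_ctx` would come from `Environment.async_context`, added below):

```python
from typing import Any
from cocoindex._internal import core

queue = core.BatchQueue()                          # shared FIFO batch queue
options = core.BatchingOptions(max_batch_size=32)

def run_batch(inputs: list[Any]) -> list[Any]:
    # The runner function always receives a whole batch.
    return [str(x).upper() for x in inputs]

# async_ctx: core.AsyncContext, obtained from the environment (assumption).
batcher = core.Batcher.new_sync(queue, options, run_batch, async_ctx)
print(batcher.run_sync("hello"))  # single input in, single output out
```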
30 changes: 24 additions & 6 deletions python/cocoindex/_internal/environment.py
@@ -187,6 +187,7 @@ class Environment:
"_settings",
"_context_provider",
"_loop_runner",
"_async_context",
"_info",
"__weakref__",
)
@@ -196,6 +197,7 @@
_settings: setting.Settings
_context_provider: ContextProvider
_loop_runner: _LoopRunner
_async_context: core.AsyncContext
_info: EnvironmentInfo

def __init__(
@@ -214,10 +216,7 @@ def __init__(
self._context_provider = context_provider or ContextProvider()

if event_loop is None:
try:
event_loop = asyncio.get_running_loop()
except RuntimeError:
event_loop = asyncio.new_event_loop()
event_loop = get_event_loop_or_default()

if event_loop.is_running():
self._loop_runner = _LoopRunner.from_running_loop(event_loop)
@@ -227,8 +226,10 @@
runner.ensure_running()
self._loop_runner = runner

async_context = core.AsyncContext(self._loop_runner.loop)
self._core_env = core.Environment(dump_engine_object(settings), async_context)
self._async_context = core.AsyncContext(self._loop_runner.loop)
self._core_env = core.Environment(
dump_engine_object(settings), self._async_context
)
self._info = info or EnvironmentInfo(self)

@property
@@ -247,6 +248,11 @@ def context_provider(self) -> ContextProvider:
def event_loop(self) -> asyncio.AbstractEventLoop:
return self._loop_runner.loop

@property
def async_context(self) -> core.AsyncContext:
"""Get the AsyncContext for this environment's event loop."""
return self._async_context

def get_context(self, key: ContextKey[T]) -> T:
"""Get a context value provided during this environment's lifespan.

@@ -502,3 +508,15 @@ def reset_default_env_for_tests() -> None:
    This is intended for tests so lifespan registration does not leak across test modules.
    """
    asyncio.run(_default_env._reset())


def get_event_loop_or_default() -> asyncio.AbstractEventLoop:
    """Get the running event loop, or the default background loop if none.

    Returns the currently running event loop if called from an async context.
    Otherwise, returns the shared background loop (from default_env_loop()).
    """
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return default_env_loop()
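
A quick illustration of the two paths the helper takes (a sketch mirroring the docstring above):

```python
import asyncio

def sync_caller() -> None:
    # No running loop here, so this falls back to default_env_loop().
    loop = get_event_loop_or_default()

async def async_caller() -> None:
    # Inside a coroutine, the currently running loop is returned.
    loop = get_event_loop_or_default()
    assert loop is asyncio.get_running_loop()
```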