Native map / fan-out component for iterating a sub-pipeline over a list with aggregation #11521

@AlessioNar

Problem statement

Haystack pipelines are single-pass DAGs. There is currently no first-class way to iterate a portion of a pipeline over a list of items and aggregate the results at the end within one pipeline declaration. Concretely, given a long list of documents that cannot all fit in a single LLM context window, there is no native construct to say "run this PromptBuilder → Generator → Extractor sub-graph once per chunk of N documents, then merge the outputs."

This forces every team doing batch LLM processing to push the iteration outside the pipeline (a host-language for loop calling pipeline.run() per batch). That works, but it:

  • splits one logical operation across the pipeline definition and external orchestration code, so the YAML is no longer self-contained;
  • loses any pipeline-level view of the aggregate (the merge happens in app code, not in the graph);
  • makes concurrency the caller's problem: concurrent calls against a single cached Pipeline instance are unsafe (run state and per-run mutations such as streaming_callback are not reentrant), so callers must rebuild or pool instances themselves.
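For reference, the external-loop workaround described above looks roughly like the following. This is a minimal sketch rather than Haystack code: `chunked` and `run_in_batches` are helper names introduced here, and `fake_run_pipeline` is a hypothetical stand-in for a per-batch call such as `pipeline.run(...)`.

```python
from typing import Callable, Iterator, List, Sequence


def chunked(items: Sequence, batch_size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def run_in_batches(
    items: Sequence,
    batch_size: int,
    run_pipeline: Callable[[Sequence], List],
) -> List:
    """Call run_pipeline once per batch and merge the outputs in app code."""
    results: List = []
    for batch in chunked(items, batch_size):
        # Iteration and aggregation both happen here, outside the graph.
        results.extend(run_pipeline(batch))
    return results


def fake_run_pipeline(batch: Sequence[str]) -> List[str]:
    """Hypothetical stand-in for pipeline.run() on one batch."""
    return [f"analysed:{doc}" for doc in batch]


merged = run_in_batches(["d1", "d2", "d3", "d4", "d5"], 2, fake_run_pipeline)
```

The loop is strictly sequential, and both the batching rule and the merge step are invisible to the pipeline definition, which is the gap this issue is about.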

What I verified (Haystack 2.29.0)

  • Pipeline.run(data, …) is a single DAG pass — no run_batch/map API.
  • Loops (docs/pipeline-loops) exist but are designed for feedback/self-correction and are bounded by max_runs_per_component (default 100). Using them for list iteration requires a custom "slice next batch" + "accumulate" component plus ConditionalRouter/BranchJoiner routing, runs strictly sequentially, and works against the grain of the feature.
  • AsyncPipeline (docs/asyncpipeline) parallelizes independent components within one DAG pass (concurrency_limit, default 4) — it does not provide dynamic fan-out over a list.

So there is no idiomatic, declarable way to express "for each batch of the input list, run this sub-graph, then aggregate."

Proposed solution

A native fan-out/aggregate construct — a Map (a.k.a. ForEach / Iterator) component or pipeline primitive — that:

  1. takes a list input and a batch_size (or splitter strategy),
  2. runs a designated sub-pipeline once per chunk,
  3. collects the per-chunk outputs into a single aggregated output,
  4. optionally executes the chunks concurrently on top of AsyncPipeline, with a concurrency_limit, handling instance isolation internally so users don't have to.
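The four steps above could behave roughly as in the sketch below. It uses plain asyncio, not AsyncPipeline, and every name in it (`map_batches`, `body_factory`, `make_body`) is hypothetical. It assumes the sub-pipeline can be modelled as an async callable that returns a list, and that "instance isolation" means building a fresh body per chunk.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence

Body = Callable[[Sequence[Any]], Awaitable[List[Any]]]


async def map_batches(
    items: Sequence[Any],
    batch_size: int,
    body_factory: Callable[[], Body],
    concurrency_limit: int = 4,
) -> List[Any]:
    """Run a fresh body over each chunk and concatenate the outputs."""
    if batch_size < 1 or concurrency_limit < 1:
        raise ValueError("batch_size and concurrency_limit must be >= 1")

    # Step 1: split the list input into chunks of batch_size.
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def run_one(batch: Sequence[Any]) -> List[Any]:
        async with semaphore:  # Step 4: at most concurrency_limit chunks at once.
            body = body_factory()  # Fresh instance per chunk, so no shared run state.
            return await body(batch)  # Step 2: run the sub-pipeline on this chunk.

    # asyncio.gather returns results in the order of its arguments.
    per_batch = await asyncio.gather(*(run_one(b) for b in batches))

    # Step 3: aggregate (here: concatenate the per-chunk lists).
    return [out for outputs in per_batch for out in outputs]


def make_body() -> Body:
    """Hypothetical factory standing in for 'build the sub-pipeline'."""
    async def body(batch: Sequence[str]) -> List[str]:
        await asyncio.sleep(0)  # Placeholder for the LLM call.
        return [doc.upper() for doc in batch]
    return body


result = asyncio.run(map_batches(["a", "b", "c", "d", "e"], 2, make_body, concurrency_limit=2))
```

Setting `concurrency_limit=1` degrades to sequential execution, which matches the "1 = sequential" convention used in the YAML sketch.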

Sketch of the desired YAML ergonomics

```yaml
components:
  analyse_batches:
    type: haystack.components.control.Map      # proposed
    init_parameters:
      over: documents          # the list input to chunk
      batch_size: 20
      concurrency_limit: 4     # 1 = sequential; >1 uses AsyncPipeline under the hood
      aggregate: concat        # concat | flatten | custom joiner ref
      body:                    # a sub-pipeline run once per chunk
        components:
          prompt_builder: { type: ...PromptBuilder, init_parameters: { template: "... {% for doc in batch %} ... {% endfor %}" } }
          generator:      { type: ...OpenAIGenerator, init_parameters: { ... } }
          extractor:      { type: ...JsonExtractor }
        connections:
          - { sender: prompt_builder.prompt, receiver: generator.prompt }
          - { sender: generator.replies,     receiver: extractor.replies }
        outputs: { result: extractor.documents }

connections:
  - { sender: analyse_batches.result, receiver: output_joiner.value }
```

Why the existing alternatives are insufficient

| Alternative | Limitation |
| --- | --- |
| External for loop over pipeline.run() | Iteration + aggregation live outside the graph; not self-contained; concurrency and instance-isolation are the caller's burden. |
| Cyclic loop (ConditionalRouter + BranchJoiner + custom accumulator) | Requires a bespoke slice/accumulate component; sequential only; semantically a misuse of refinement loops; brittle routing. |
| AsyncPipeline | Parallelizes independent declared branches, not iteration over a list; no dynamic fan-out. |

Additional context

Use case: document analysis pipelines that prompt an LLM over batches of documents, then merge per-batch JSON results. Batching exists purely to respect context/max_tokens limits. A native map-with-aggregation — ideally reusing AsyncPipeline for concurrent chunks with safe per-chunk isolation — would let the whole operation live in one declarative pipeline and remove a recurring class of external orchestration boilerplate.
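To make the merge step in this use case concrete, here is a small sketch of what currently lives in application code. It assumes each batch reply is a JSON string holding either an array of records or a single record. `merge_batch_replies` and the sample payloads are illustrative only and not taken from any real pipeline.

```python
import json
from typing import Any, Dict, List


def merge_batch_replies(replies: List[str]) -> List[Dict[str, Any]]:
    """Parse one JSON reply per batch and concatenate the records."""
    merged: List[Dict[str, Any]] = []
    for reply in replies:
        parsed = json.loads(reply)
        # Assumption: a reply is either a list of records or one record.
        if not isinstance(parsed, list):
            parsed = [parsed]
        merged.extend(parsed)
    return merged


# Illustrative per-batch replies (made-up data).
replies = [
    '[{"doc": "d1", "topic": "tax"}, {"doc": "d2", "topic": "law"}]',
    '{"doc": "d3", "topic": "tax"}',
]
records = merge_batch_replies(replies)
```

With a native map component, this function would correspond to the `aggregate` option rather than to code outside the pipeline.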


👋 Hello there! This issue will be handled internally and isn’t open for external contributions. If you’d like to contribute, please take a look at issues labeled contributions welcome or good first issue. We’d really appreciate it!

Labels: P3 (Low priority, leave it in the backlog)