Allow specifying default result persistence separately for tasks and flows #15401

@cicdw

Discussed in #15381

Originally posted by jggatter September 13, 2024
Hi,

I have a flow that orchestrates several subflows. My subflows have tasks that each return large dataset objects (AnnData objects), which can be several GB in size.

I want to persist results from these subflows, so I enabled PREFECT_RESULTS_PERSIST_BY_DEFAULT=true. However, I was surprised by this blurb about enabling persistence:

Enabling persistence on a flow enables persistence by default for its tasks

Enabling result persistence on a flow through any of the above keywords will also enable it for all tasks called within that flow by default.

Any settings explicitly set on a task take precedence over the flow settings.
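
The precedence described in this excerpt can be modeled with a small, self-contained sketch. This is not Prefect's implementation: the function name `resolve_task_persistence` and its parameters are hypothetical, `None` stands for "not explicitly set", and the way an explicit flow-level `False` interacts with the global default is an assumption made only for illustration.

```python
from typing import Optional


def resolve_task_persistence(
    task_setting: Optional[bool],
    flow_setting: Optional[bool],
    global_default: bool = False,
) -> bool:
    """Hypothetical model of the documented precedence (not Prefect code)."""
    # An explicit setting on the task always wins.
    if task_setting is not None:
        return task_setting
    # Otherwise the task inherits whatever the enclosing flow specifies.
    if flow_setting is not None:
        return flow_setting
    # With nothing set explicitly, fall back to the global default
    # (e.g. the value of PREFECT_RESULTS_PERSIST_BY_DEFAULT).
    return global_default


# A task with no explicit setting inherits persistence from its flow.
inherited = resolve_task_persistence(task_setting=None, flow_setting=True)
# A task that explicitly opts out is not affected by the flow.
opted_out = resolve_task_persistence(task_setting=False, flow_setting=True)
```

Under this model, the only way to keep a single task from persisting inside a persisting flow is to set the option on that task explicitly.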

I discovered that, when enabled by the env variable, my flows started running out of memory when the tasks tried to persist these large objects to S3. I had expected that this option only affects flows. Now I can definitely see it being useful for tasks, but I do not want it automatically enabled for all my tasks across all my subflows because they need to return these large objects.

I still want to persist the state of my subflows so that I can retry and pick up the parent flow from the failed subflow instead of the beginning. Rather than going through all the tasks in my flows and explicitly disabling this option, it would be great if there were instead additional settings to control or override the behavior for tasks versus flows. Would it be possible to add:

PREFECT_RESULTS_PERSIST_TASKS_BY_DEFAULT
PREFECT_RESULTS_PERSIST_FLOWS_BY_DEFAULT

To further control the behavior via the environment or config?
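
One way the two proposed settings could behave is sketched below. Everything here is an assumption for illustration: the two `..._TASKS_...` and `..._FLOWS_...` variables are only the names proposed in this request, the helper names `_read_bool` and `default_persistence` are hypothetical, the accepted truthy strings are a guess, and the fallback to `PREFECT_RESULTS_PERSIST_BY_DEFAULT` is one possible design rather than anything Prefect is known to do.

```python
import os
from typing import Mapping, Optional

# Assumed set of strings treated as "true"; Prefect's own parsing may differ.
_TRUTHY = {"1", "true", "yes", "on"}


def _read_bool(env: Mapping[str, str], name: str) -> Optional[bool]:
    """Return the boolean value of a variable, or None if it is unset."""
    raw = env.get(name)
    if raw is None:
        return None
    return raw.strip().lower() in _TRUTHY


def default_persistence(kind: str, env: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical lookup of the default for kind 'task' or 'flow'."""
    if kind not in ("task", "flow"):
        raise ValueError(f"kind must be 'task' or 'flow', got {kind!r}")
    # The kind-specific variable (proposed, not an existing setting) wins.
    specific = _read_bool(env, f"PREFECT_RESULTS_PERSIST_{kind.upper()}S_BY_DEFAULT")
    if specific is not None:
        return specific
    # Otherwise fall back to the existing general setting; unset means False.
    general = _read_bool(env, "PREFECT_RESULTS_PERSIST_BY_DEFAULT")
    return bool(general)


# Example environment: persist by default, but opt tasks out globally.
example_env = {
    "PREFECT_RESULTS_PERSIST_BY_DEFAULT": "true",
    "PREFECT_RESULTS_PERSIST_TASKS_BY_DEFAULT": "false",
}
flows_persist = default_persistence("flow", example_env)
tasks_persist = default_persistence("task", example_env)
```

With the example environment, flows keep persisting their results while tasks do not, which is the split this request asks for.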

Or perhaps I am overlooking something: Is it possible to set only the parent flow to persist results of the subflow deployments, and within the subflow deployments themselves not have it enabled?

My parent flow code resembles:

from prefect import flow, task, get_run_logger
import prefect.deployments


@task
def run_curation(
    plate_name: str, bfx_run_label: str, bio_exp_id: int, dev: bool = False
):
    flow_run = prefect.deployments.run_deployment(
        name=f'ild-curation/ild-curation-{"dev" if dev else "prod"}',
        parameters={
            "plate_name": plate_name,
            "bfx_run_label": bfx_run_label,
            "bio_exp_id": bio_exp_id,
        },
    )
    if flow_run.state.name != 'Completed':  # Before I was using flow_run.state.result() to check the persisted state!
        raise RuntimeError(
            f'Flow run {flow_run.name!r} state is {flow_run.state.name!r}'
        )


@flow
def ild_workflow(
    plate_name: str, bfx_run_label: str, bio_exp_id: int, dev: bool = False
):
    logger = get_run_logger()
    logger.info(f'Running {"dev" if dev else "prod"} ILD workflow for {plate_name=}...')

    logger.info("Launching curation...")
    run_curation(plate_name, bfx_run_label, bio_exp_id, dev)

    # More subflows are called in the same fashion

Labels: enhancement (An improvement of an existing feature)
