
pin s3 worker count for kubernetes tasks #2259


Open · wants to merge 4 commits into master

Conversation

savingoyal (Collaborator)
No description provided.

@@ -596,6 +597,8 @@ def create_job_object(
"METAFLOW_ARGO_WORKFLOWS_ENV_VARS_TO_SKIP",
ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
)
# TODO: Set this for AWS Batch too.
savingoyal (Collaborator, Author):

need to set the right vars in argo workflows and airflow too

@@ -382,6 +382,7 @@ def _to_job(self, node):
"METAFLOW_DATATOOLS_S3ROOT": DATATOOLS_S3ROOT,
"METAFLOW_DEFAULT_DATASTORE": self.flow_datastore.TYPE,
"METAFLOW_DEFAULT_METADATA": "service",
"METAFLOW_S3_WORKER_COUNT": max(1, int(k8s_deco.attributes["cpu"]) - 2),
savingoyal (Collaborator, Author):

would we need a similar change in argo workflows for pods and jobsets?
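For reference, the value being pinned in the kubernetes hunk above (and in the Batch change further down) is simply max(1, cpu - 2): presumably about two cores are left for the task process itself and the remainder drives the S3 worker pool, with a floor of one worker. A minimal sketch of that arithmetic (the helper name is illustrative, not part of this PR):

def s3_worker_count(cpu):
    # Mirrors the expression used in the diffs: reserve ~2 cores for the
    # task itself, but never drop below one S3 worker.
    return max(1, int(cpu) - 2)

assert s3_worker_count(1) == 1   # small tasks still get one worker
assert s3_worker_count(4) == 2
assert s3_worker_count(16) == 14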

@@ -262,6 +262,7 @@ def create_job(
.environment_variable("METAFLOW_DEFAULT_DATASTORE", "s3")
.environment_variable("METAFLOW_DEFAULT_METADATA", DEFAULT_METADATA)
.environment_variable("METAFLOW_CARD_S3ROOT", CARD_S3ROOT)
.environment_variable("METAFLOW_S3_WORKER_COUNT", max(1, int(cpu) - 2))
savingoyal (Collaborator, Author):

would we need a change in step functions as well?

saikonen (Collaborator):

step-functions calls the batch implementation so this should apply there as well. will verify

saikonen (Collaborator):

confirmed, step-functions is setting the env correctly through the changes in Batch

saikonen (Collaborator) commented Feb 18, 2025

noting something extra that needs to be handled here:

from metaflow import step, FlowSpec, resources
import os

class S3WorkerFlow(FlowSpec):
    @resources(cpu=1)
    @step
    def start(self):
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 1, "Worker count should be 1!"
        self.next(self.bigger)

    @resources(cpu=4)
    @step
    def bigger(self):
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 2, "Worker count should be 2!"
        self.next(self.end)

    @step
    def end(self):
        print("Done! 🏁")


if __name__ == "__main__":
    S3WorkerFlow()

running this --with kubernetes or argo-workflows fails, as the resource decorator values are cast to a string-formatted float (a str-float), which then fails to convert back to an int
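The underlying failure is that int() rejects float-formatted strings, so int("4.0") raises ValueError while int("4") works. A small standalone illustration, plus one tolerant way to parse such values (a sketch only, not necessarily the fix this PR should adopt):

# int() rejects float-formatted strings outright:
try:
    int("4.0")
except ValueError as exc:
    print(exc)  # invalid literal for int() with base 10: '4.0'

# Normalizing through float() accepts both "4" and "4.0":
def as_int(value):
    return int(float(value))

assert as_int("4") == 4
assert as_int("4.0") == 4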

saikonen (Collaborator) commented Mar 4, 2025

> running this --with kubernetes or argo workflows fails, as resource decorator values are cast to a str-float, which fails to convert back to an int

The cause is a different default for the cpu value in @batch compared to @kubernetes: with @resources plus @kubernetes the cpu ends up as a string-formatted float, while with @resources plus @batch it ends up as a string-formatted int.
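Given that, one defensive option (a sketch only, assuming the decorator attribute can arrive as either a string-int or a string-float) is to normalize through float before computing the worker count:

cpu = "1.0"  # hypothetical value as it arrives via @resources with @kubernetes
assert max(1, int(float(cpu)) - 2) == 1

cpu = "4"    # hypothetical value as it arrives via @resources with @batch
assert max(1, int(float(cpu)) - 2) == 2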
