Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Python] File staging to user worker support #34208

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,16 @@ def _add_argparse_args(cls, parser):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
parser.add_argument(
'--files_to_stage',
dest='files_to_stage',
action='append',
default=None,
help=(
'Local path to a file. During job submission, the files will be '
'staged in the staging area (--staging_location option) and then '
'workers will upload them to the worker specific staging location '
'(e.g. /tmp/staged/ for portable runner.'))
parser.add_argument(
'--prebuild_sdk_container_engine',
help=(
Expand Down Expand Up @@ -1891,9 +1901,6 @@ def _add_argparse_args(cls, parser):
)


# TODO(silviuc): Add --files_to_stage option.
# This could potentially replace the --requirements_file and --setup_file.

# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
# Remote execution must check that this option is not None.

Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ def create_job_resources(
Stager._create_file_stage_to_artifact(
tarball_file, WORKFLOW_TARBALL_FILE))

if setup_options.files_to_stage is not None:
for file in setup_options.files_to_stage.split(','):
resources.append(
Stager._create_file_stage_to_artifact(
file, os.path.basename(file)))

# Handle extra local packages that should be staged.
if setup_options.extra_packages is not None:
resources.extend(
Expand Down
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,30 @@ def test_sdk_location_local_directory(self):
with open(tarball_path) as f:
self.assertEqual(f.read(), 'Package content.')

def test_files_to_stage(self):
staging_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()

foo_ca = os.path.join(source_dir, 'foo.ca')
self.create_temp_file(foo_ca, 'ca content')
test_txt = os.path.join(source_dir, 'test.txt')
self.create_temp_file(test_txt, 'test content')
files_to_stage = ','.join([foo_ca, test_txt])
options = PipelineOptions()
self.update_options(options)
options.view_as(SetupOptions).files_to_stage = files_to_stage

self.assertEqual(
['foo.ca', 'test.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1])
foo = os.path.join(staging_dir, 'foo.ca')
with open(foo) as f:
self.assertEqual(f.read(), 'ca content')
txt = os.path.join(staging_dir, 'test.txt')
with open(txt) as f:
self.assertEqual(f.read(), 'test content')

def test_sdk_location_local_source_file(self):
staging_dir = self.make_temp_dir()
sdk_directory = self.make_temp_dir()
Expand Down
Loading