diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index fd6b9c5a0503..064eec349f50 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1560,6 +1560,16 @@ def _add_argparse_args(cls, parser): 'staged in the staging area (--staging_location option) and the ' 'workers will install them in same order they were specified on ' 'the command line.')) + parser.add_argument( + '--files_to_stage', + dest='files_to_stage', + action='append', + default=None, + help=( + 'Local path to a file. During job submission, the files will be ' + 'staged in the staging area (--staging_location option) and then ' + 'workers will upload them to the worker specific staging location ' + '(e.g. /tmp/staged/ for portable runner.')) parser.add_argument( '--prebuild_sdk_container_engine', help=( @@ -1891,9 +1901,6 @@ def _add_argparse_args(cls, parser): ) -# TODO(silviuc): Add --files_to_stage option. -# This could potentially replace the --requirements_file and --setup_file. - # TODO(silviuc): Non-standard options. Keep them? If yes, add help too! # Remote execution must check that this option is not None. diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index c7142bfddcaf..3a6500581148 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -292,6 +292,12 @@ def create_job_resources( Stager._create_file_stage_to_artifact( tarball_file, WORKFLOW_TARBALL_FILE)) + if setup_options.files_to_stage is not None: + for file in setup_options.files_to_stage.split(','): + resources.append( + Stager._create_file_stage_to_artifact( + file, os.path.basename(file))) + # Handle extra local packages that should be staged. if setup_options.extra_packages is not None: resources.extend( diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 5535989a5786..0cbd233c3e9e 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -428,6 +428,30 @@ def test_sdk_location_local_directory(self): with open(tarball_path) as f: self.assertEqual(f.read(), 'Package content.') + def test_files_to_stage(self): + staging_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + + foo_ca = os.path.join(source_dir, 'foo.ca') + self.create_temp_file(foo_ca, 'ca content') + test_txt = os.path.join(source_dir, 'test.txt') + self.create_temp_file(test_txt, 'test content') + files_to_stage = ','.join([foo_ca, test_txt]) + options = PipelineOptions() + self.update_options(options) + options.view_as(SetupOptions).files_to_stage = files_to_stage + + self.assertEqual( + ['foo.ca', 'test.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILE], + self.stager.create_and_stage_job_resources( + options, staging_location=staging_dir)[1]) + foo = os.path.join(staging_dir, 'foo.ca') + with open(foo) as f: + self.assertEqual(f.read(), 'ca content') + txt = os.path.join(staging_dir, 'test.txt') + with open(txt) as f: + self.assertEqual(f.read(), 'test content') + def test_sdk_location_local_source_file(self): staging_dir = self.make_temp_dir() sdk_directory = self.make_temp_dir()