@@ -42,7 +42,8 @@ def parse_arguments():
42
42
Argument Parsing.
43
43
44
44
:return argparse.Namespace, list[str]: namespace parsed according to
45
- arguments defined here, them undefined arguments
45
+ arguments defined here, and additional options arguments undefined
46
+ here and to be handled downstream
46
47
"""
47
48
48
49
# Main looper program help text messages
@@ -390,26 +391,37 @@ def run(prj, args, remaining_args):
390
391
_LOGGER .warn ("Submission settings "
391
392
"lack memory specification" )
392
393
393
- # Add the command string and job name to the submit_settings object
394
+ # Add command string and job name to the submit_settings object.
394
395
submit_settings ["JOBNAME" ] = \
395
396
sample .sample_name + "_" + pipeline_key
396
397
submit_settings ["CODE" ] = cmd
397
398
398
- # Submit job!
399
- _LOGGER .debug ("Attempting job submission: '%s' ('%s')" ,
400
- sample .sample_name , pl_name )
401
- submitted = cluster_submit (
402
- sample , prj .compute .submission_template ,
403
- prj .compute .submission_command , submit_settings ,
404
- prj .metadata .submission_subdir , sample_output_folder ,
405
- pl_name , args .time_delay , submit = True ,
406
- dry_run = args .dry_run , ignore_flags = args .ignore_flags ,
407
- remaining_args = remaining_args )
408
- if submitted :
399
+ # Create submission script (write script to disk)!
400
+ _LOGGER .debug ("Creating submission script for pipeline %s: '%s'" ,
401
+ pl_name , sample .sample_name )
402
+ submit_script = create_submission_script (
403
+ sample , prj .compute .submission_template , submit_settings ,
404
+ submission_folder = prj .metadata .submission_subdir ,
405
+ pipeline_name = pl_name , remaining_args = remaining_args )
406
+
407
+ # Determine how to update submission counts and (perhaps) submit.
408
+ flag_files = glob .glob (os .path .join (
409
+ sample_output_folder , pl_name + "*.flag" ))
410
+ if not args .ignore_flags and len (flag_files ) > 0 :
411
+ _LOGGER .info ("> Not submitting, flag(s) found: {}" .
412
+ format (flag_files ))
413
+ _LOGGER .debug ("NOT SUBMITTED" )
414
+ else :
415
+ if args .dry_run :
416
+ _LOGGER .info ("> DRY RUN: I would have submitted this: '%s'" ,
417
+ submit_script )
418
+ else :
419
+ submission_command = "{} {}" .format (
420
+ prj .compute .submission_command , submit_script )
421
+ subprocess .call (submission_command , shell = True )
422
+ time .sleep (args .time_delay ) # Delay next job's submission.
409
423
_LOGGER .debug ("SUBMITTED" )
410
424
submit_count += 1
411
- else :
412
- _LOGGER .debug ("NOT SUBMITTED" )
413
425
414
426
# Report what went down.
415
427
_LOGGER .info ("Looper finished" )
@@ -630,38 +642,22 @@ def _submission_status_text(curr, total, sample_name, sample_library):
630
642
631
643
632
644
633
- def cluster_submit (
634
- sample , submit_template , submission_command , variables_dict ,
635
- submission_folder , sample_output_folder , pipeline_name , time_delay ,
636
- submit = False , dry_run = False , ignore_flags = False , remaining_args = None ):
645
+ def create_submission_script (
646
+ sample , submit_template , variables_dict ,
647
+ submission_folder , pipeline_name , remaining_args = None ):
637
648
"""
638
649
Write cluster submission script to disk and submit job for given Sample.
639
650
640
651
:param models.Sample sample: the Sample object for submission
641
652
:param str submit_template: path to submission script template
642
- :param str submission_command: actual command with which to execute the
643
- submission of the cluster job for the given sample
644
653
:param variables_dict: key-value pairs to use to populate fields in
645
654
the submission template
646
655
:param str submission_folder: path to the folder in which to place
647
656
submission files
648
- :param str sample_output_folder: path to folder into which the pipeline
649
- will write file(s), and where to search for flag file to check
650
- if a sample's already been submitted
651
657
:param str pipeline_name: name of the pipeline that the job will run
652
- :param int time_delay: number of seconds by which to delay submission
653
- of next job
654
- :param bool submit: whether to even attempt to actually submit the job;
655
- this is useful for skipping certain samples within a project
656
- :param bool dry_run: whether the call is a test and thus the cluster job
657
- created should not actually be submitted; in this case, the return
658
- is a true proxy for whether the job would've been submitted
659
- :param bool ignore_flags: whether to ignore the presence of flag file(s)
660
- in making the determination of whether to submit the job
661
658
:param Iterable[str] remaining_args: arguments for this submission,
662
659
unconsumed by previous option/argument parsing
663
- :return bool: whether the submission was done,
664
- or would've been if not a dry run
660
+ :return str: filepath to submission script
665
661
"""
666
662
667
663
# Create the script and logfile paths.
@@ -706,26 +702,7 @@ def cluster_submit(
706
702
name_sample_subtype , sample .name )
707
703
sample .to_yaml (subs_folder_path = submission_folder )
708
704
709
- # Check if job is already submitted (unless ignore_flags is set to True)
710
- if not ignore_flags :
711
- flag_files = glob .glob (os .path .join (
712
- sample_output_folder , pipeline_name + "*.flag" ))
713
- if len (flag_files ) > 0 :
714
- _LOGGER .info ("> Not submitting, flag(s) found: {}" .
715
- format (flag_files ))
716
- submit = False
717
- else :
718
- pass
719
-
720
- if not submit :
721
- return False
722
- if dry_run :
723
- _LOGGER .info ("> DRY RUN: I would have submitted this: '%s'" ,
724
- submit_script )
725
- else :
726
- subprocess .call (submission_command + " " + submit_script , shell = True )
727
- time .sleep (time_delay ) # Delay next job's submission.
728
- return True
705
+ return submit_script
729
706
730
707
731
708
0 commit comments