Skip to content

Commit

Permalink
develop: fixup scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
egafni committed Dec 29, 2016
1 parent 77880b5 commit 0c8545b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
6 changes: 3 additions & 3 deletions bin/cosmos
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def shell(db_url_or_path_to_sqlite):
cosmos_app.shell()


def run(in_jobs, default_drm, queue, restart, max_cores, max_attempts):
def run(in_jobs, default_drm, default_queue, restart, max_cores, max_attempts):
"""
    Create an embarrassingly parallel workflow from all jobs in `in_jobs`. `in_jobs` should be a json of a dict
keyed by uid => command.
Expand All @@ -45,7 +45,7 @@ def run(in_jobs, default_drm, queue, restart, max_cores, max_attempts):

cosmos_app = Cosmos(database_url='sqlite:///cosmos.sqlite',
default_drm=default_drm,
default_queue=queue,
default_queue=default_queue,
get_submit_args=partial(default_get_submit_args, parallel_env='smp'))
cosmos_app.initdb()
wf = cosmos_app.start(name='workflow', restart=restart, skip_confirm=True)
Expand Down Expand Up @@ -78,7 +78,7 @@ if __name__ == '__main__':
sp.add_argument('--queue', '-q')

sp.add_argument('--max_cores', '--max-cores', '-c', type=int,
help="Maximum number (based on the sum of cpu_requirement) of cores to use at once. 0 means unlimited.", default=None)
help="Maximum number (based on the sum of cpu_requirement) of cores to use at once. 0 means unlimited.", default=None)
sp.add_argument('--max_attempts', '--max-attempts', '-a', type=int,
help="Maximum number of times to try running a Task that must succeed before the workflow fails.", default=1)
sp.add_argument('--restart', '-r', action='store_true',
Expand Down
41 changes: 22 additions & 19 deletions examples/ex2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,49 @@ def recipe(workflow):
word = echo_task.params['word']
for n in [1, 2]:
cat_task = workflow.add_task(
func=cat,
params=dict(in_txts=[echo_task.params['out_txt']],
out_txt='%s/%s/cat.txt' % (word, n)),
parents=[echo_task],
uid='%s_%s' % (word, n))
func=cat,
params=dict(in_txts=[echo_task.params['out_txt']],
out_txt='%s/%s/cat.txt' % (word, n)),
parents=[echo_task],
uid='%s_%s' % (word, n))

# Count the words in the previous stage. An example of a simple one2one relationship
# For each task in StageA, there is a single dependent task in StageB.
word_count_task = workflow.add_task(
func=word_count,
# Dependency instances allow you to specify an input and parent simultaneously
params=dict(in_txts=[Dependency(cat_task, 'out_txt')],
out_txt='%s/%s/wc.txt' % (word, n),
chars=True),
# parents=[cat_task], <-- not necessary!
uid='%s_%s' % (word, n), )
func=word_count,
# Dependency instances allow you to specify an input and parent simultaneously
params=dict(in_txts=[Dependency(cat_task, 'out_txt')],
out_txt='%s/%s/wc.txt' % (word, n),
chars=True),
# parents=[cat_task], <-- not necessary!
uid='%s_%s' % (word, n), )
word_count_tasks.append(word_count_task)

    # Cat the contents of all word_counts into one file. Only one node is being created whose
# parents are all of the WordCounts (a many2one relationship, aka a reduce operation).
summarize_task = workflow.add_task(
func=cat,
params=dict(in_txts=[Dependency(t, 'out_txt') for t in word_count_tasks],
out_txt='summary.txt'),
parents=word_count_tasks,
stage_name='Summary_Analysis',
uid='') # It's the only Task in this Stage, so doesn't need a specific uid
func=cat,
params=dict(in_txts=[Dependency(t, 'out_txt') for t in word_count_tasks],
out_txt='summary.txt'),
parents=word_count_tasks,
stage_name='Summary_Analysis',
uid='') # It's the only Task in this Stage, so doesn't need a specific uid


if __name__ == '__main__':
import argparse

p = argparse.ArgumentParser()
p.add_argument('-drm', default='local', help='', choices=('local', 'drmaa:ge', 'ge'))
    p.add_argument('-q', '--queue', help='Submit to this queue if the DRM supports it')

args = p.parse_args()

cosmos = Cosmos('sqlite:///%s/sqlite.db' % os.path.dirname(os.path.abspath(__file__)),
# example of how to change arguments if you're NOT using default_drm='local'
get_submit_args=partial(default_get_submit_args, parallel_env='smp'),
default_drm=args.drm)
default_drm=args.drm,
default_queue=args.queue)
cosmos.initdb()

sp.check_call('mkdir -p analysis_output/ex2', shell=True)
Expand Down

0 comments on commit 0c8545b

Please sign in to comment.