From f519216ec640bbc3d257cb25a53cfe29fa8919c9 Mon Sep 17 00:00:00 2001 From: Jim Carrothers Date: Fri, 19 Jun 2015 16:31:24 -0700 Subject: [PATCH] [FIXED JENKINS-28994] implement optional synchronicity limit for parallel blocks The closure parameters to a parallel block call can now be pre-pended by a number which is interpreted as a limit on the number of closure blocks to be executed in parallel. Example: parallel(2, {build("job1")}, {build("job2")}, {build("job3")}) The above would execute up to two of the specified jobs simultaneously. --- .../com/cloudbees/plugins/flow/FlowDSL.groovy | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy b/src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy index e175fd8..28783d2 100644 --- a/src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy +++ b/src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy @@ -387,12 +387,18 @@ public class FlowDelegate { // allows syntax like : parallel(["Kohsuke","Nicolas"].collect { name -> return { build("job1", param1:name) } }) def List parallel(Collection closures) { - parallel(closures as Closure[]) + parallel(0, closures as Closure[]) + } + def List parallel(int maxThreads, Collection closures) { + parallel(maxThreads, closures as Closure[]) } // allows collecting job status by name rather than by index // inspired by https://github.com/caolan/async#parallel def Map parallel(Map args) { + parallel(0, args) + } + def Map parallel(int maxThreads, Map args) { def keys = new ArrayList() def closures = new ArrayList() args.entrySet().each { e -> @@ -400,20 +406,26 @@ public class FlowDelegate { closures.add(e.value) } def results = new LinkedHashMap() - def flowStates = parallel(closures) // as List + def flowStates = parallel(maxThreads, closures) // as List flowStates.eachWithIndex { v, i -> results[keys[i]] = v } results } def List parallel(Closure ... closures) { + parallel(0, closures) + } + def List parallel(int maxThreads, Closure ... closures) { statusCheck() - ExecutorService pool = Executors.newCachedThreadPool() + ExecutorService pool = (maxThreads <= 0) ? + Executors.newCachedThreadPool() : Executors.newFixedThreadPool(maxThreads) Set upstream = flowRun.state.lastCompleted Set lastCompleted = Collections.synchronizedSet(new HashSet()) def results = new CopyOnWriteArrayList() def tasks = new ArrayList>() - println("parallel {") + def startMsg = "parallel" + if ( maxThreads > 0 ) startMsg += "( "+maxThreads+" )" + println(startMsg + " {") ++indent def current_state = flowRun.state