Skip to content

Commit

Permalink
[FIXED JENKINS-28994] implement optional synchronicity limit for para…
Browse files Browse the repository at this point in the history
…llel blocks

The closure parameters to a parallel block call can now be pre-pended by a
number which is interpreted as a limit on the number of closure blocks to be
executed in parallel.

Example: parallel(2, {build("job1")}, {build("job2")}, {build("job3")})
The above would execute up to two of the specified jobs simultaneously.
  • Loading branch information
jcarrothers-sap committed Jun 19, 2015
1 parent bd179ac commit f519216
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -387,33 +387,45 @@ public class FlowDelegate {

// allows syntax like : parallel(["Kohsuke","Nicolas"].collect { name -> return { build("job1", param1:name) } })
def List<FlowState> parallel(Collection<? extends Closure> closures) {
parallel(closures as Closure[])
parallel(0, closures as Closure[])
}
def List<FlowState> parallel(int maxThreads, Collection<? extends Closure> closures) {
parallel(maxThreads, closures as Closure[])
}

// allows collecting job status by name rather than by index
// inspired by https://github.com/caolan/async#parallel
def Map<?, FlowState> parallel(Map<?, ? extends Closure> args) {
parallel(0, args)
}
def Map<?, FlowState> parallel(int maxThreads, Map<?, ? extends Closure> args) {
def keys = new ArrayList<?>()
def closures = new ArrayList<? extends Closure>()
args.entrySet().each { e ->
keys.add(e.key)
closures.add(e.value)
}
def results = new LinkedHashMap<?, FlowState>()
def flowStates = parallel(closures) // as List<FlowState>
def flowStates = parallel(maxThreads, closures) // as List<FlowState>
flowStates.eachWithIndex { v, i -> results[keys[i]] = v }
results
}

def List<FlowState> parallel(Closure ... closures) {
parallel(0, closures)
}
def List<FlowState> parallel(int maxThreads, Closure ... closures) {
statusCheck()
ExecutorService pool = Executors.newCachedThreadPool()
ExecutorService pool = (maxThreads <= 0) ?
Executors.newCachedThreadPool() : Executors.newFixedThreadPool(maxThreads)
Set<Run> upstream = flowRun.state.lastCompleted
Set<Run> lastCompleted = Collections.synchronizedSet(new HashSet<Run>())
def results = new CopyOnWriteArrayList<FlowState>()
def tasks = new ArrayList<Future<FlowState>>()

println("parallel {")
def startMsg = "parallel"
if ( maxThreads > 0 ) startMsg += "( "+maxThreads+" )"
println(startMsg + " {")
++indent

def current_state = flowRun.state
Expand Down

0 comments on commit f519216

Please sign in to comment.