Skip to content

Commit d8c2929

Browse files
committed
Implemented eventbus and eventhandling for git, but GitWorker needs more work
1 parent 1e239f4 commit d8c2929

21 files changed

+300
-27
lines changed
Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,40 @@
11
package org.pipelinelabs.pipeline.listen
22

3+
import com.google.common.eventbus.AsyncEventBus
34
import com.yammer.dropwizard.Service
45
import com.yammer.dropwizard.config.Bootstrap
56
import com.yammer.dropwizard.config.Environment
7+
import org.pipelinelabs.pipeline.listen.core.DeadEventHandler
8+
import org.pipelinelabs.pipeline.listen.core.GitWorker
69
import org.pipelinelabs.pipeline.listen.resources.GitHubWebHookResource
710

11+
import static java.util.concurrent.TimeUnit.SECONDS
12+
813
class PipeListenService extends Service<PipeListenConfiguration> {
914

1015
@Override
1116
void initialize(Bootstrap<PipeListenConfiguration> bootstrap) {
1217
}
1318

1419
@Override
15-
void run(PipeListenConfiguration configuration, Environment environment) throws Exception {
16-
environment.addResource(new GitHubWebHookResource())
20+
void run(PipeListenConfiguration config, Environment env) throws Exception {
21+
def bus = createEventBus(env)
22+
env.manage(new DeadEventHandler(bus))
23+
env.manage(new GitWorker(bus));
24+
env.addResource(new GitHubWebHookResource(bus))
25+
}
26+
27+
private createEventBus(Environment env) {
28+
final corePoolSize = 2
29+
final maxPoolSize = 6
30+
final keepAliveTimeInSeconds = 30
31+
new AsyncEventBus(
32+
env.managedExecutorService(
33+
"eventbus-worker-%s",
34+
corePoolSize,
35+
maxPoolSize,
36+
keepAliveTimeInSeconds, SECONDS
37+
)
38+
)
1739
}
1840
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.pipelinelabs.pipeline.listen.core
2+
3+
import com.google.common.eventbus.DeadEvent
4+
import com.google.common.eventbus.EventBus
5+
import com.google.common.eventbus.Subscribe
6+
import com.yammer.dropwizard.lifecycle.Managed
7+
import groovy.util.logging.Slf4j
8+
9+
@Slf4j
10+
class DeadEventHandler implements Managed {
11+
12+
private final EventBus bus
13+
14+
DeadEventHandler(EventBus bus) {
15+
this.bus = bus
16+
}
17+
18+
@Override
19+
void start() throws Exception {
20+
bus.register(this)
21+
}
22+
23+
@Override
24+
void stop() throws Exception {
25+
bus.unregister(this)
26+
}
27+
28+
@Subscribe
29+
void work(DeadEvent event) {
30+
log.error("No event handler available for event [{}]", event.event)
31+
}
32+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.pipelinelabs.pipeline.listen.core
2+
3+
4+
class GitTriggerEvent {
5+
6+
final String url
7+
8+
GitTriggerEvent(String url) {
9+
this.url = url
10+
}
11+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.pipelinelabs.pipeline.listen.core
2+
3+
import com.google.common.eventbus.EventBus
4+
import com.google.common.eventbus.Subscribe
5+
import com.yammer.dropwizard.lifecycle.Managed
6+
7+
import java.nio.file.Files
8+
9+
class GitWorker implements Managed {
10+
11+
private final EventBus bus
12+
13+
GitWorker(EventBus bus) {
14+
this.bus = bus
15+
}
16+
17+
@Override
18+
void start() throws Exception {
19+
bus.register(this)
20+
}
21+
22+
@Override
23+
void stop() throws Exception {
24+
bus.unregister(this)
25+
}
26+
27+
@Subscribe
28+
void work(GitTriggerEvent event) {
29+
def dir = Files.createTempDirectory("pipeline/")
30+
List envProps = null
31+
"git clone ${event.url}".execute(envProps, dir.toFile())
32+
// TODO: Call pipe-runner
33+
}
34+
}
Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.pipelinelabs.pipeline.listen.resources
22

3+
import com.google.common.eventbus.EventBus
34
import groovy.json.JsonSlurper
45
import groovy.util.logging.Slf4j
6+
import org.pipelinelabs.pipeline.listen.core.GitTriggerEvent
57

68
import javax.ws.rs.*
79
import javax.ws.rs.core.Response
@@ -15,28 +17,44 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST
1517
@Consumes(APPLICATION_JSON)
1618
class GitHubWebHookResource {
1719

20+
private final EventBus bus
21+
22+
GitHubWebHookResource(EventBus bus) {
23+
this.bus = bus
24+
}
25+
1826
@POST
1927
Response trigger(@FormParam('payload') String payload) {
2028
final slurper = new JsonSlurper()
2129
def info = slurper.parseText(payload)
2230
try {
23-
assert info.commits
24-
assert info.head_commit
25-
assert info.pusher
26-
assert info.pusher.email
27-
assert info.pusher.name
28-
assert info.ref
29-
assert info.repository
30-
assert info.repository.name
31-
assert info.repository.master_branch
32-
assert info.repository.language
33-
assert info.repository.private == true ||
34-
info.repository.private == false
35-
assert info.repository.url
31+
verifyRequest(info)
3632
} catch (AssertionError e) {
3733
log.warn("Received GitHub WebHook payload with unexpected format:\n{}", info)
3834
return Response.status(BAD_REQUEST.statusCode).build()
3935
}
40-
return Response.noContent().build()
36+
return handleRequest(info)
37+
}
38+
39+
private Response handleRequest(request) {
40+
def event = new GitTriggerEvent(request.repository.url)
41+
bus.post(event)
42+
Response.noContent().build()
43+
}
44+
45+
private verifyRequest(request) throws AssertionError {
46+
assert request.commits
47+
assert request.head_commit
48+
assert request.pusher
49+
assert request.pusher.email
50+
assert request.pusher.name
51+
assert request.ref
52+
assert request.repository
53+
assert request.repository.name
54+
assert request.repository.master_branch
55+
assert request.repository.language
56+
assert request.repository.private == true ||
57+
request.repository.private == false
58+
assert request.repository.url
4159
}
4260
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,57 @@
11
package org.pipelinelabs.pipeline.listen
22

33
import com.yammer.dropwizard.config.Environment
4+
import org.pipelinelabs.pipeline.listen.core.DeadEventHandler
5+
import org.pipelinelabs.pipeline.listen.core.GitWorker
46
import org.pipelinelabs.pipeline.listen.resources.GitHubWebHookResource
57
import spock.lang.Specification
68

9+
import java.util.concurrent.ExecutorService
10+
711
class PipeListenServiceSpec extends Specification {
812

913
def 'Service serves the GitHubWebHookResource'() {
1014
given:
1115
def env = Mock(Environment)
16+
def executor = Mock(ExecutorService)
1217
def service = new PipeListenService()
1318
def config = new PipeListenConfiguration()
1419

1520
when:
1621
service.run(config, env)
1722

1823
then:
24+
1 * env.managedExecutorService(_, _, _, _, _) >> executor
1925
1 * env.addResource(_ as GitHubWebHookResource)
2026
}
27+
28+
def 'Service manages the GitWorker'() {
29+
given:
30+
def env = Mock(Environment)
31+
def executor = Mock(ExecutorService)
32+
def service = new PipeListenService()
33+
def config = new PipeListenConfiguration()
34+
35+
when:
36+
service.run(config, env)
37+
38+
then:
39+
1 * env.managedExecutorService(_, _, _, _, _) >> executor
40+
1 * env.manage(_ as GitWorker)
41+
}
42+
43+
def 'Service manages the DeadEventHandler'() {
44+
given:
45+
def env = Mock(Environment)
46+
def executor = Mock(ExecutorService)
47+
def service = new PipeListenService()
48+
def config = new PipeListenConfiguration()
49+
50+
when:
51+
service.run(config, env)
52+
53+
then:
54+
1 * env.managedExecutorService(_, _, _, _, _) >> executor
55+
1 * env.manage(_ as DeadEventHandler)
56+
}
2157
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.pipelinelabs.pipeline.listen.core
2+
3+
import com.google.common.eventbus.DeadEvent
4+
import com.google.common.eventbus.EventBus
5+
import com.google.common.eventbus.Subscribe
6+
import spock.lang.Ignore
7+
import spock.lang.Specification
8+
9+
10+
class DeadEventHandlerSpec extends Specification {
11+
private EventBus bus = Mock(EventBus)
12+
13+
def 'Start registers itself to the event bus'() {
14+
given:
15+
def worker = new DeadEventHandler(bus)
16+
17+
when:
18+
worker.start()
19+
20+
then:
21+
1 * bus.register(worker)
22+
}
23+
24+
def 'Stop unregisters itself from the event bus'() {
25+
given:
26+
def worker = new DeadEventHandler(bus)
27+
28+
when:
29+
worker.stop()
30+
31+
then:
32+
1 * bus.unregister(worker)
33+
}
34+
35+
def 'Work handles DeadEvent and is annotated with @Subscribe'() {
36+
def method = DeadEventHandler.getMethod("work", DeadEvent)
37+
38+
expect:
39+
method.isAnnotationPresent(Subscribe)
40+
}
41+
42+
@Ignore('How to verify if injected logger is used?')
43+
def 'Work logs an error'() {
44+
def worker = new DeadEventHandler(bus)
45+
final event = Mock(DeadEvent)
46+
47+
expect:
48+
worker.work(event)
49+
}
50+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.pipelinelabs.pipeline.listen.core
2+
3+
import spock.lang.Specification
4+
5+
class GitTriggerEventSpec extends Specification {
6+
7+
def 'Event has an url'() {
8+
def url = "url"
9+
10+
expect:
11+
new GitTriggerEvent(url).url == url
12+
}
13+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.pipelinelabs.pipeline.listen.core
2+
3+
import com.google.common.eventbus.EventBus
4+
import com.google.common.eventbus.Subscribe
5+
import spock.lang.Ignore
6+
import spock.lang.Specification
7+
8+
class GitWorkerSpec extends Specification {
9+
private EventBus bus = Mock(EventBus)
10+
11+
def 'Start registers itself to the event bus'() {
12+
given:
13+
def worker = new GitWorker(bus)
14+
15+
when:
16+
worker.start()
17+
18+
then:
19+
1 * bus.register(worker)
20+
}
21+
22+
def 'Stop unregisters itself from the event bus'() {
23+
given:
24+
def worker = new GitWorker(bus)
25+
26+
when:
27+
worker.stop()
28+
29+
then:
30+
1 * bus.unregister(worker)
31+
}
32+
33+
def 'Work handles GitTriggerEvent and is annotated with @Subscribe'() {
34+
def method = GitWorker.getMethod("work", GitTriggerEvent)
35+
36+
expect:
37+
method.isAnnotationPresent(Subscribe)
38+
}
39+
40+
@Ignore('Needs to be testable somehow')
41+
def 'Work clones the git repo and starts pipe-runner'() {
42+
def worker = new GitWorker(bus)
43+
final event = Mock(GitTriggerEvent)
44+
45+
expect:
46+
worker.work(event)
47+
}
48+
}

0 commit comments

Comments
 (0)