Skip to content

Commit

Permalink
update: api & benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
hyponet committed Apr 12, 2022
1 parent d04e6d5 commit 128b42e
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 52 deletions.
56 changes: 36 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# eventbus

Lightweight eventbus for golang
EventBus is a very useful tool for event-driven style code. You can use events to chain different logic together without coupling components

## Usage

Expand All @@ -19,7 +19,7 @@ func aAndBComputer(a, b int) {
}

func main() {
_, _ = bus.Register("op.int.and", aAndBComputer)
_, _ = bus.Subscribe("op.int.and", aAndBComputer)
bus.Publish("op.int.and", 1, 2)
}
```
Expand All @@ -43,8 +43,8 @@ func aliceDoSomething() {
}

func main() {
_, _ = bus.Register("partner.bob.do", bobDoSomething)
_, _ = bus.Register("partner.alice.do", aliceDoSomething)
_, _ = bus.Subscribe("partner.bob.do", bobDoSomething)
_, _ = bus.Subscribe("partner.alice.do", aliceDoSomething)
bus.Publish("partner.*.do")
}
```
Expand All @@ -60,34 +60,50 @@ pkg: github.com/hyponet/eventbus
cpu: Intel(R) Core(TM) i7-8700B CPU @ 3.20GHz

BenchmarkBusPublish
BenchmarkBusPublish-12 10000 2058 ns/op 518 B/op 15 allocs/op
BenchmarkBusPublish-2 10000 2954 ns/op 746 B/op 15 allocs/op
BenchmarkBusPublish-4 10000 1624 ns/op 497 B/op 15 allocs/op
BenchmarkBusPublish-8 10000 1333 ns/op 497 B/op 15 allocs/op

BenchmarkBusPublish
BenchmarkBusPublish-12 100000 1819 ns/op 500 B/op 15 allocs/op
BenchmarkBusPublish-2 100000 2369 ns/op 543 B/op 15 allocs/op
BenchmarkBusPublish-4 100000 1358 ns/op 497 B/op 15 allocs/op
BenchmarkBusPublish-8 100000 1380 ns/op 497 B/op 15 allocs/op

BenchmarkBusPublish
BenchmarkBusPublish-12 1000000 1918 ns/op 498 B/op 15 allocs/op
BenchmarkBusPublish-2 1000000 2202 ns/op 504 B/op 15 allocs/op
BenchmarkBusPublish-4 1000000 1419 ns/op 497 B/op 15 allocs/op
BenchmarkBusPublish-8 1000000 1466 ns/op 497 B/op 15 allocs/op

BenchmarkBusPublish
BenchmarkBusPublish-12 10000000 1695 ns/op 497 B/op 15 allocs/op
BenchmarkBusPublish-2 10000000 1992 ns/op 498 B/op 15 allocs/op
BenchmarkBusPublish-4 10000000 1408 ns/op 497 B/op 15 allocs/op
BenchmarkBusPublish-8 10000000 1507 ns/op 497 B/op 15 allocs/op
```

### Register
### Subscribe
```bash
goos: darwin
goarch: amd64
pkg: github.com/hyponet/eventbus
cpu: Intel(R) Core(TM) i7-8700B CPU @ 3.20GHz

BenchmarkBusRegister
BenchmarkBusRegister-12 10000 2604 ns/op 962 B/op 16 allocs/op

BenchmarkBusRegister
BenchmarkBusRegister-12 100000 3451 ns/op 951 B/op 16 allocs/op

BenchmarkBusRegister
BenchmarkBusRegister-12 1000000 3212 ns/op 1074 B/op 16 allocs/op

BenchmarkBusRegister
BenchmarkBusRegister-12 10000000 3303 ns/op 1016 B/op 16 allocs/op
BenchmarkBusSubscribe
BenchmarkBusSubscribe-2 10000 2682 ns/op 962 B/op 16 allocs/op
BenchmarkBusSubscribe-4 10000 2521 ns/op 983 B/op 16 allocs/op
BenchmarkBusSubscribe-8 10000 2762 ns/op 1196 B/op 16 allocs/op

BenchmarkBusSubscribe
BenchmarkBusSubscribe-2 100000 2831 ns/op 951 B/op 16 allocs/op
BenchmarkBusSubscribe-4 100000 3036 ns/op 975 B/op 16 allocs/op
BenchmarkBusSubscribe-8 100000 3066 ns/op 1087 B/op 16 allocs/op

BenchmarkBusSubscribe
BenchmarkBusSubscribe-2 1000000 3315 ns/op 1074 B/op 16 allocs/op
BenchmarkBusSubscribe-4 1000000 3180 ns/op 1076 B/op 16 allocs/op
BenchmarkBusSubscribe-8 1000000 2518 ns/op 770 B/op 16 allocs/op

BenchmarkBusSubscribe
BenchmarkBusSubscribe-2 10000000 3757 ns/op 1016 B/op 16 allocs/op
BenchmarkBusSubscribe-4 10000000 3489 ns/op 1013 B/op 16 allocs/op
BenchmarkBusSubscribe-8 10000000 4136 ns/op 1256 B/op 16 allocs/op
```
6 changes: 3 additions & 3 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func doNothing() {}

func init() {
for _, topic := range topics {
_, _ = bus.Register(topic, doNothing)
_, _ = bus.Subscribe(topic, doNothing)
}
}

Expand All @@ -41,9 +41,9 @@ func BenchmarkBusPublish(b *testing.B) {
}
}

func BenchmarkBusRegister(b *testing.B) {
func BenchmarkBusSubscribe(b *testing.B) {
target := append(topics, wildcards...)
for n := 0; n < b.N; n++ {
_, _ = bus.Register(target[n%len(target)], doNothing)
_, _ = bus.Subscribe(target[n%len(target)], doNothing)
}
}
26 changes: 13 additions & 13 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ type eventbus struct {
mux sync.RWMutex
}

func (b *eventbus) register(l *listener) {
func (b *eventbus) subscribe(l *listener) {
b.mux.Lock()
b.listeners[l.id] = l
b.exchange.add(l.topic, l.id)
b.mux.Unlock()
}

func (b *eventbus) unregister(lID string) {
func (b *eventbus) unsubscribe(lID string) {
b.mux.Lock()
b.unregisterWithLock(lID)
b.unsubscribeWithLock(lID)
b.mux.Unlock()
}

func (b *eventbus) unregisterWithLock(lID string) {
func (b *eventbus) unsubscribeWithLock(lID string) {
delete(b.listeners, lID)
b.exchange.remove(lID)
}
Expand All @@ -44,7 +44,7 @@ func (b *eventbus) publish(topic string, args ...interface{}) {
for i, lID := range lIDs {
needDo = append(needDo, b.listeners[lID])
if needDo[i].once {
b.unregisterWithLock(lID)
b.unsubscribeWithLock(lID)
}
}
b.mux.Unlock()
Expand All @@ -57,38 +57,38 @@ func (b *eventbus) publish(topic string, args ...interface{}) {
}
}

func Register(topic string, fn interface{}) (string, error) {
func Subscribe(topic string, fn interface{}) (string, error) {
l, err := buildNewListener(topic, fn, false, false)
if err != nil {
return "", err
}

evb.register(l)
evb.subscribe(l)
return l.id, nil
}

func RegisterOnce(topic string, fn interface{}) (string, error) {
func SubscribeOnce(topic string, fn interface{}) (string, error) {
l, err := buildNewListener(topic, fn, false, true)
if err != nil {
return "", err
}

evb.register(l)
evb.subscribe(l)
return l.id, nil
}

func RegisterWithBlock(topic string, fn interface{}) (string, error) {
func SubscribeWithBlock(topic string, fn interface{}) (string, error) {
l, err := buildNewListener(topic, fn, true, false)
if err != nil {
return "", err
}

evb.register(l)
evb.subscribe(l)
return l.id, nil
}

func Unregister(lID string) {
evb.unregister(lID)
func Unsubscribe(lID string) {
evb.unsubscribe(lID)
}

func Publish(topic string, args ...interface{}) {
Expand Down
26 changes: 13 additions & 13 deletions bus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ var _ = Describe("TestEventBusApi", func() {
}
})
Describe("", func() {
It("test register handler", func() {
It("test subscribe handler", func() {
Context("succeed", func() {
_, err = Register("test.topic.a", func() {})
_, err = Subscribe("test.topic.a", func() {})
Expect(err).Should(BeNil())

_, err = Register("test.topic.a", func(astr, bstr string) error { return nil })
_, err = Subscribe("test.topic.a", func(astr, bstr string) error { return nil })
Expect(err).Should(BeNil())
})
Context("not func", func() {
_, err = Register("test.topic.c", "wrong val")
_, err = Subscribe("test.topic.c", "wrong val")
Expect(err).ShouldNot(BeNil())

_, err = Register("test.topic.c", nil)
_, err = Subscribe("test.topic.c", nil)
Expect(err).ShouldNot(BeNil())

_, err = Register("test.topic.c", 0)
_, err = Subscribe("test.topic.c", 0)
Expect(err).ShouldNot(BeNil())
})
})
Expand All @@ -40,7 +40,7 @@ var _ = Describe("TestEventBusApi", func() {
isExec = false
topic = "test.topic.a"
)
_, err = Register("test.topic.a", func() {
_, err = Subscribe("test.topic.a", func() {
isExec = true
})
Expect(err).Should(BeNil())
Expand All @@ -51,17 +51,17 @@ var _ = Describe("TestEventBusApi", func() {
return isExec == true
}, time.Minute, time.Second).Should(BeTrue())
})
It("test unregister handler", func() {
It("test unsubscribe handler", func() {
var (
lID string
isExec = false
topic = "test.topic.a"
)
lID, err = Register("test.topic.a", func() {
lID, err = Subscribe("test.topic.a", func() {
isExec = true
})
Expect(err).Should(BeNil())
Unregister(lID)
Unsubscribe(lID)
Publish(topic)
time.Sleep(time.Second * 5)
Expect(isExec).Should(BeFalse())
Expand Down Expand Up @@ -101,7 +101,7 @@ var _ = Describe("TestEventBus", func() {
var err error
l, err = buildNewListener(topic, runFn, false, false)
Expect(err).Should(BeNil())
testBus.register(l)
testBus.subscribe(l)

needRun := 1000
for i := 0; i < needRun; i++ {
Expand All @@ -117,7 +117,7 @@ var _ = Describe("TestEventBus", func() {
var err error
l, err = buildNewListener(topic, runFn, true, false)
Expect(err).Should(BeNil())
testBus.register(l)
testBus.subscribe(l)

needRun := 1000
for i := 0; i < needRun; i++ {
Expand All @@ -134,7 +134,7 @@ var _ = Describe("TestEventBus", func() {
var err error
l, err = buildNewListener(topic, runFn, false, true)
Expect(err).Should(BeNil())
testBus.register(l)
testBus.subscribe(l)

needRun := 10
for i := 0; i < needRun; i++ {
Expand Down
2 changes: 1 addition & 1 deletion example/compute/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func aAndBComputer(a, b int) {
}

func main() {
_, _ = bus.Register("op.int.and", aAndBComputer)
_, _ = bus.Subscribe("op.int.and", aAndBComputer)
bus.Publish("op.int.and", 1, 2)
<-waiting
}
4 changes: 2 additions & 2 deletions example/wildcard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func aliceDoSomething() {
}

func main() {
_, _ = bus.Register("partner.bob.do", bobDoSomething)
_, _ = bus.Register("partner.alice.do", aliceDoSomething)
_, _ = bus.Subscribe("partner.bob.do", bobDoSomething)
_, _ = bus.Subscribe("partner.alice.do", aliceDoSomething)
bus.Publish("partner.*.do")
<-waitBob
<-waitAlice
Expand Down

0 comments on commit 128b42e

Please sign in to comment.