Skip to content

Commit

Permalink
feat(pubsub): Added pubsub (#109)
Browse files Browse the repository at this point in the history
* feat(pubsub): init

Signed-off-by: Flc゛ <[email protected]>

* feat(pubsub): Added Pubsub

Signed-off-by: Flc゛ <[email protected]>

---------

Signed-off-by: Flc゛ <[email protected]>
  • Loading branch information
flc1125 authored Feb 18, 2024
1 parent 1af20e4 commit 57256f2
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 0 deletions.
4 changes: 4 additions & 0 deletions errors/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func (g *Group) First() error {

return g.errors[0]
}

func (g *Group) IsNil() bool {
return len(g.errors) == 0
}
8 changes: 8 additions & 0 deletions errors/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ func TestGroup_First(t *testing.T) {
assert.Equal(t, g, g.Add(err2))
assert.Equal(t, err1, g.First())
}

func TestGroup_IsNil(t *testing.T) {
g := NewGroup()
assert.True(t, g.IsNil())

assert.Equal(t, g, g.Add(err1))
assert.False(t, g.IsNil())
}
15 changes: 15 additions & 0 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pubsub

type Subscription[T any] struct {
Topic *Topic[T]
handler func(msg T) error
}

func (s *Subscription[T]) Unsubscribe() {
for i, sub := range s.Topic.subscriptions {
if sub == s {
s.Topic.subscriptions = append(s.Topic.subscriptions[:i], s.Topic.subscriptions[i+1:]...)
return
}
}
}
65 changes: 65 additions & 0 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package pubsub

type Topic[T any] struct {
subscriptions []*Subscription[T]
}

func NewTopic[T any]() *Topic[T] {
return &Topic[T]{
subscriptions: make([]*Subscription[T], 0),
}
}

func (t *Topic[T]) Subscribe(handler func(msg T) error) *Subscription[T] {
sub := &Subscription[T]{Topic: t, handler: handler}
t.subscriptions = append(t.subscriptions, sub)
return sub
}

type publishOptions struct {
skipErrors bool
async bool
}

type PublishOption func(*publishOptions)

func PublishSkipErrors() PublishOption {
return func(o *publishOptions) {
o.skipErrors = true
}
}

func PublishAsync() PublishOption {
return func(o *publishOptions) {
o.async = true
}
}

func (t *Topic[T]) Publish(msg T, opts ...PublishOption) error {
o := &publishOptions{}
for _, opt := range opts {
opt(o)
}

for _, sub := range t.subscriptions {
if o.async {
go func() {
_ = sub.handler(msg) //nolint:govet
}()
} else {
if err := sub.handler(msg); err != nil {
if o.skipErrors {
continue
}

return err
}
}
}

return nil
}

func (t *Topic[T]) PublishAsync(msg T, opts ...PublishOption) error {
return t.Publish(msg, append(opts, PublishAsync())...)
}
44 changes: 44 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pubsub

import (
"fmt"
"testing"
)

type loginEvent struct {
UserID string
}

func TestTopic(*testing.T) {
topic := NewTopic[*loginEvent]()

topic.Subscribe(func(msg *loginEvent) error {
fmt.Println(msg.UserID)
return nil
})

sub2 := topic.Subscribe(func(_ *loginEvent) error {
return fmt.Errorf("error")
})

err := topic.Publish(&loginEvent{UserID: "123"})
if err != nil {
return
}

sub2.Unsubscribe()

if err = topic.Publish(&loginEvent{UserID: "456"}, PublishAsync(), PublishSkipErrors()); err != nil {
return
}

topic2 := NewTopic[*loginEvent]()
topic2.Subscribe(func(msg *loginEvent) error {
fmt.Println("Topic 2", msg.UserID)
return nil
})

if err = topic2.Publish(&loginEvent{UserID: "789"}); err != nil {
return
}
}

0 comments on commit 57256f2

Please sign in to comment.