Skip to content

Commit 8c33923

Browse files
committed
chore: refactor message handling and update dependencies in Go project
- Change `QueuedMessage` to `TaskMessage` in various function signatures and implementations - Replace `m.Bytes()` with `m.Payload()` for message handling - Update Go version in `go.mod` from `1.18` to `1.22` - Update dependencies in `go.mod` to newer versions - Replace `github.com/goccy/go-json` with `github.com/jpillora/backoff` in `go.mod` Signed-off-by: appleboy <[email protected]>
1 parent 5a0ca1c commit 8c33923

File tree

8 files changed

+50
-84
lines changed

8 files changed

+50
-84
lines changed

README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
See the [Exchanges and Exchange Types][11] section of [AMQP 0-9-1 Model Explained][12].
88

9-
[11]:https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
10-
[12]:https://www.rabbitmq.com/tutorials/amqp-concepts.html
9+
[11]: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
10+
[12]: https://www.rabbitmq.com/tutorials/amqp-concepts.html
1111

1212
### Direct Exchange
1313

@@ -68,9 +68,9 @@ func main() {
6868
rabbitmq.WithExchangeName(*exchange),
6969
rabbitmq.WithExchangeType(*exchangeType),
7070
rabbitmq.WithRoutingKey(*bindingKey),
71-
rabbitmq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
71+
rabbitmq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
7272
var v *job
73-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
73+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
7474
return err
7575
}
7676
rets <- v.Message

_example/producer-consumer/consumer/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ func main() {
5050
rabbitmq.WithExchangeName(*exchange),
5151
rabbitmq.WithExchangeType(*exchangeType),
5252
rabbitmq.WithRoutingKey(*bindingKey),
53-
rabbitmq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
53+
rabbitmq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
5454
var v *job
55-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
55+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
5656
return err
5757
}
5858
rets <- v.Message

_example/worker/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ func main() {
4747
rabbitmq.WithQueue(*q),
4848
rabbitmq.WithExchangeName(*exchange),
4949
rabbitmq.WithRoutingKey(*bindingKey),
50-
rabbitmq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
50+
rabbitmq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
5151
var v *job
52-
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
52+
if err := json.Unmarshal(m.Payload(), &v); err != nil {
5353
return err
5454
}
5555
rets <- v.Message

example_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ func Example_direct_exchange() {
1919
WithExchangeName("direct_exchange"),
2020
WithExchangeType("direct"),
2121
WithRoutingKey("direct_exchange"),
22-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
23-
fmt.Println("worker01 get data:", string(m.Bytes()))
22+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
23+
fmt.Println("worker01 get data:", string(m.Payload()))
2424
time.Sleep(100 * time.Millisecond)
2525
return nil
2626
}),
@@ -39,8 +39,8 @@ func Example_direct_exchange() {
3939
WithExchangeName("direct_exchange"),
4040
WithExchangeType("direct"),
4141
WithRoutingKey("direct_exchange"),
42-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
43-
fmt.Println("worker02 get data:", string(m.Bytes()))
42+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
43+
fmt.Println("worker02 get data:", string(m.Payload()))
4444
time.Sleep(100 * time.Millisecond)
4545
return nil
4646
}),
@@ -101,8 +101,8 @@ func Example_fanout_exchange() {
101101
WithQueue("fanout_queue_1"),
102102
WithExchangeName("fanout_exchange"),
103103
WithExchangeType("fanout"),
104-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
105-
fmt.Println("worker01 get data:", string(m.Bytes()))
104+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
105+
fmt.Println("worker01 get data:", string(m.Payload()))
106106
return nil
107107
}),
108108
)
@@ -119,8 +119,8 @@ func Example_fanout_exchange() {
119119
WithQueue("fanout_queue_2"),
120120
WithExchangeName("fanout_exchange"),
121121
WithExchangeType("fanout"),
122-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
123-
fmt.Println("worker02 get data:", string(m.Bytes()))
122+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
123+
fmt.Println("worker02 get data:", string(m.Payload()))
124124
return nil
125125
}),
126126
)

go.mod

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
module github.com/golang-queue/rabbitmq
22

3-
go 1.18
3+
go 1.22
44

55
require (
6-
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98
7-
github.com/rabbitmq/amqp091-go v1.7.0
8-
github.com/stretchr/testify v1.8.1
9-
go.uber.org/goleak v1.2.0
6+
github.com/golang-queue/queue v0.3.0
7+
github.com/rabbitmq/amqp091-go v1.10.0
8+
github.com/stretchr/testify v1.10.0
9+
go.uber.org/goleak v1.3.0
1010
)
1111

1212
require (
1313
github.com/davecgh/go-spew v1.1.1 // indirect
14-
github.com/goccy/go-json v0.10.0 // indirect
14+
github.com/jpillora/backoff v1.0.0 // indirect
1515
github.com/pmezard/go-difflib v1.0.0 // indirect
1616
gopkg.in/yaml.v3 v3.0.1 // indirect
1717
)

go.sum

+15-48
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,27 @@
1-
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1+
github.com/appleboy/com v0.2.1 h1:dHAHauX3eYDuheAahI83HIGFxpi0SEb2ZAu9EZ9hbUM=
2+
github.com/appleboy/com v0.2.1/go.mod h1:kByEI3/vzI5GM1+O5QdBHLsXaOsmFsJcOpCSgASi4sg=
23
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
34
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4-
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
5-
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
6-
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg=
7-
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
8-
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
5+
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
6+
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
7+
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
8+
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
99
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1010
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
11-
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
12-
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
1311
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
12+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
1413
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1514
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
16-
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
17-
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
18-
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
19-
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
20-
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
21-
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
22-
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
23-
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
24-
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
25-
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
26-
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
27-
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
28-
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
29-
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
30-
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
31-
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
32-
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
33-
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
34-
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
35-
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
36-
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
37-
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
38-
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
39-
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
40-
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
41-
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
42-
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
43-
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
44-
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
45-
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
46-
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
47-
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
48-
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
49-
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
50-
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
51-
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
52-
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
53-
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
54-
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
15+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
16+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
17+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
18+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
19+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
20+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
21+
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
22+
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
5523
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
5624
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
5725
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
58-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5926
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
6027
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

rabbitmq.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func (w *Worker) startConsumer() (err error) {
9191
false, // no-wait
9292
nil, // args
9393
)
94-
9594
if err != nil {
9695
w.opts.logger.Error("cannot consume from: ", q.Name, err)
9796
}
@@ -101,7 +100,7 @@ func (w *Worker) startConsumer() (err error) {
101100
}
102101

103102
// Run start the worker
104-
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
103+
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
105104
return w.opts.runFunc(ctx, task)
106105
}
107106

@@ -125,7 +124,7 @@ func (w *Worker) Shutdown() (err error) {
125124
}
126125

127126
// Queue send notification to queue
128-
func (w *Worker) Queue(job core.QueuedMessage) error {
127+
func (w *Worker) Queue(job core.TaskMessage) error {
129128
if atomic.LoadInt32(&w.stopFlag) == 1 {
130129
return queue.ErrQueueShutdown
131130
}
@@ -149,7 +148,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
149148
}
150149

151150
// Request a new task
152-
func (w *Worker) Request() (core.QueuedMessage, error) {
151+
func (w *Worker) Request() (core.TaskMessage, error) {
153152
_ = w.startConsumer()
154153
clock := 0
155154
loop:

rabbitmq_test.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func TestCustomFuncAndWait(t *testing.T) {
5252
}
5353
w := NewWorker(
5454
WithQueue("test"),
55-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
56-
log.Println("show message: " + string(m.Bytes()))
55+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
56+
log.Println("show message: " + string(m.Payload()))
5757
time.Sleep(500 * time.Millisecond)
5858
return nil
5959
}),
@@ -101,11 +101,11 @@ func TestJobReachTimeout(t *testing.T) {
101101
}
102102
w := NewWorker(
103103
WithQueue("JobReachTimeout"),
104-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
104+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
105105
for {
106106
select {
107107
case <-ctx.Done():
108-
log.Println("get data:", string(m.Bytes()))
108+
log.Println("get data:", string(m.Payload()))
109109
if errors.Is(ctx.Err(), context.Canceled) {
110110
log.Println("queue has been shutdown and cancel the job")
111111
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -138,11 +138,11 @@ func TestCancelJobAfterShutdown(t *testing.T) {
138138
w := NewWorker(
139139
WithQueue("CancelJob"),
140140
WithLogger(queue.NewLogger()),
141-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
141+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
142142
for {
143143
select {
144144
case <-ctx.Done():
145-
log.Println("get data:", string(m.Bytes()))
145+
log.Println("get data:", string(m.Payload()))
146146
if errors.Is(ctx.Err(), context.Canceled) {
147147
log.Println("queue has been shutdown and cancel the job")
148148
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -175,19 +175,19 @@ func TestGoroutineLeak(t *testing.T) {
175175
w := NewWorker(
176176
WithQueue("GoroutineLeak"),
177177
WithLogger(queue.NewEmptyLogger()),
178-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
178+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
179179
for {
180180
select {
181181
case <-ctx.Done():
182-
log.Println("get data:", string(m.Bytes()))
182+
log.Println("get data:", string(m.Payload()))
183183
if errors.Is(ctx.Err(), context.Canceled) {
184184
log.Println("queue has been shutdown and cancel the job")
185185
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
186186
log.Println("job deadline exceeded")
187187
}
188188
return nil
189189
default:
190-
log.Println("get data:", string(m.Bytes()))
190+
log.Println("get data:", string(m.Payload()))
191191
time.Sleep(50 * time.Millisecond)
192192
return nil
193193
}
@@ -220,7 +220,7 @@ func TestGoroutinePanic(t *testing.T) {
220220
WithQueue("GoroutinePanic"),
221221
WithRoutingKey("GoroutinePanic"),
222222
WithExchangeName("GoroutinePanic"),
223-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
223+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
224224
panic("missing something")
225225
}),
226226
)

0 commit comments

Comments
 (0)