-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer.go
221 lines (187 loc) · 6.15 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package alice
import (
"time"
"github.com/rs/zerolog/log"
"github.com/streadway/amqp"
)
// RabbitConsumer models a RabbitMQ consumer.
// It owns one AMQP channel and remembers the settings handed to ConsumeMessages
// so that reconnect can resume consumption with the same arguments.
type RabbitConsumer struct {
	channel        *amqp.Channel       // Channel this consumer uses to communicate with the broker
	queue          *Queue              // The queue this consumer consumes from
	conn           *connection         // Pointer to the broker connection; new channels are opened on it
	autoAck        bool                // Auto-acknowledge setting, stored by ConsumeMessages
	tag            string              // Consumer tag
	args           amqp.Table          // Additional arguments used when consuming messages, stored by ConsumeMessages
	messageHandler func(amqp.Delivery) // Message handler to call when this consumer receives a message
	routingKey     string              // Routing key used to bind the queue to its exchange
}
/*
ConsumeMessages starts the consumption of messages from the queue the consumer is bound to.
It blocks until the delivery channel is closed. If the consumer cannot be registered with
the broker, the error is logged and the call returns without consuming anything.

	args: amqp.Table, additional arguments for this consumer
	autoAck: bool, whether to automatically acknowledge a message once its handler has returned (or panicked)
	messageHandler: func(amqp.Delivery), a handler for incoming messages. For every message the handler is called in a new goroutine
*/
func (c *RabbitConsumer) ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery)) {
	// Remember the consumer attributes so that reconnect can resume with the same settings
	c.autoAck = autoAck
	c.args = args
	c.messageHandler = messageHandler

	messages, err := c.channel.Consume(
		c.queue.name,
		c.tag,
		// Broker-side auto-ack always stays off: when autoAck is requested the message is
		// acknowledged explicitly below, after the handler has run. Enabling it on the broker
		// as well would make that explicit acknowledgement invalid.
		false,
		false,
		false,
		false,
		args,
	)
	if err != nil {
		log.Error().AnErr("err", err).Str("type", "consumer").Str("consumerTag", c.tag).Str("routingKey", c.routingKey).Msg("failed to consume messages")
		// Without a registered consumer there is no usable delivery channel; ranging over it would block forever
		return
	}

	// Listen for incoming messages and pass them to the message handler
	log.Info().Str("type", "consumer").Str("consumerTag", c.tag).Str("routingKey", c.routingKey).Msg("starting message consumption")
	for message := range messages {
		log.Trace().Str("type", "consumer").Str("consumerTag", c.tag).Str("routingKey", c.routingKey).Str("exchange", message.Exchange).Int("msgSize", len(message.Body)).Msg("received message")
		go func(message amqp.Delivery) {
			defer func() {
				// Intercept any panic propagating up from the handler
				if recovered := recover(); recovered != nil {
					event := log.Error().Str("type", "consumer")
					// A panic value is not necessarily an error (e.g. panic("text")),
					// so it must not be type-asserted unconditionally
					if panicErr, ok := recovered.(error); ok {
						event = event.AnErr("err", panicErr)
					} else {
						event = event.Interface("err", recovered)
					}
					event.Msg("error occurred in message handler")
				}
				if !autoAck {
					return
				}
				// Acknowledge only this delivery: handlers run concurrently, so a multiple-ack
				// would also acknowledge earlier messages whose handlers are still running
				if ackErr := message.Ack(false); ackErr != nil {
					log.Error().AnErr("err", ackErr).Str("type", "consumer").Str("consumerTag", c.tag).Str("routingKey", c.routingKey).Str("msgID", message.MessageId).Msg("failed to ack message")
					return
				}
				log.Trace().Str("type", "consumer").Str("consumerTag", c.tag).Str("routingKey", c.routingKey).Str("msgID", message.MessageId).Msg("automatically acked message")
			}()
			// Call the message handler
			messageHandler(message)
		}(message)
	}
}
// createConsumer creates a new Consumer on this connection.
// It opens a dedicated channel, declares the queue's exchange and the queue itself,
// binds the queue to the exchange with routingKey and starts listening for the channel
// being closed. If any setup step fails the channel is closed again and the error of
// the failing step is returned.
func (c *connection) createConsumer(queue *Queue, routingKey string, consumerTag string) (Consumer, error) {
	// Open the channel
	channel, err := c.conn.Channel()
	if err != nil {
		return nil, err
	}

	consumer := &RabbitConsumer{
		channel:    channel,
		queue:      queue,
		conn:       c,
		tag:        consumerTag,
		routingKey: routingKey,
	}

	// abort releases the channel so a failed setup does not leak it
	abort := func(setupErr error) (Consumer, error) {
		// Best effort: the channel may already have been closed by the failed operation
		_ = channel.Close()
		return nil, setupErr
	}

	// Declare the exchange
	if err = consumer.declareExchange(queue.exchange); err != nil {
		return abort(err)
	}

	// Create the queue
	if _, err = consumer.declareQueue(queue); err != nil {
		return abort(err)
	}

	// Bind the queue to the exchange
	if err = consumer.bindQueue(queue, routingKey); err != nil {
		return abort(err)
	}

	consumer.listenForClose()

	log.Info().Str("type", "consumer").Str("queue", queue.name).Str("routingKey", routingKey).Str("consumerTag", consumerTag).Msg("created consumer")

	return consumer, nil
}
// declareExchange declares the given exchange on the consumer's channel,
// forwarding every setting stored on the exchange, and returns the
// declaration error if there is one.
func (c *RabbitConsumer) declareExchange(exchange *Exchange) error {
	kind := string(exchange.exchangeType)

	return c.channel.ExchangeDeclare(
		exchange.name,
		kind,
		exchange.durable,
		exchange.autoDelete,
		exchange.internal,
		exchange.noWait,
		exchange.args,
	)
}
// declareQueue declares the given queue on the consumer's channel,
// forwarding every setting stored on the queue. It returns the queue
// as reported by the client library together with any declaration error.
func (c *RabbitConsumer) declareQueue(queue *Queue) (amqp.Queue, error) {
	return c.channel.QueueDeclare(
		queue.name,
		queue.durable,
		queue.autoDelete,
		queue.exclusive,
		queue.noWait,
		queue.args,
	)
}
// bindQueue binds the given queue to its own exchange using bindingKey.
// The bind is issued with noWait disabled and without extra arguments;
// the resulting error, if any, is returned.
func (c *RabbitConsumer) bindQueue(queue *Queue, bindingKey string) error {
	exchangeName := queue.exchange.name

	return c.channel.QueueBind(
		queue.name,
		bindingKey,
		exchangeName,
		false,
		nil,
	)
}
// listenForClose registers a close listener on the consumer's channel and, in a
// background goroutine, waits for the channel to be closed. When the channel is
// closed with an error a reconnect is attempted; a graceful close (for example
// through Shutdown) delivers no error and ends the goroutine without reconnecting.
func (c *RabbitConsumer) listenForClose() {
	// Buffered so the notification can be delivered without waiting for the goroutine below
	closeChan := c.channel.NotifyClose(make(chan *amqp.Error, 1))
	go func() {
		closeErr, ok := <-closeChan
		if !ok || closeErr == nil {
			// The notify channel was closed without an error being sent: the close was
			// intentional, so reconnecting would undo it
			log.Info().Str("type", "consumer").Str("routingKey", c.routingKey).Str("consumerTag", c.tag).Msg("channel was closed gracefully")
			return
		}

		log.Error().Str("type", "consumer").AnErr("err", closeErr).Str("routingKey", c.routingKey).Str("consumerTag", c.tag).Msg("connection was closed")

		if err := c.reconnect(); err != nil {
			log.Error().Str("type", "consumer").AnErr("err", err).Str("routingKey", c.routingKey).Str("consumerTag", c.tag).Msg("failed to reconnect")
		}
	}()
}
// ReconnectChannel tries to re-open this consumer's channel.
// On success the consumer switches to the newly opened channel. On failure the
// error is logged and returned, and the consumer keeps its previous channel
// instead of being left with a nil one.
func (c *RabbitConsumer) ReconnectChannel() error {
	log.Info().Str("type", "consumer").Str("routingKey", c.routingKey).Str("consumerTag", c.tag).Msg("attempting to re-open channel")

	channel, err := c.conn.conn.Channel()
	if err != nil {
		log.Error().AnErr("err", err).Str("type", "consumer").Str("routingKey", c.routingKey).Str("consumerTag", c.tag).Msg("failed to re-open channel")
		return err
	}

	c.channel = channel
	return nil
}
// Shutdown logs the shutdown and closes the consumer's channel.
// The error reported by the channel close, if any, is returned.
func (c *RabbitConsumer) Shutdown() error {
	log.Info().
		Str("type", "consumer").
		Str("routingKey", c.routingKey).
		Str("consumerTag", c.tag).
		Msg("shutting down consumer")

	closeErr := c.channel.Close()
	return closeErr
}
// reconnect waits until the broker connection is open again, then rebuilds the
// consumer: it opens a new channel, re-declares the exchange and the queue,
// re-binds the queue, re-registers the close listener and, if a message handler
// was registered through ConsumeMessages, resumes consumption in a new goroutine.
//
// The connection state is polled every 10 seconds. The first error encountered
// while rebuilding is returned; in that case the newly opened channel is closed again.
func (c *RabbitConsumer) reconnect() error {
	// Poll with the desired connection delay time; release the ticker when done
	ticker := time.NewTicker(time.Second * 10)
	defer ticker.Stop()

	for {
		<-ticker.C // New tick

		// Keep waiting while the connection is still closed
		if c.conn.conn.IsClosed() {
			continue
		}

		// Open a new channel; only replace the old one once that succeeded
		channel, err := c.conn.conn.Channel()
		if err != nil {
			return err
		}
		c.channel = channel

		// Declare the exchange
		if err = c.declareExchange(c.queue.exchange); err != nil {
			_ = channel.Close() // best effort, the failed operation may already have closed it
			return err
		}

		// Create the queue
		if _, err = c.declareQueue(c.queue); err != nil {
			_ = channel.Close() // best effort, see above
			return err
		}

		// Bind the queue to the exchange
		if err = c.bindQueue(c.queue, c.routingKey); err != nil {
			_ = channel.Close() // best effort, see above
			return err
		}

		c.listenForClose()

		log.Info().Str("type", "consumer").Str("routingKey", c.routingKey).Str("consumerTag", c.tag).Msg("reconnected")

		// Only resume consumption if it had been started before; without a
		// registered handler there is nothing to deliver messages to
		if c.messageHandler != nil {
			go c.ConsumeMessages(c.args, c.autoAck, c.messageHandler)
		}

		return nil
	}
}