-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathconsumer.go
173 lines (148 loc) · 3.72 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package rabbitmq
import (
"fmt"
"github.com/better-go/pkg/log"
"github.com/streadway/amqp"
)
// task handler:
type TaskFunc func(message string) error
//
//
//
type Consumer struct {
conn *amqp.Connection
//
done chan error
}
//
//
//
func NewConsumer(opt *ConnOption) (*Consumer, error) {
p := new(Consumer)
connection, err := amqp.Dial(opt.ConnUri())
if err != nil {
return p, fmt.Errorf("rabbitmq dial: %s", err)
}
p.conn = connection
p.done = make(chan error)
return p, nil
}
// 批量处理:
func (m *Consumer) Consume(exchange *Exchange, queue *Queue, routingKey string, consumerTag string, taskFn TaskFunc) error {
channel, err := m.conn.Channel()
if err != nil {
return fmt.Errorf("rabbitmq get channel error: %v'", err)
}
defer channel.Close()
// 1. declare exchange:
if err := channel.ExchangeDeclare(
exchange.Name,
exchange.Type,
true,
false,
false,
false,
nil,
); err != nil {
return fmt.Errorf("rabbitmq exchange declare error: %v", err)
}
// 2. declare queue:
if _, err := channel.QueueDeclare(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
queue.Args,
); err != nil {
return fmt.Errorf("rabbitmq queue declare error: %v", err)
}
// 3. binding queue to exchange:
if err := channel.QueueBind(queue.Name, routingKey, exchange.Name, false, nil, ); err != nil {
return fmt.Errorf("rabbitmq queue binding exchange error: %v", err)
}
// 4. consume:
deliveries, err := channel.Consume(
queue.Name, // name
consumerTag, // consumerTag,
true, // 需要打开, 不然会重复消费
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
if err != nil {
return fmt.Errorf("rabbitmq queue consume error: %s", err)
}
// 5. do handle task:
go m.handleTasks(deliveries, m.done, taskFn)
return nil
}
// 单条消息处理:
func (m *Consumer) Get(exchange *Exchange, queue *Queue, routingKey string, consumerTag string, taskFn TaskFunc) error {
channel, err := m.conn.Channel()
if err != nil {
return fmt.Errorf("rabbitmq get channel error: %v'", err)
}
defer channel.Close()
// 1. declare exchange:
if err := channel.ExchangeDeclare(
exchange.Name,
exchange.Type,
true,
false,
false,
false,
nil,
); err != nil {
return fmt.Errorf("rabbitmq exchange declare error: %v", err)
}
// 2. declare queue:
if _, err := channel.QueueDeclare(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
queue.Args,
); err != nil {
return fmt.Errorf("rabbitmq queue declare error: %v", err)
}
// 3. binding queue to exchange:
if err := channel.QueueBind(queue.Name, routingKey, exchange.Name, false, nil, ); err != nil {
return fmt.Errorf("rabbitmq queue binding exchange error: %v", err)
}
// 4. Get one task:
message, ok, err := channel.Get(queue.Name, true)
if err != nil {
return err
}
if !ok {
return nil
}
// handle task:
if err := taskFn(string(message.Body)); err != nil {
log.Errorf("rabbitmq consumer.Get: taskFunc error: %v, message: %v", err, string(message.Body))
return err
}
return nil
}
// handle one task:
func (m *Consumer) handleTasks(deliveries <-chan amqp.Delivery, done chan error, taskFn TaskFunc) {
for d := range deliveries {
log.Infof("rabbitmq consumer: handle task - size=%d B delivery=[%v] msg=%q", len(d.Body), d.DeliveryTag, d.Body)
// handle one message:
if err := taskFn(string(d.Body)); err != nil {
log.Errorf("rabbitmq consumer.Consume: taskFunc error: %v, message: %v", err, string(d.Body))
}
// ack one:
_ = d.Ack(false)
}
log.Infof("rabbitmq consumer handle done: deliveries channel closed")
done <- nil
}
func (m *Consumer) Close() {
if m.conn != nil {
defer m.conn.Close()
}
}