Skip to content

Commit 2d69a2d

Browse files
committedJun 8, 2016
fix(client): fix blocking/deblocking loop
feat(all): add support for headers with ampq.Table arguments refact(example): refactor basic example to send continuously messages for reconnection testing
1 parent e3f514c commit 2d69a2d

File tree

6 files changed

+48
-41
lines changed

6 files changed

+48
-41
lines changed
 

‎client.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -134,20 +134,23 @@ func (c *Client) Loop() bool {
134134
conn.NotifyClose(chanErr)
135135
conn.NotifyBlocked(chanBlocking)
136136

137-
select {
138-
case err1 := <-chanErr:
139-
c.reportErr(err1)
140-
141-
if conn1 := c.conn.Load().(*amqp.Connection); conn1 != nil {
142-
c.conn.Store((*amqp.Connection)(nil))
143-
conn1.Close()
144-
}
145-
case blocking := <-chanBlocking:
137+
// loop for blocking/deblocking
138+
for {
146139
select {
147-
case c.blocking <- blocking:
148-
default:
140+
case err1 := <-chanErr:
141+
c.reportErr(err1)
142+
143+
if conn1 := c.conn.Load().(*amqp.Connection); conn1 != nil {
144+
c.conn.Store((*amqp.Connection)(nil))
145+
conn1.Close()
146+
}
147+
// return from routine to launch reconnect process
148+
return
149+
case blocking := <-chanBlocking:
150+
c.blocking <- blocking
149151
}
150152
}
153+
151154
}()
152155

153156
ch, err := conn.Channel()

‎cony.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Package cony is a high-level wrapper around http://github.com/streadway/amqp library,
22
// for working declaratively with AMQP. Cony will manage AMQP
3-
// connect/reconnect to AMQP brocker, along with recovery of consumers.
3+
// connect/reconnect to AMQP broker, along with recovery of consumers.
44
package cony
55

66
import (
@@ -15,6 +15,7 @@ type Queue struct {
1515
Durable bool
1616
AutoDelete bool
1717
Exclusive bool
18+
Args amqp.Table
1819

1920
l sync.Mutex
2021
}
@@ -25,13 +26,15 @@ type Exchange struct {
2526
Kind string
2627
Durable bool
2728
AutoDelete bool
29+
Args amqp.Table
2830
}
2931

30-
// Binding used to declare bidning between AMQP Queue and AMQP Exchange
32+
// Binding used to declare binding between AMQP Queue and AMQP Exchange
3133
type Binding struct {
3234
Queue *Queue
3335
Exchange Exchange
3436
Key string
37+
Args amqp.Table
3538
}
3639

3740
type mqDeleter interface {

‎declaration.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package cony
22

33
import "github.com/streadway/amqp"
44

5-
// Declaration is a callback type to declare AMQP queue/exchange/bidning
5+
// Declaration is a callback type to declare AMQP queue/exchange/binding
66
type Declaration func(Declarer) error
77

88
// Declarer is implemented by *amqp.Channel
@@ -20,7 +20,7 @@ func DeclareQueue(q *Queue) Declaration {
2020
q.AutoDelete,
2121
q.Exclusive,
2222
false,
23-
nil,
23+
q.Args,
2424
)
2525
q.l.Lock()
2626
q.Name = realQ.Name
@@ -38,19 +38,19 @@ func DeclareExchange(e Exchange) Declaration {
3838
e.AutoDelete,
3939
false,
4040
false,
41-
nil,
41+
e.Args,
4242
)
4343
}
4444
}
4545

46-
// DeclareBinding is a way to declare AMQP bidning between AMQP queue and exchange
46+
// DeclareBinding is a way to declare AMQP binding between AMQP queue and exchange
4747
func DeclareBinding(b Binding) Declaration {
4848
return func(c Declarer) error {
4949
return c.QueueBind(b.Queue.Name,
5050
b.Key,
5151
b.Exchange.Name,
5252
false,
53-
nil,
53+
b.Args,
5454
)
5555
}
5656
}

‎examples/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
## Basic
44

5-
This is the base example ment to show how to use cony.
6-
The producer will exit after each publish action finishes.
5+
This is the base example meant to show how to use cony.
6+
The producer will publish a message every second (or until you stop the program with CTRL-C).
77
The consumer will run forever (or until you stop the program with CTRL-C).
88
Both applications are CLI-based.
99

‎examples/basic/consumer/consumer.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,17 @@ func main() {
3333
// The queue name will be supplied by the AMQP server
3434
que := &cony.Queue{
3535
AutoDelete: true,
36+
Name: "myQueue",
3637
}
3738
exc := cony.Exchange{
38-
Name: "basic",
39+
Name: "myExc",
3940
Kind: "fanout",
4041
AutoDelete: true,
4142
}
4243
bnd := cony.Binding{
4344
Queue: que,
4445
Exchange: exc,
45-
Key: "",
46+
Key: "pubSub",
4647
}
4748
cli.Declare([]cony.Declaration{
4849
cony.DeclareQueue(que),

‎examples/basic/producer/producer.go

+19-19
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/assembla/cony"
88
"github.com/streadway/amqp"
9+
"time"
910
)
1011

1112
var url = flag.String("url", "amqp://guest:guest@localhost/", "amqp url")
@@ -35,7 +36,7 @@ func main() {
3536

3637
// Declare the exchange we'll be using
3738
exc := cony.Exchange{
38-
Name: "basic",
39+
Name: "myExc",
3940
Kind: "fanout",
4041
AutoDelete: true,
4142
}
@@ -45,38 +46,37 @@ func main() {
4546

4647
// Declare and register a publisher
4748
// with the cony client
48-
pbl := cony.NewPublisher(exc.Name, "")
49+
pbl := cony.NewPublisher(exc.Name, "pubSub")
4950
cli.Publish(pbl)
50-
51-
// Chan used for signaling when
52-
// publishing has been completed
53-
// so the program can exit
54-
done := make(chan struct{})
55-
5651
// Launch a go routine and publish a message.
5752
// "Publish" is a blocking method this is why it
5853
// needs to be called in its own go routine.
5954
//
6055
go func() {
61-
pbl.Publish(amqp.Publishing{
62-
Body: []byte(*body),
63-
})
64-
// Close done to signal that the program
65-
// can exit
66-
close(done)
56+
ticker := time.NewTicker(time.Second)
57+
58+
for {
59+
select {
60+
case <-ticker.C:
61+
fmt.Printf("Client publishing\n")
62+
err := pbl.Publish(amqp.Publishing{
63+
Body: []byte(*body),
64+
})
65+
if err != nil {
66+
fmt.Printf("Client publish error: %v\n", err)
67+
}
68+
}
69+
}
6770
}()
6871

6972
// Client loop sends out declarations(exchanges, queues, bindings
7073
// etc) to the AMQP server. It also handles reconnecting.
71-
// We use the done channel here to make sure our publishing is
72-
// actually published before the programe exists.
73-
// We then close the client to exit the Loop.
7474
for cli.Loop() {
7575
select {
7676
case err := <-cli.Errors():
7777
fmt.Printf("Client error: %v\n", err)
78-
case <-done:
79-
cli.Close()
78+
case blocked := <-cli.Blocking():
79+
fmt.Printf("Client is blocked %v\n", blocked)
8080
}
8181
}
8282
}

0 commit comments

Comments
 (0)
Please sign in to comment.