kafka_stream.go
package epee

import (
	"fmt"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)
// kafkaStream manages the lifecycle of a set of partition consumers backed by
// a single sarama client.
type kafkaStream interface {
	// Close releases all resources associated with this stream, including any
	// consumers it has started.
	Close()

	// Consume starts consuming the given topic partition at the given offset
	// and returns the running consumer. An offset of 0 starts from the oldest
	// available offset.
	Consume(topic string, partition int, offset int64) (*streamConsumer, error)

	// CancelConsumer gracefully stops the given consumer.
	CancelConsumer(*streamConsumer) error
}
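
// A hedged usage sketch, not part of the original file: roughly how a caller
// might drive this interface. The client ID, topic, and partition below are
// placeholder values, and error handling is elided.
//
//	stream, err := newKafkaStream("my-service", zk)
//	if err != nil { /* handle */ }
//	sc, err := stream.Consume("events", 0, 0) // offset 0 falls back to the oldest offset
//	if err != nil { /* handle */ }
//	// ... read messages until done ...
//	stream.CancelConsumer(sc)
//	stream.Close()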
type kafkaStreamImpl struct {
	sync.Mutex

	// The set of stream consumers that have been created and are still alive.
	consumers map[*streamConsumer]bool

	// Set once Close has been called; tells Consume not to start new consumers.
	closing bool

	// The sarama consumer used to create partition consumers.
	consumer sarama.Consumer

	// The sarama client the consumer was created from.
	client sarama.Client

	// The Zookeeper cluster our service is connecting to.
	zk ZookeeperClient
}
func (ks *kafkaStreamImpl) Consume(topic string, partition int, offset int64) (*streamConsumer, error) {
	// If the stream is in the process of closing, don't start a new consumer.
	if ks.closing {
		return nil, ErrStreamClosing
	}

	// Treat an offset of 0 as a request to start from the oldest offset Kafka
	// still has available.
	if offset == 0 {
		offset = sarama.OffsetOldest
	}

	// Retry until the topic and partition exist; they may not have been
	// created yet when this service starts.
	var partitionConsumer sarama.PartitionConsumer
	var err error

	for partitionConsumer == nil {
		partitionConsumer, err = ks.consumer.ConsumePartition(topic, int32(partition), offset)

		if err == sarama.ErrUnknownTopicOrPartition {
			logWarning("Failed to find [%s, partition %d]. Waiting, then retrying.", topic, partition)
			<-time.After(5 * time.Second)
		} else if err != nil {
			logError("Failed to start partition consumer. %v", err)
			return nil, err
		}
	}

	ch := make(chan *Message)
	consumer := newStreamConsumer(ch, partitionConsumer)

	// We have to hold the lock to modify the map.
	ks.Lock()
	ks.consumers[consumer] = true
	ks.Unlock()

	// Let's start the consumer up!
	consumer.Start()
	return consumer, nil
}
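
// streamConsumer and newStreamConsumer are defined elsewhere in this package.
// Judging only by how they're used above, a plausible shape is sketched below;
// the field names, the done channel, and the newMessage conversion helper are
// assumptions, not the actual source.
//
//	type streamConsumer struct {
//	    messages chan *Message
//	    pc       sarama.PartitionConsumer
//	    done     chan struct{}
//	}
//
//	// Start pumps messages from the sarama partition consumer into the
//	// channel handed to newStreamConsumer until Close is called.
//	func (sc *streamConsumer) Start() {
//	    go func() {
//	        for {
//	            select {
//	            case m := <-sc.pc.Messages():
//	                sc.messages <- newMessage(m) // hypothetical conversion
//	            case <-sc.done:
//	                return
//	            }
//	        }
//	    }()
//	}
//
//	func (sc *streamConsumer) Close() {
//	    close(sc.done)
//	    sc.pc.Close()
//	}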
func (ks *kafkaStreamImpl) CancelConsumer(sc *streamConsumer) error {
	ks.Lock()
	defer ks.Unlock()

	// Only actually cancel this consumer if it's still alive.
	if _, ok := ks.consumers[sc]; ok {
		sc.Close()
		delete(ks.consumers, sc)
	}

	return nil
}
func (ks *kafkaStreamImpl) Close() {
	ks.Lock()
	defer ks.Unlock()
	ks.closing = true

	// Close all of the consumers we created, waiting for each to shut down
	// fully.
	for c := range ks.consumers {
		c.Close()
	}

	// Now all of the consumers should (theoretically) be done, so shut down
	// the underlying sarama consumer. Per sarama's documentation, closing a
	// consumer created via NewConsumerFromClient does not close the client it
	// was created from, so release that separately.
	ks.consumer.Close()
	ks.client.Close()
}
func newKafkaStream(clientID string, zk ZookeeperClient) (kafkaStream, error) {
	brokers, err := findRegisteredBrokers(zk)
	if err != nil {
		return nil, err
	}

	logInfo("Using brokers %v", brokers)
	client, err := sarama.NewClient(brokers, getConfig(clientID))
	if err != nil {
		return nil, fmt.Errorf("failed to instantiate new client: %v", err)
	}

	// Now that we have a client, start a consumer on top of it.
	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to open new consumer from client: %v", err)
	}

	stream := new(kafkaStreamImpl)
	stream.client = client
	stream.consumer = consumer
	stream.consumers = make(map[*streamConsumer]bool)
	stream.zk = zk
	return stream, nil
}
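
// findRegisteredBrokers and getConfig are also defined elsewhere in the
// package. As a hedged sketch, getConfig plausibly just builds a default
// sarama config carrying the client ID; the version below is an assumption
// using only the public sarama API:
//
//	func getConfig(clientID string) *sarama.Config {
//	    config := sarama.NewConfig()
//	    config.ClientID = clientID
//	    return config
//	}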