Skip to content

Commit 0ceec31

Browse files
author
thekingofworld
committed
nsqd: discover topic/channel paused state on new topic discovery
1 parent 8adb229 commit 0ceec31

17 files changed

+593
-59
lines changed

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ require (
1717
)
1818

1919
go 1.13
20+
21+
replace github.com/nsqio/go-nsq v1.0.8 => github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMo
2121
github.com/mreiferson/go-options v1.0.0/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
2222
github.com/nsqio/go-diskqueue v1.0.0 h1:XRqpx7zTMu9yNVH+cHvA5jEiPNKoYcyEsCVqXP3eFg4=
2323
github.com/nsqio/go-diskqueue v1.0.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI=
24-
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
25-
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
2624
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2725
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2826
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2927
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
3028
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
29+
github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90 h1:js7rqe9IqkTNFcyXjbmuOo0nV3Z2HBM4yNc8SWqByUs=
30+
github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
3131
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 h1:Dho5nD6R3PcW2SH1or8vS0dszDaXRxIw55lBX7XiE5g=
3232
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3333
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

internal/clusterinfo/data.go

+78-10
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,14 @@ func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, err
119119

120120
// GetLookupdTopicChannels returns a []string containing a union of all the channels
121121
// from all the given lookupd for the given topic
122-
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error) {
123-
var channels []string
122+
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]ChannelState, error) {
124123
var lock sync.Mutex
125124
var wg sync.WaitGroup
126125
var errs []error
127126

128-
type respType struct {
129-
Channels []string `json:"channels"`
127+
topicChannelsMeta := &TopicChannelsMeta{
128+
Channels: []string{},
129+
ChannelsMeta: map[string]*ChannelMeta{},
130130
}
131131

132132
for _, addr := range lookupdHTTPAddrs {
@@ -137,7 +137,7 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
137137
endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic))
138138
c.logf("CI: querying nsqlookupd %s", endpoint)
139139

140-
var resp respType
140+
var resp TopicChannelsMeta
141141
err := c.client.GETV1(endpoint, &resp)
142142
if err != nil {
143143
lock.Lock()
@@ -148,7 +148,15 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
148148

149149
lock.Lock()
150150
defer lock.Unlock()
151-
channels = append(channels, resp.Channels...)
151+
topicChannelsMeta.Channels = append(topicChannelsMeta.Channels, resp.Channels...)
152+
for channelName, channelMeta := range resp.ChannelsMeta {
153+
if curMeta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok {
154+
if curMeta != nil && curMeta.Paused == true {
155+
continue //one of the lookupd has returned paused,so just continue
156+
}
157+
}
158+
topicChannelsMeta.ChannelsMeta[channelName] = channelMeta
159+
}
152160
}(addr)
153161
}
154162
wg.Wait()
@@ -157,13 +165,73 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
157165
return nil, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
158166
}
159167

160-
channels = stringy.Uniq(channels)
161-
sort.Strings(channels)
168+
topicChannelsMeta.Channels = stringy.Uniq(topicChannelsMeta.Channels)
169+
sort.Strings(topicChannelsMeta.Channels)
170+
171+
var channelStates []ChannelState
172+
for _, channelName := range topicChannelsMeta.Channels {
173+
channelState := ChannelState{
174+
Name: channelName,
175+
Paused: false,
176+
}
177+
if meta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok && meta != nil {
178+
channelState.Paused = meta.Paused
179+
}
180+
channelStates = append(channelStates, channelState)
181+
}
182+
183+
if len(errs) > 0 {
184+
return channelStates, ErrList(errs)
185+
}
186+
return channelStates, nil
187+
}
188+
189+
// GetLookupdTopic return a topicMeta info from all the given lookupd for the given topic
190+
func (c *ClusterInfo) GetLookupdTopic(topic string, lookupdHTTPAddrs []string) (TopicMeta, error) {
191+
var lock sync.Mutex
192+
var wg sync.WaitGroup
193+
var errs []error
194+
195+
topicMeta := TopicMeta{}
196+
type respType struct {
197+
Topics []string `json:"topics"`
198+
TopicsMeta map[string]*TopicMeta `json:"topics_meta"`
199+
}
200+
for _, addr := range lookupdHTTPAddrs {
201+
wg.Add(1)
202+
go func(addr string) {
203+
defer wg.Done()
204+
205+
endpoint := fmt.Sprintf("http://%s/topics?topic=%s", addr, url.QueryEscape(topic))
206+
c.logf("CI: querying nsqlookupd %s", endpoint)
207+
208+
var resp respType
209+
err := c.client.GETV1(endpoint, &resp)
210+
if err != nil {
211+
lock.Lock()
212+
errs = append(errs, err)
213+
lock.Unlock()
214+
return
215+
}
162216

217+
lock.Lock()
218+
defer lock.Unlock()
219+
if metaData, ok := resp.TopicsMeta[topic]; ok {
220+
//if one of the lookupd return paused, that should be paused
221+
if metaData != nil && metaData.Paused == true && topicMeta.Paused == false {
222+
topicMeta.Paused = true
223+
}
224+
}
225+
}(addr)
226+
}
227+
wg.Wait()
228+
if len(errs) == len(lookupdHTTPAddrs) {
229+
return topicMeta, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
230+
}
163231
if len(errs) > 0 {
164-
return channels, ErrList(errs)
232+
return topicMeta, ErrList(errs)
165233
}
166-
return channels, nil
234+
return topicMeta, nil
167235
}
168236

169237
// GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds

internal/clusterinfo/types.go

+18
Original file line numberDiff line numberDiff line change
@@ -304,3 +304,21 @@ type ProducersByHost struct {
304304
func (c ProducersByHost) Less(i, j int) bool {
305305
return c.Producers[i].Hostname < c.Producers[j].Hostname
306306
}
307+
308+
type ChannelState struct {
309+
Name string
310+
Paused bool
311+
}
312+
313+
type ChannelMeta struct {
314+
Paused bool `json:"paused"`
315+
}
316+
317+
type TopicMeta struct {
318+
Paused bool `json:"paused"`
319+
}
320+
321+
type TopicChannelsMeta struct {
322+
Channels []string `json:"channels"`
323+
ChannelsMeta map[string]*ChannelMeta `json:"channels_meta"`
324+
}

nsqadmin/http.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,13 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps
228228
producers, _ := s.ci.GetLookupdTopicProducers(
229229
topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
230230
if len(producers) == 0 {
231-
topicChannels, _ := s.ci.GetLookupdTopicChannels(
231+
var channels []string
232+
channelStates, _ := s.ci.GetLookupdTopicChannels(
232233
topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
233-
topicChannelMap[topicName] = topicChannels
234+
for _, channelState := range channelStates {
235+
channels = append(channels, channelState.Name)
236+
}
237+
topicChannelMap[topicName] = channels
234238
}
235239
}
236240
respond:

nsqadmin/http_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ type TopicsDoc struct {
2323
Topics []interface{} `json:"topics"`
2424
}
2525

26+
type TopicsInactiveDoc struct {
27+
Topics map[string][]string `json:"topics"`
28+
}
29+
2630
type TopicStatsDoc struct {
2731
*clusterinfo.TopicStats
2832
Message string `json:"message"`
@@ -177,6 +181,20 @@ func TestHTTPTopicsGET(t *testing.T) {
177181
test.Nil(t, err)
178182
test.Equal(t, 1, len(tr.Topics))
179183
test.Equal(t, topicName, tr.Topics[0])
184+
185+
url = fmt.Sprintf("http://%s/api/topics?inactive=true", nsqadmin1.RealHTTPAddr())
186+
req, _ = http.NewRequest("GET", url, nil)
187+
resp, err = client.Do(req)
188+
test.Nil(t, err)
189+
test.Equal(t, 200, resp.StatusCode)
190+
body, _ = ioutil.ReadAll(resp.Body)
191+
resp.Body.Close()
192+
193+
t.Logf("%s", body)
194+
ti := TopicsInactiveDoc{}
195+
err = json.Unmarshal(body, &ti)
196+
test.Nil(t, err)
197+
test.Equal(t, 0, len(ti.Topics))
180198
}
181199

182200
func TestHTTPTopicGET(t *testing.T) {

nsqd/channel.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
117117
)
118118
}
119119

120-
c.nsqd.Notify(c)
120+
c.nsqd.Notify(notifyContext{NotifyTypeRegistration, c})
121121

122122
return c
123123
}
@@ -164,7 +164,7 @@ func (c *Channel) exit(deleted bool) error {
164164

165165
// since we are explicitly deleting a channel (not just at system exit time)
166166
// de-register this from the lookupd
167-
c.nsqd.Notify(c)
167+
c.nsqd.Notify(notifyContext{NotifyTypeUnRegistration, c})
168168
} else {
169169
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
170170
}
@@ -256,11 +256,15 @@ func (c *Channel) Depth() int64 {
256256
}
257257

258258
func (c *Channel) Pause() error {
259-
return c.doPause(true)
259+
err := c.doPause(true)
260+
c.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
261+
return err
260262
}
261263

262264
func (c *Channel) UnPause() error {
263-
return c.doPause(false)
265+
err := c.doPause(false)
266+
c.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
267+
return err
264268
}
265269

266270
func (c *Channel) doPause(pause bool) error {

nsqd/lookup.go

+81-22
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package nsqd
33
import (
44
"bytes"
55
"encoding/json"
6+
"fmt"
67
"net"
78
"os"
89
"strconv"
@@ -12,6 +13,17 @@ import (
1213
"github.com/nsqio/nsq/internal/version"
1314
)
1415

16+
const (
17+
NotifyTypeRegistration = iota
18+
NotifyTypeUnRegistration
19+
NotifyTypeStateUpdate
20+
)
21+
22+
type notifyContext struct {
23+
notifyType int
24+
v interface{}
25+
}
26+
1527
func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
1628
return func(lp *lookupPeer) {
1729
ci := make(map[string]interface{})
@@ -53,11 +65,26 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
5365
n.RLock()
5466
for _, topic := range n.topicMap {
5567
topic.RLock()
56-
if len(topic.channelMap) == 0 {
57-
commands = append(commands, nsq.Register(topic.name, ""))
58-
} else {
59-
for _, channel := range topic.channelMap {
60-
commands = append(commands, nsq.Register(channel.topicName, channel.name))
68+
commands = append(commands, nsq.Register(topic.name, ""))
69+
topicPaused := topic.IsPaused()
70+
if topicPaused { //sync state when topic paused
71+
command, err := nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topicPaused})
72+
if err != nil {
73+
n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
74+
} else {
75+
commands = append(commands, command)
76+
}
77+
}
78+
for _, channel := range topic.channelMap {
79+
commands = append(commands, nsq.Register(channel.topicName, channel.name))
80+
channelPaused := channel.IsPaused()
81+
if channelPaused { //sync state when channel paused
82+
command, err := nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channelPaused})
83+
if err != nil {
84+
n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
85+
continue
86+
}
87+
commands = append(commands, command)
6188
}
6289
}
6390
topic.RUnlock()
@@ -118,29 +145,61 @@ func (n *NSQD) lookupLoop() {
118145
}
119146
case val := <-n.notifyChan:
120147
var cmd *nsq.Command
148+
var err error
121149
var branch string
122-
123-
switch val.(type) {
124-
case *Channel:
125-
// notify all nsqlookupds that a new channel exists, or that it's removed
126-
branch = "channel"
127-
channel := val.(*Channel)
128-
if channel.Exiting() == true {
129-
cmd = nsq.UnRegister(channel.topicName, channel.name)
130-
} else {
150+
notifyCtx, ok := val.(notifyContext)
151+
if !ok {
152+
panic("non-notifyContext sent to notifyChan - should never happen")
153+
}
154+
switch notifyCtx.notifyType {
155+
case NotifyTypeRegistration:
156+
switch notifyCtx.v.(type) {
157+
case *Channel:
158+
// notify all nsqlookupds that a new channel exists
159+
branch = "channel"
160+
channel := notifyCtx.v.(*Channel)
131161
cmd = nsq.Register(channel.topicName, channel.name)
162+
case *Topic:
163+
// notify all nsqlookupds that a new topic exists
164+
branch = "topic"
165+
topic := notifyCtx.v.(*Topic)
166+
cmd = nsq.Register(topic.name, "")
132167
}
133-
case *Topic:
134-
// notify all nsqlookupds that a new topic exists, or that it's removed
135-
branch = "topic"
136-
topic := val.(*Topic)
137-
if topic.Exiting() == true {
168+
case NotifyTypeUnRegistration:
169+
switch notifyCtx.v.(type) {
170+
case *Channel:
171+
// notify all nsqlookupds that a new channel removed
172+
branch = "channel"
173+
channel := notifyCtx.v.(*Channel)
174+
cmd = nsq.UnRegister(channel.topicName, channel.name)
175+
case *Topic:
176+
// notify all nsqlookupds that a new topic removed
177+
branch = "topic"
178+
topic := notifyCtx.v.(*Topic)
138179
cmd = nsq.UnRegister(topic.name, "")
139-
} else {
140-
cmd = nsq.Register(topic.name, "")
141180
}
181+
case NotifyTypeStateUpdate:
182+
switch notifyCtx.v.(type) {
183+
case *Channel:
184+
// notify all nsqlookupds that channel state changed
185+
branch = "channel"
186+
channel := notifyCtx.v.(*Channel)
187+
cmd, err = nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channel.IsPaused()})
188+
if err != nil {
189+
n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
190+
}
191+
case *Topic:
192+
// notify all nsqlookupds that topic state changed
193+
branch = "topic"
194+
topic := notifyCtx.v.(*Topic)
195+
cmd, err = nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topic.IsPaused()})
196+
if err != nil {
197+
n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
198+
}
199+
}
200+
default:
201+
panic(fmt.Sprintf("unknown notifyType in notifyContext: %d, should never happen", notifyCtx.notifyType))
142202
}
143-
144203
for _, lookupPeer := range lookupPeers {
145204
n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
146205
_, err := lookupPeer.Command(cmd)

0 commit comments

Comments
 (0)