Skip to content

Commit

Permalink
feat: add nsq output (#25)
Browse files Browse the repository at this point in the history
* feat: add nsq output

* feat: add nsq output

* perf: add nsqd output doc
  • Loading branch information
JTrancender authored Aug 11, 2021
1 parent 22af0a3 commit 49bac73
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 10 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
},
"output": {
"nsqd": {
"nsqd-tcp-addresses": ["127.0.0.1:4150"],
"nsqd": "127.0.0.1:4150",
"topic": "dev_test_dup",
"enabled": false
"enabled": false,
"enabled_topic": true
},
"console": {
"enabled": true
Expand All @@ -37,11 +38,11 @@ make clean && ./build/nsq_to_consumer --etcd-endpoints 127.0.0.1:2379 --etcd-use
~~~
### Output list
1. console
2. nsqd
3. file[todo]
3. nsqd[todo]
3. http[todo]
4. mysql[todo]
5. elasticsearch[todo]
4. http[todo]
5. mysql[todo]
6. elasticsearch[todo]

### Getting Help
If you need help or hit an issue, you can make a issue, we will deal it as soon as posibile.
Expand Down
8 changes: 8 additions & 0 deletions consumer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ func (m *Message) GetNsqMessage() *nsq.Message {
return m.message
}

func (m *Message) GetTopic() string {
return m.topic
}

func (m *Message) GetMessageBody() []byte {
return m.message.Body
}

func (m *Message) Body() []byte {
return append([]byte(m.topic+":"), m.message.Body...)
}
2 changes: 2 additions & 0 deletions libconsumer/consumer/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type Pipeline interface {
type Message interface {
GetNsqMessage() *nsq.Message
Body() []byte
GetTopic() string
GetMessageBody() []byte
}

type PipelineConnector = Pipeline
Expand Down
77 changes: 77 additions & 0 deletions libconsumer/outputs/nsq/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package nsq

import (
"context"
"sync"

"github.com/JieTrancender/nsq_to_consumer/libconsumer/consumer"
"github.com/JieTrancender/nsq_to_consumer/libconsumer/logp"
"github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs"
"github.com/nsqio/go-nsq"
)

type client struct {
logger *logp.Logger
outputs.NetworkClient

// for nsq
nsqd string
topic string
enabledTopic bool
producer *nsq.Producer
config *nsq.Config

mux sync.Mutex
}

func newNsqClient(config *Config) (*client, error) {
cfg := nsq.NewConfig()
cfg.WriteTimeout = config.WriteTimeout
cfg.DialTimeout = config.DialTimeout
c := &client{
logger: logp.NewLogger(logSelector),
nsqd: config.Nsqd,
topic: config.Topic,
config: cfg,
enabledTopic: config.EnabledTopic,
}

return c, nil
}

func (c *client) Close() error {
c.producer.Stop()
return nil
}

func (c *client) Connect() error {
c.mux.Lock()
defer c.mux.Unlock()

c.logger.Debugf("connect: %v", c.nsqd)
producer, err := nsq.NewProducer(c.nsqd, c.config)
if err != nil {
c.logger.Errorf("nsq connect fail with: %+v", err)
return err
}

// todo: set logger
c.producer = producer
return nil
}

func (c *client) Publish(_ context.Context, m consumer.Message) error {
if c.enabledTopic {
return c.producer.Publish(c.topic, m.GetMessageBody())
}

if m.GetNsqMessage().NSQDAddress == c.nsqd {
c.logger.Debugf("The nsq address are same as the message's address, maybe endless")
}

return c.producer.Publish(m.GetTopic(), m.GetMessageBody())
}

func (c *client) String() string {
return "NSQD"
}
49 changes: 49 additions & 0 deletions libconsumer/outputs/nsq/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package nsq

import (
"fmt"
"time"

"github.com/JieTrancender/nsq_to_consumer/libconsumer/common"
)

type Config struct {
Nsqd string `config:"nsqd"`
Topic string `config:"topic"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries"`
WriteTimeout time.Duration `config:"write_timeout"`
DialTimeout time.Duration `config:"dial_timeout"`

// If not enabled topic, while using original topic
EnabledTopic bool `config:"enabled_topic"`
}

func defaultConfig() Config {
return Config{
Nsqd: "127.0.0.1:4150",
Topic: "nsq_consumer",
BulkMaxSize: 256,
MaxRetries: 3,
WriteTimeout: 6 * time.Second,
DialTimeout: 6 * time.Second,
EnabledTopic: false,
}
}

func readConfig(cfg *common.Config) (*Config, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, err
}

return &c, nil
}

func (c *Config) Validate() error {
if c.EnabledTopic && c.Topic == "" {
return fmt.Errorf("Topic can not be empty when enabled topic")
}

return nil
}
32 changes: 32 additions & 0 deletions libconsumer/outputs/nsq/nsq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package nsq

import (
"github.com/JieTrancender/nsq_to_consumer/libconsumer/common"
"github.com/JieTrancender/nsq_to_consumer/libconsumer/consumer"
"github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs"
)

const (
logSelector = "nsqd"
)

func init() {
outputs.RegisterType("nsqd", makeNsq)
}

func makeNsq(
consumerInfo consumer.Info,
cfg *common.Config,
) (outputs.Group, error) {
config, err := readConfig(cfg)
if err != nil {
return outputs.Fail(err)
}

client, err := newNsqClient(config)
if err != nil {
return outputs.Fail(err)
}

return outputs.Success(0, 0, client)
}
9 changes: 9 additions & 0 deletions libconsumer/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@ type Client interface {

String() string
}

type NetworkClient interface {
Client
Connectable
}

type Connectable interface {
Connect() error
}
1 change: 1 addition & 0 deletions libconsumer/publisher/includes/includes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package includes

import (
_ "github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs/console"
_ "github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs/nsq"
)
84 changes: 80 additions & 4 deletions libconsumer/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ type clientWorker struct {
logger *logp.Logger
}

// netClientWorker manages reconnectable output clients of type outputs.NetworkClient.
type netClientWorker struct {
worker
client outputs.NetworkClient

logger *logp.Logger
}

func makeClientWorker(msgChan chan consumer.Message, client outputs.Client, logger *logp.Logger) outputWorker {
w := worker{
msgChan: msgChan,
Expand All @@ -30,11 +38,22 @@ func makeClientWorker(msgChan chan consumer.Message, client outputs.Client, logg
var c interface {
outputWorker
run()
} = &clientWorker{
worker: w,
client: client,
logger: logger,
}

if nc, ok := client.(outputs.NetworkClient); ok {
c = &netClientWorker{
worker: w,
client: nc,
logger: logger,
}
} else {
c = &clientWorker{
worker: w,
client: client,
logger: logger,
}
}

go c.run()
return c
}
Expand Down Expand Up @@ -65,3 +84,60 @@ func (w *clientWorker) run() {
}
}
}

func (w *netClientWorker) Close() error {
w.worker.close()
return w.client.Close()
}

func (w *netClientWorker) run() {
var (
connected = false
reconnectAttempts = 0
)

for {
// We wait for either the worker to be closed or for there to be message to publish.
select {
case <-w.done:
return
case m := <-w.msgChan:
// Try to (re)connect so we can publish message
if !connected {
if reconnectAttempts == 0 {
w.logger.Infof("Connecting to %v", w.client)
} else {
w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client,
reconnectAttempts)
}

err := w.client.Connect()
connected = err == nil
if connected {
w.logger.Infof("Connection to %v established", w.client)
reconnectAttempts = 0
} else {
w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
reconnectAttempts++
m.GetNsqMessage().Requeue(-1)
continue
}
}

if err := w.publishMessage(m); err != nil {
connected = false
continue
}
}
}
}

func (w *netClientWorker) publishMessage(m consumer.Message) error {
if err := w.client.Publish(context.Background(), m); err != nil {
m.GetNsqMessage().Requeue(-1)
w.logger.Errorf("clientWorker#run Publish message fail:%v", err)
return err
}
m.GetNsqMessage().Finish()
return nil
}

0 comments on commit 49bac73

Please sign in to comment.