From 49bac73678ac11097ce58a7c0fb037c2930f73bf Mon Sep 17 00:00:00 2001 From: JTrancender Date: Wed, 11 Aug 2021 17:43:54 +0800 Subject: [PATCH] feat: add nsq output (#25) * feat: add nsq output * feat: add nsq output * perf: add nsqd output doc --- README.md | 13 ++-- consumer/message.go | 8 +++ libconsumer/consumer/pipeline.go | 2 + libconsumer/outputs/nsq/client.go | 77 ++++++++++++++++++++ libconsumer/outputs/nsq/config.go | 49 +++++++++++++ libconsumer/outputs/nsq/nsq.go | 32 +++++++++ libconsumer/outputs/outputs.go | 9 +++ libconsumer/publisher/includes/includes.go | 1 + libconsumer/publisher/pipeline/output.go | 84 ++++++++++++++++++++-- 9 files changed, 265 insertions(+), 10 deletions(-) create mode 100644 libconsumer/outputs/nsq/client.go create mode 100644 libconsumer/outputs/nsq/config.go create mode 100644 libconsumer/outputs/nsq/nsq.go diff --git a/README.md b/README.md index cd5959e..6f905b3 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,10 @@ }, "output": { "nsqd": { - "nsqd-tcp-addresses": ["127.0.0.1:4150"], + "nsqd": "127.0.0.1:4150", "topic": "dev_test_dup", - "enabled": false + "enabled": false, + "enabled_topic": true }, "console": { "enabled": true @@ -37,11 +38,11 @@ make clean && ./build/nsq_to_consumer --etcd-endpoints 127.0.0.1:2379 --etcd-use ~~~ ### Output list 1. console +2. nsqd 3. file[todo] -3. nsqd[todo] -3. http[todo] -4. mysql[todo] -5. elasticsearch[todo] +4. http[todo] +5. mysql[todo] +6. elasticsearch[todo] ### Getting Help If you need help or hit an issue, you can make a issue, we will deal it as soon as posibile. diff --git a/consumer/message.go b/consumer/message.go index 6cb23ac..6f9a2b3 100644 --- a/consumer/message.go +++ b/consumer/message.go @@ -11,6 +11,14 @@ func (m *Message) GetNsqMessage() *nsq.Message { return m.message } +func (m *Message) GetTopic() string { + return m.topic +} + +func (m *Message) GetMessageBody() []byte { + return m.message.Body +} + func (m *Message) Body() []byte { return append([]byte(m.topic+":"), m.message.Body...) } diff --git a/libconsumer/consumer/pipeline.go b/libconsumer/consumer/pipeline.go index dfd1292..65be897 100644 --- a/libconsumer/consumer/pipeline.go +++ b/libconsumer/consumer/pipeline.go @@ -14,6 +14,8 @@ type Pipeline interface { type Message interface { GetNsqMessage() *nsq.Message Body() []byte + GetTopic() string + GetMessageBody() []byte } type PipelineConnector = Pipeline diff --git a/libconsumer/outputs/nsq/client.go b/libconsumer/outputs/nsq/client.go new file mode 100644 index 0000000..38d7b9e --- /dev/null +++ b/libconsumer/outputs/nsq/client.go @@ -0,0 +1,77 @@ +package nsq + +import ( + "context" + "sync" + + "github.com/JieTrancender/nsq_to_consumer/libconsumer/consumer" + "github.com/JieTrancender/nsq_to_consumer/libconsumer/logp" + "github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs" + "github.com/nsqio/go-nsq" +) + +type client struct { + logger *logp.Logger + outputs.NetworkClient + + // for nsq + nsqd string + topic string + enabledTopic bool + producer *nsq.Producer + config *nsq.Config + + mux sync.Mutex +} + +func newNsqClient(config *Config) (*client, error) { + cfg := nsq.NewConfig() + cfg.WriteTimeout = config.WriteTimeout + cfg.DialTimeout = config.DialTimeout + c := &client{ + logger: logp.NewLogger(logSelector), + nsqd: config.Nsqd, + topic: config.Topic, + config: cfg, + enabledTopic: config.EnabledTopic, + } + + return c, nil +} + +func (c *client) Close() error { + c.producer.Stop() + return nil +} + +func (c *client) Connect() error { + c.mux.Lock() + defer c.mux.Unlock() + + c.logger.Debugf("connect: %v", c.nsqd) + producer, err := nsq.NewProducer(c.nsqd, c.config) + if err != nil { + c.logger.Errorf("nsq connect fail with: %+v", err) + return err + } + + // todo: set logger + c.producer = producer + return nil +} + +func (c *client) Publish(_ context.Context, m consumer.Message) error { + if c.enabledTopic { + return c.producer.Publish(c.topic, m.GetMessageBody()) + } + + if m.GetNsqMessage().NSQDAddress == c.nsqd { + c.logger.Debugf("The nsq address are same as the message's address, maybe endless") + } + + return c.producer.Publish(m.GetTopic(), m.GetMessageBody()) +} + +func (c *client) String() string { + return "NSQD" +} diff --git a/libconsumer/outputs/nsq/config.go b/libconsumer/outputs/nsq/config.go new file mode 100644 index 0000000..ef4c1d8 --- /dev/null +++ b/libconsumer/outputs/nsq/config.go @@ -0,0 +1,49 @@ +package nsq + +import ( + "fmt" + "time" + + "github.com/JieTrancender/nsq_to_consumer/libconsumer/common" +) + +type Config struct { + Nsqd string `config:"nsqd"` + Topic string `config:"topic"` + BulkMaxSize int `config:"bulk_max_size"` + MaxRetries int `config:"max_retries"` + WriteTimeout time.Duration `config:"write_timeout"` + DialTimeout time.Duration `config:"dial_timeout"` + + // If not enabled topic, while using original topic + EnabledTopic bool `config:"enabled_topic"` +} + +func defaultConfig() Config { + return Config{ + Nsqd: "127.0.0.1:4150", + Topic: "nsq_consumer", + BulkMaxSize: 256, + MaxRetries: 3, + WriteTimeout: 6 * time.Second, + DialTimeout: 6 * time.Second, + EnabledTopic: false, + } +} + +func readConfig(cfg *common.Config) (*Config, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + + return &c, nil +} + +func (c *Config) Validate() error { + if c.EnabledTopic && c.Topic == "" { + return fmt.Errorf("Topic can not be empty when enabled topic") + } + + return nil +} diff --git a/libconsumer/outputs/nsq/nsq.go b/libconsumer/outputs/nsq/nsq.go new file mode 100644 index 0000000..06cc4b2 --- /dev/null +++ b/libconsumer/outputs/nsq/nsq.go @@ -0,0 +1,32 @@ +package nsq + +import ( + "github.com/JieTrancender/nsq_to_consumer/libconsumer/common" + "github.com/JieTrancender/nsq_to_consumer/libconsumer/consumer" + "github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs" +) + +const ( + logSelector = "nsqd" +) + +func init() { + outputs.RegisterType("nsqd", makeNsq) +} + +func makeNsq( + consumerInfo consumer.Info, + cfg *common.Config, +) (outputs.Group, error) { + config, err := readConfig(cfg) + if err != nil { + return outputs.Fail(err) + } + + client, err := newNsqClient(config) + if err != nil { + return outputs.Fail(err) + } + + return outputs.Success(0, 0, client) +} diff --git a/libconsumer/outputs/outputs.go b/libconsumer/outputs/outputs.go index 2901b49..965f89c 100644 --- a/libconsumer/outputs/outputs.go +++ b/libconsumer/outputs/outputs.go @@ -13,3 +13,12 @@ type Client interface { String() string } + +type NetworkClient interface { + Client + Connectable +} + +type Connectable interface { + Connect() error +} diff --git a/libconsumer/publisher/includes/includes.go b/libconsumer/publisher/includes/includes.go index 26620fd..214b596 100644 --- a/libconsumer/publisher/includes/includes.go +++ b/libconsumer/publisher/includes/includes.go @@ -2,4 +2,5 @@ package includes import ( _ "github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs/console" + _ "github.com/JieTrancender/nsq_to_consumer/libconsumer/outputs/nsq" ) diff --git a/libconsumer/publisher/pipeline/output.go b/libconsumer/publisher/pipeline/output.go index a30fc49..08a7fbf 100644 --- a/libconsumer/publisher/pipeline/output.go +++ b/libconsumer/publisher/pipeline/output.go @@ -21,6 +21,14 @@ type clientWorker struct { logger *logp.Logger } +// netClientWorker manages reconnectable output clients of type outputs.NetworkClient. +type netClientWorker struct { + worker + client outputs.NetworkClient + + logger *logp.Logger +} + func makeClientWorker(msgChan chan consumer.Message, client outputs.Client, logger *logp.Logger) outputWorker { w := worker{ msgChan: msgChan, @@ -30,11 +38,22 @@ func makeClientWorker(msgChan chan consumer.Message, client outputs.Client, logg var c interface { outputWorker run() - } = &clientWorker{ - worker: w, - client: client, - logger: logger, } + + if nc, ok := client.(outputs.NetworkClient); ok { + c = &netClientWorker{ + worker: w, + client: nc, + logger: logger, + } + } else { + c = &clientWorker{ + worker: w, + client: client, + logger: logger, + } + } + go c.run() return c } @@ -65,3 +84,60 @@ func (w *clientWorker) run() { } } } + +func (w *netClientWorker) Close() error { + w.worker.close() + return w.client.Close() +} + +func (w *netClientWorker) run() { + var ( + connected = false + reconnectAttempts = 0 + ) + + for { + // We wait for either the worker to be closed or for there to be message to publish. + select { + case <-w.done: + return + case m := <-w.msgChan: + // Try to (re)connect so we can publish message + if !connected { + if reconnectAttempts == 0 { + w.logger.Infof("Connecting to %v", w.client) + } else { + w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, + reconnectAttempts) + } + + err := w.client.Connect() + connected = err == nil + if connected { + w.logger.Infof("Connection to %v established", w.client) + reconnectAttempts = 0 + } else { + w.logger.Errorf("Failed to connect to %v: %v", w.client, err) + reconnectAttempts++ + m.GetNsqMessage().Requeue(-1) + continue + } + } + + if err := w.publishMessage(m); err != nil { + connected = false + continue + } + } + } +} + +func (w *netClientWorker) publishMessage(m consumer.Message) error { + if err := w.client.Publish(context.Background(), m); err != nil { + m.GetNsqMessage().Requeue(-1) + w.logger.Errorf("clientWorker#run Publish message fail:%v", err) + return err + } + m.GetNsqMessage().Finish() + return nil +}