diff --git a/cmd/root.go b/cmd/root.go index 62f7830..4b599d1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -17,6 +17,7 @@ func NsqConsumerSettings() instance.Settings { runFlags.String("etcd-path", "/config/nsq_consumer/default", "etcd path") runFlags.String("etcd-username", "root", "etcd username") runFlags.String("etcd-password", "root", "etcd password") + runFlags.String("channel", "NsqConsumer", "channel name of this nsqd consumer") return instance.Settings{ RunFlags: runFlags, Name: Name, diff --git a/consumer/consumer.go b/consumer/consumer.go index b33f144..af66a7e 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -44,11 +44,11 @@ type etcdConfig struct { // New creates a new Consumer pointer instance. func New(settings instance.Settings) consumer.Creator { return func(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer.Consumer, error) { - return newConsumer(c, rawConfig) + return newConsumer(c, settings, rawConfig) } } -func newConsumer(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer.Consumer, error) { +func newConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConfig *common.Config) (consumer.Consumer, error) { consumerType, err := rawConfig.String("consumer-type", -1) if err != nil { return nil, err @@ -56,21 +56,26 @@ func newConsumer(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer switch consumerType { case "nsq": - return newNSQConsumer(c, rawConfig) + return newNSQConsumer(c, settings, rawConfig) default: return nil, fmt.Errorf("consumer name [%s] is invalid", consumerType) } } -func newNSQConsumer(c *consumer.ConsumerEntity, rawConfig *common.Config) (consumer.Consumer, error) { +func newNSQConsumer(c *consumer.ConsumerEntity, settings instance.Settings, rawConfig *common.Config) (consumer.Consumer, error) { opts := newOptions() cfg := nsq.NewConfig() cfg.UserAgent = fmt.Sprintf("nsq-consumer/%s go-nsq/%s", version.GetDefaultVersion(), nsq.VERSION) cfg.MaxInFlight = opts.MaxInFlight cfg.DialTimeout = 10 * time.Second - queue := make(chan *Message) + channel, _ := settings.RunFlags.GetString("channel") + if rawConfig.HasField(("channel")) { + channel, _ = rawConfig.String("channel", -1) + } + opts.Channel = channel + queue := make(chan *Message) consumer := &NSQConsumer{ done: make(chan struct{}), opts: opts, diff --git a/consumer/options.go b/consumer/options.go index bb223bb..271c327 100644 --- a/consumer/options.go +++ b/consumer/options.go @@ -1,6 +1,14 @@ package consumer -import "time" +import ( + "time" + + "github.com/spf13/pflag" +) + +var ( + channel = pflag.String("channel", "NsqConsumer", "channel name of this nsq consumer") +) // Options options for config type Options struct { @@ -24,7 +32,7 @@ func newOptions() *Options { return &Options{ LogPrefix: "[NsqConsumer] ", LogLevel: "INFO", - Channel: "NsqConsumer", + Channel: *channel, MaxInFlight: 200, OutputDir: "/tmp", SyncInterval: 30 * time.Second, diff --git a/go.mod b/go.mod index 706360d..03003b0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/elastic/go-ucfg v0.8.3 github.com/google/uuid v1.2.0 // indirect github.com/hashicorp/go-multierror v1.0.0 - github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect + github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 github.com/nsqio/go-nsq v1.0.8 github.com/olivere/elastic/v7 v7.0.27 github.com/pkg/errors v0.9.1 diff --git a/libconsumer/cmd/instance/consumer.go b/libconsumer/cmd/instance/consumer.go index ea3374e..f5d237e 100644 --- a/libconsumer/cmd/instance/consumer.go +++ b/libconsumer/cmd/instance/consumer.go @@ -144,7 +144,7 @@ func (c *Consumer) createConsumer(ct consumer.Creator) (consumer.Consumer, error // handleFlags parses the command line flags func (c *Consumer) handleFlags() error { - // flag.Parse() + // pflag.Parse() return nil } diff --git a/libconsumer/logp/configure/logging.go b/libconsumer/logp/configure/logging.go index d37fcac..06e7baa 100644 --- a/libconsumer/logp/configure/logging.go +++ b/libconsumer/logp/configure/logging.go @@ -1,11 +1,11 @@ package configure import ( - "flag" "fmt" "github.com/JieTrancender/nsq_consumer/libconsumer/common" "github.com/JieTrancender/nsq_consumer/libconsumer/logp" + "github.com/spf13/pflag" ) var ( @@ -17,9 +17,9 @@ var ( type environmentVar logp.Environment func init() { - flag.BoolVar(&verbose, "v", false, "Log at INFO level") - flag.BoolVar(&toStderr, "e", false, "Log to stderr and disable syslog/file output") - flag.Var((*environmentVar)(&environment), "environment", "set environment being ran in") + pflag.BoolVar(&verbose, "verbose", false, "Log at INFO level") + pflag.BoolVar(&toStderr, "toStderr", false, "Log to stderr and disable syslog/file output") + pflag.Var((*environmentVar)(&environment), "environment", "set environment being ran in") } // Logging builds a logp.Config based on the given common.Config and the specified CLI flags. @@ -59,3 +59,7 @@ func (v *environmentVar) Set(in string) error { func (v *environmentVar) String() string { return (*logp.Environment)(v).String() } + +func (v *environmentVar) Type() string { + return (*logp.Environment)(v).Type() +} diff --git a/libconsumer/logp/environment.go b/libconsumer/logp/environment.go index ae74ba7..6e783af 100644 --- a/libconsumer/logp/environment.go +++ b/libconsumer/logp/environment.go @@ -61,6 +61,10 @@ func (v Environment) String() string { } } +func (v Environment) Type() string { + return "Environment" +} + // ParseEnvironment returns the environment type by name. // The parse is case insensitive. // InvalidEnvironment is returned if the environment type is unknown.