diff --git a/main.go b/main.go index 6958e8f..0ba7c40 100644 --- a/main.go +++ b/main.go @@ -4,17 +4,25 @@ import ( "context" "flag" "fmt" + "github.com/prometheus/client_golang/prometheus/promhttp" "log" + "net/http" "os" "os/signal" "strconv" "strings" + "sync" "syscall" "time" "github.com/gobwas/glob" ) +const ( + DEFAULT_LISTEN_ADDRESS = ":9698" + DEFAULT_METRICS_PATH = "/metrics" +) + var defaultForceHighRes, _ = strconv.ParseBool(os.Getenv("FORCE_HIGH_RES")) var ( @@ -36,6 +44,8 @@ var ( includeDimensionsForMetrics = flag.String("include_dimensions_for_metrics", os.Getenv("INCLUDE_DIMENSIONS_FOR_METRICS"), "Only publish the specified dimensions for metrics (semi-colon-separated key values of comma-separated dimensions of METRIC=dim1,dim2;, e.g. 'flink_jobmanager=job_id')") excludeDimensionsForMetrics = flag.String("exclude_dimensions_for_metrics", os.Getenv("EXCLUDE_DIMENSIONS_FOR_METRICS"), "Never publish the specified dimensions for metrics (semi-colon-separated key values of comma-separated dimensions of METRIC=dim1,dim2;, e.g. 'flink_jobmanager=job,host;zk_up=host,pod;')") forceHighRes = flag.Bool("force_high_res", defaultForceHighRes, "Publish all metrics with high resolution, even when original metrics don't have the label "+cwHighResLabel) + listenAddress = flag.String("listen_address", os.Getenv("LISTEN_ADDRESS"), fmt.Sprintf("Address to expose metrics (default: %s)", DEFAULT_LISTEN_ADDRESS)) + metricsPath = flag.String("metrics_path", os.Getenv("METRICS_PATH"), fmt.Sprintf("Path under which to expose metrics (default: %s)", DEFAULT_METRICS_PATH)) ) // kevValMustParse takes a string and exits with a message if it cannot parse as KEY=VALUE @@ -89,6 +99,31 @@ func stringSliceToSet(slice []string) StringSet { return boolMap } +func startHttpServer(shutdownWaiter *sync.WaitGroup) *http.Server { + var metricsListenAddress = *listenAddress + if metricsListenAddress == "" { + metricsListenAddress = DEFAULT_LISTEN_ADDRESS + } + + var metricsListenPath = *metricsPath + if metricsListenPath == "" { + metricsListenPath = DEFAULT_METRICS_PATH + } + + server := &http.Server{Addr: metricsListenAddress} + http.Handle(metricsListenPath, promhttp.Handler()) + + go func() { + defer shutdownWaiter.Done() + log.Println(fmt.Sprintf("prometheus-to-cloudwatch: Http server listening on %s", metricsListenAddress)) + if err := server.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalln(fmt.Sprintf("prometheus-to-cloudwatch: Http server failed to listen on %s", metricsListenAddress), err) + } + }() + + return server +} + func main() { flag.Parse() @@ -227,5 +262,19 @@ func main() { } }() + + httpServerExitDone := &sync.WaitGroup{} + httpServerExitDone.Add(1) + server := startHttpServer(httpServerExitDone) + bridge.Run(ctx) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.Shutdown(shutdownCtx); err != nil { + log.Fatalln("prometheus-to-cloudwatch: Failed to gracefully stop Http server", err) + } + + httpServerExitDone.Wait() } diff --git a/prometheus_to_cloudwatch.go b/prometheus_to_cloudwatch.go index 6db0e25..0088fa9 100644 --- a/prometheus_to_cloudwatch.go +++ b/prometheus_to_cloudwatch.go @@ -7,11 +7,13 @@ import ( "crypto/tls" "errors" "fmt" + "github.com/prometheus/client_golang/prometheus" "io" "log" "math" "mime" "net/http" + "net/url" "sort" "time" @@ -129,12 +131,21 @@ type Bridge struct { includeDimensionsForMetrics []MatcherWithStringSet excludeDimensionsForMetrics []MatcherWithStringSet forceHighRes bool + metrics *metrics +} + +type metrics struct { + publishesTotal prometheus.Counter + publishErrorsTotal prometheus.Counter + publishDuration prometheus.Histogram + metricsTotal prometheus.Counter } // NewBridge initializes and returns a pointer to a Bridge using the // supplied configuration, or an error if there is a problem with the configuration func NewBridge(c *Config) (*Bridge, error) { b := &Bridge{} + b.metrics = newMetrics(c, prometheus.DefaultRegisterer) if c.CloudWatchNamespace == "" { return nil, errors.New("CloudWatchNamespace required") @@ -215,9 +226,12 @@ func (b *Bridge) Run(ctx context.Context) { count, err := b.publishMetricsToCloudWatch(metricFamilies) if err != nil { log.Println("prometheus-to-cloudwatch: error publishing to CloudWatch:", err) + b.metrics.publishErrorsTotal.Inc() } log.Println(fmt.Sprintf("prometheus-to-cloudwatch: published %d metrics to CloudWatch", count)) + b.metrics.metricsTotal.Add(float64(count)) + b.metrics.publishesTotal.Inc() case <-ctx.Done(): log.Println("prometheus-to-cloudwatch: stopping") @@ -226,11 +240,59 @@ func (b *Bridge) Run(ctx context.Context) { } } +func newMetrics(c *Config, r prometheus.Registerer) *metrics { + m := &metrics{} + + prometheusScrapeUrl := c.PrometheusScrapeUrl + if unescape, err := url.QueryUnescape(c.PrometheusScrapeUrl); err == nil { + prometheusScrapeUrl = unescape + } + labels := map[string]string{ + "cloudwatchRegion": c.CloudWatchRegion, + "prometheusScrapeUrl": prometheusScrapeUrl, + } + + m.publishesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cloudwatch_bridge_publishes_total", + Help: "Number of publishes to cloudwatch.", + ConstLabels: labels, + }) + m.publishErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cloudwatch_bridge_publish_errors_total", + Help: "Number of cloudwatch publish errors.", + ConstLabels: labels, + }) + m.publishDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "cloudwatch_bridge_publish_duration_seconds", + Help: "Duration of cloudwatch publishes", + ConstLabels: labels, + }) + + m.metricsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cloudwatch_bridge_metrics_total", + Help: "Number of mmetrics published to cloudwatch.", + ConstLabels: labels, + }) + + if r != nil { + r.MustRegister( + m.publishesTotal, + m.publishErrorsTotal, + m.publishDuration, + m.metricsTotal, + ) + } + return m +} + // NOTE: The CloudWatch API has the following limitations: // - Max 40kb request size // - Single namespace per request // - Max 10 dimensions per metric func (b *Bridge) publishMetricsToCloudWatch(mfs []*dto.MetricFamily) (count int, e error) { + start := time.Now() + defer func() { b.metrics.publishDuration.Observe(time.Since(start).Seconds()) }() + vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{Timestamp: model.Now()}, mfs...) if err != nil {