Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@ import (
"context"
"flag"
"fmt"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/gobwas/glob"
)

const (
DEFAULT_LISTEN_ADDRESS = ":9698"
DEFAULT_METRICS_PATH = "/metrics"
)

var defaultForceHighRes, _ = strconv.ParseBool(os.Getenv("FORCE_HIGH_RES"))

var (
Expand All @@ -36,6 +44,8 @@ var (
includeDimensionsForMetrics = flag.String("include_dimensions_for_metrics", os.Getenv("INCLUDE_DIMENSIONS_FOR_METRICS"), "Only publish the specified dimensions for metrics (semi-colon-separated key values of comma-separated dimensions of METRIC=dim1,dim2;, e.g. 'flink_jobmanager=job_id')")
excludeDimensionsForMetrics = flag.String("exclude_dimensions_for_metrics", os.Getenv("EXCLUDE_DIMENSIONS_FOR_METRICS"), "Never publish the specified dimensions for metrics (semi-colon-separated key values of comma-separated dimensions of METRIC=dim1,dim2;, e.g. 'flink_jobmanager=job,host;zk_up=host,pod;')")
forceHighRes = flag.Bool("force_high_res", defaultForceHighRes, "Publish all metrics with high resolution, even when original metrics don't have the label "+cwHighResLabel)
listenAddress = flag.String("listen_address", os.Getenv("LISTEN_ADDRESS"), fmt.Sprintf("Address to expose metrics (default: %s)", DEFAULT_LISTEN_ADDRESS))
metricsPath = flag.String("metrics_path", os.Getenv("METRICS_PATH"), fmt.Sprintf("Path under which to expose metrics (default: %s)", DEFAULT_METRICS_PATH))
)

// kevValMustParse takes a string and exits with a message if it cannot parse as KEY=VALUE
Expand Down Expand Up @@ -89,6 +99,31 @@ func stringSliceToSet(slice []string) StringSet {
return boolMap
}

func startHttpServer(shutdownWaiter *sync.WaitGroup) *http.Server {
var metricsListenAddress = *listenAddress
if metricsListenAddress == "" {
metricsListenAddress = DEFAULT_LISTEN_ADDRESS
}

var metricsListenPath = *metricsPath
if metricsListenPath == "" {
metricsListenPath = DEFAULT_METRICS_PATH
}

server := &http.Server{Addr: metricsListenAddress}
http.Handle(metricsListenPath, promhttp.Handler())

go func() {
defer shutdownWaiter.Done()
log.Println(fmt.Sprintf("prometheus-to-cloudwatch: Http server listening on %s", metricsListenAddress))
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalln(fmt.Sprintf("prometheus-to-cloudwatch: Http server failed to listen on %s", metricsListenAddress), err)
}
}()

return server
}

func main() {
flag.Parse()

Expand Down Expand Up @@ -227,5 +262,19 @@ func main() {
}
}()


httpServerExitDone := &sync.WaitGroup{}
httpServerExitDone.Add(1)
server := startHttpServer(httpServerExitDone)

bridge.Run(ctx)

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := server.Shutdown(shutdownCtx); err != nil {
log.Fatalln("prometheus-to-cloudwatch: Failed to gracefully stop Http server", err)
}

httpServerExitDone.Wait()
}
62 changes: 62 additions & 0 deletions prometheus_to_cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"io"
"log"
"math"
"mime"
"net/http"
"net/url"
"sort"
"time"

Expand Down Expand Up @@ -129,12 +131,21 @@ type Bridge struct {
includeDimensionsForMetrics []MatcherWithStringSet
excludeDimensionsForMetrics []MatcherWithStringSet
forceHighRes bool
metrics *metrics
}

type metrics struct {
publishesTotal prometheus.Counter
publishErrorsTotal prometheus.Counter
publishDuration prometheus.Histogram
metricsTotal prometheus.Counter
}

// NewBridge initializes and returns a pointer to a Bridge using the
// supplied configuration, or an error if there is a problem with the configuration
func NewBridge(c *Config) (*Bridge, error) {
b := &Bridge{}
b.metrics = newMetrics(c, prometheus.DefaultRegisterer)

if c.CloudWatchNamespace == "" {
return nil, errors.New("CloudWatchNamespace required")
Expand Down Expand Up @@ -215,9 +226,12 @@ func (b *Bridge) Run(ctx context.Context) {
count, err := b.publishMetricsToCloudWatch(metricFamilies)
if err != nil {
log.Println("prometheus-to-cloudwatch: error publishing to CloudWatch:", err)
b.metrics.publishErrorsTotal.Inc()
}

log.Println(fmt.Sprintf("prometheus-to-cloudwatch: published %d metrics to CloudWatch", count))
b.metrics.metricsTotal.Add(float64(count))
b.metrics.publishesTotal.Inc()

case <-ctx.Done():
log.Println("prometheus-to-cloudwatch: stopping")
Expand All @@ -226,11 +240,59 @@ func (b *Bridge) Run(ctx context.Context) {
}
}

func newMetrics(c *Config, r prometheus.Registerer) *metrics {
m := &metrics{}

prometheusScrapeUrl := c.PrometheusScrapeUrl
if unescape, err := url.QueryUnescape(c.PrometheusScrapeUrl); err == nil {
prometheusScrapeUrl = unescape
}
labels := map[string]string{
"cloudwatchRegion": c.CloudWatchRegion,
"prometheusScrapeUrl": prometheusScrapeUrl,
}

m.publishesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "cloudwatch_bridge_publishes_total",
Help: "Number of publishes to cloudwatch.",
ConstLabels: labels,
})
m.publishErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "cloudwatch_bridge_publish_errors_total",
Help: "Number of cloudwatch publish errors.",
ConstLabels: labels,
})
m.publishDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "cloudwatch_bridge_publish_duration_seconds",
Help: "Duration of cloudwatch publishes",
ConstLabels: labels,
})

m.metricsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "cloudwatch_bridge_metrics_total",
Help: "Number of mmetrics published to cloudwatch.",
ConstLabels: labels,
})

if r != nil {
r.MustRegister(
m.publishesTotal,
m.publishErrorsTotal,
m.publishDuration,
m.metricsTotal,
)
}
return m
}

// NOTE: The CloudWatch API has the following limitations:
// - Max 40kb request size
// - Single namespace per request
// - Max 10 dimensions per metric
func (b *Bridge) publishMetricsToCloudWatch(mfs []*dto.MetricFamily) (count int, e error) {
start := time.Now()
defer func() { b.metrics.publishDuration.Observe(time.Since(start).Seconds()) }()

vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{Timestamp: model.Now()}, mfs...)

if err != nil {
Expand Down