From e9b159f00b028fae01f811de9cc12abb0cbc9e7d Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 13 Oct 2023 16:42:05 +0100 Subject: [PATCH 01/11] WIP: Pipeline tests --- cmd/internal/pipelinetests/httpsink.go | 58 ++++++++ cmd/internal/pipelinetests/pipeline_test.go | 134 ++++++++++++++++++ cmd/internal/pipelinetests/promdata.go | 40 ++++++ cmd/internal/pipelinetests/runtime_context.go | 51 +++++++ .../pipelinetests/testdata/empty.river | 0 .../pipelinetests/testdata/invalid.river | 3 + .../testdata/scrape_and_write.river | 26 ++++ cmd/internal/pipelinetests/testprom.go | 20 +++ .../prometheus/remotewrite/remote_write.go | 9 ++ internal/flowmode/cmd_run.go | 10 +- internal/flowmode/flowmode.go | 13 +- internal/static/metrics/wal/wal.go | 2 + 12 files changed, 357 insertions(+), 9 deletions(-) create mode 100644 cmd/internal/pipelinetests/httpsink.go create mode 100644 cmd/internal/pipelinetests/pipeline_test.go create mode 100644 cmd/internal/pipelinetests/promdata.go create mode 100644 cmd/internal/pipelinetests/runtime_context.go create mode 100644 cmd/internal/pipelinetests/testdata/empty.river create mode 100644 cmd/internal/pipelinetests/testdata/invalid.river create mode 100644 cmd/internal/pipelinetests/testdata/scrape_and_write.river create mode 100644 cmd/internal/pipelinetests/testprom.go diff --git a/cmd/internal/pipelinetests/httpsink.go b/cmd/internal/pipelinetests/httpsink.go new file mode 100644 index 000000000000..d65552d5fdda --- /dev/null +++ b/cmd/internal/pipelinetests/httpsink.go @@ -0,0 +1,58 @@ +package pipelinetests + +import ( + "context" + "fmt" + "log" + "net/http" + "time" +) + +type RequestsSink interface { + AllRequestsReceived() []http.Request +} + +type httpSinkHandler struct { + requests []http.Request +} + +func (h *httpSinkHandler) AllRequestsReceived() []http.Request { + return h.requests +} + +func newHttpSink(ctx context.Context, port int) RequestsSink { + sink := &httpSinkHandler{} + server := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: sink, + } + + go func() { + log.Printf("http sink listening on port %d", port) + err := server.ListenAndServe() + if err != nil { + log.Printf("http sink stopped with error: %s", err) + } + }() + + go func() { + <-ctx.Done() + log.Println("shutting down http sink") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := server.Shutdown(ctx) + if err != nil { + log.Printf("error shutting down http sink: %s", err) + } + }() + return sink +} + +func (h *httpSinkHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + log.Printf("http sink got request: %v", request) + h.requests = append(h.requests, *request) + _, err := writer.Write([]byte("got it, thanks!")) + if err != nil { + log.Printf("http sink warning: error writing response: %v", err) + } +} diff --git a/cmd/internal/pipelinetests/pipeline_test.go b/cmd/internal/pipelinetests/pipeline_test.go new file mode 100644 index 000000000000..24f80caa25b3 --- /dev/null +++ b/cmd/internal/pipelinetests/pipeline_test.go @@ -0,0 +1,134 @@ +package pipelinetests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/grafana/agent/cmd/internal/flowmode" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + defaultTimeout = 1 * time.Minute + assertionCheckInterval = 100 * time.Millisecond + shutdownTimeout = 5 * time.Second +) + +type pipelineTest struct { + configFile string + eventuallyAssert func(t *assert.CollectT, context *runtimeContext) + cmdErrContains string + requireCleanShutdown bool +} + +func TestPipeline_WithEmptyConfig(t *testing.T) { + runTestCase(t, pipelineTest{ + configFile: "testdata/empty.river", + requireCleanShutdown: true, + }) +} + +func TestPipeline_FileNotExists(t *testing.T) { + runTestCase(t, pipelineTest{ + configFile: "does_not_exist.river", + cmdErrContains: "does_not_exist.river: no such file or directory", + requireCleanShutdown: true, + }) +} + +func TestPipeline_FileInvalid(t *testing.T) { + runTestCase(t, pipelineTest{ + configFile: "testdata/invalid.river", + cmdErrContains: "could not perform the initial load successfully", + requireCleanShutdown: true, + }) +} + +func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { + runTestCase(topT, pipelineTest{ + configFile: "testdata/scrape_and_write.river", + eventuallyAssert: func(t *assert.CollectT, context *runtimeContext) { + writes := context.promData.getPromWrites() + assert.NotEmptyf(t, writes, "must receive at least one prom write request") + assert.Greater(t, context.promData.sampleValueForSeries("agent_prometheus_forwarded_samples_total"), float64(1000)) + assert.Greater(t, context.promData.sampleValueForSeries("agent_wal_samples_appended_total"), float64(1000)) + assert.Equal(t, context.promData.sampleValueForSeries("agent_prometheus_scrape_targets_gauge"), float64(1)) + }, + }) +} + +func runTestCase(t *testing.T, testCase pipelineTest) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + + cleanUp := setUpGlobalRegistryForTesting(prometheus.NewRegistry()) + defer cleanUp() + + agentRuntimeCtx, cleanUpAgent := newAgentRuntimeContext(t, ctx) + defer cleanUpAgent() + + cmd := flowmode.Command() + cmd.SetArgs([]string{ + "run", + testCase.configFile, + "--server.http.listen-addr", + fmt.Sprintf("127.0.0.1:%d", agentRuntimeCtx.agentPort), + "--storage.path", + t.TempDir(), + }) + + doneErr := make(chan error) + go func() { doneErr <- cmd.ExecuteContext(ctx) }() + + assertionsDone := make(chan struct{}) + go func() { + if testCase.eventuallyAssert != nil { + require.EventuallyWithT(t, func(t *assert.CollectT) { + testCase.eventuallyAssert(t, agentRuntimeCtx) + }, defaultTimeout, assertionCheckInterval) + } + assertionsDone <- struct{}{} + }() + + select { + case <-ctx.Done(): + t.Fatalf("test case failed to complete within deadline") + case <-assertionsDone: + case err := <-doneErr: + verifyDoneError(t, testCase, err) + cancel() + return + } + + t.Log("assertion checks done, shutting down agent") + cancel() + select { + case <-time.After(shutdownTimeout): + if testCase.requireCleanShutdown { + t.Fatalf("agent failed to shut down within deadline") + } else { + t.Log("agent failed to shut down within deadline") + } + case err := <-doneErr: + verifyDoneError(t, testCase, err) + } +} + +func verifyDoneError(t *testing.T, testCase pipelineTest, err error) { + if testCase.cmdErrContains != "" { + require.ErrorContains(t, err, testCase.cmdErrContains, "command must return error containing the string specified in test case") + } else { + require.NoError(t, err) + } +} + +func setUpGlobalRegistryForTesting(registry *prometheus.Registry) func() { + prevRegisterer, prevGatherer := prometheus.DefaultRegisterer, prometheus.DefaultGatherer + prometheus.DefaultRegisterer, prometheus.DefaultGatherer = registry, registry + return func() { + prometheus.DefaultRegisterer, prometheus.DefaultGatherer = prevRegisterer, prevGatherer + } +} diff --git a/cmd/internal/pipelinetests/promdata.go b/cmd/internal/pipelinetests/promdata.go new file mode 100644 index 000000000000..d82a90a87c81 --- /dev/null +++ b/cmd/internal/pipelinetests/promdata.go @@ -0,0 +1,40 @@ +package pipelinetests + +import ( + "math" + "sync" + + "github.com/prometheus/prometheus/prompb" + "golang.org/x/exp/slices" +) + +type promData struct { + mut sync.Mutex + promWrites []*prompb.WriteRequest +} + +func (r *promData) appendPromWrite(req *prompb.WriteRequest) { + r.mut.Lock() + defer r.mut.Unlock() + r.promWrites = append(r.promWrites, req) +} + +func (r *promData) getPromWrites() []*prompb.WriteRequest { + r.mut.Lock() + defer r.mut.Unlock() + return slices.Clone(r.promWrites) +} + +func (r *promData) sampleValueForSeries(name string) float64 { + r.mut.Lock() + defer r.mut.Unlock() + // start from the end to find the most recent Timeseries + for i := len(r.promWrites) - 1; i >= 0; i-- { + for _, ts := range r.promWrites[i].Timeseries { + if ts.Labels[0].Name == "__name__" && ts.Labels[0].Value == name { + return ts.Samples[len(ts.Samples)-1].Value + } + } + } + return math.NaN() +} diff --git a/cmd/internal/pipelinetests/runtime_context.go b/cmd/internal/pipelinetests/runtime_context.go new file mode 100644 index 000000000000..d90e7cfee83b --- /dev/null +++ b/cmd/internal/pipelinetests/runtime_context.go @@ -0,0 +1,51 @@ +package pipelinetests + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/phayes/freeport" + "github.com/stretchr/testify/require" +) + +type runtimeContext struct { + agentPort int + sink RequestsSink + promData *promData +} + +func newAgentRuntimeContext(t *testing.T, ctx context.Context) (*runtimeContext, func()) { + sinkPort, err := freeport.GetFreePort() + require.NoError(t, err) + sink := newHttpSink(ctx, sinkPort) + cleanSinkVar := setEnvVariable(t, "HTTP_SINK_URL", fmt.Sprintf("http://127.0.0.1:%d", sinkPort)) + + agentPort, err := freeport.GetFreePort() + require.NoError(t, err) + cleanAgentPortVar := setEnvVariable(t, "AGENT_SELF_HTTP_PORT", fmt.Sprintf("%d", agentPort)) + + agentRuntimeCtx := &runtimeContext{ + agentPort: agentPort, + sink: sink, + promData: &promData{}, + } + + promServer := newTestPromServer(agentRuntimeCtx.promData.appendPromWrite) + cleanPromServerVar := setEnvVariable(t, "PROM_SERVER_URL", fmt.Sprintf("%s/api/v1/write", promServer.URL)) + + return agentRuntimeCtx, func() { + promServer.Close() + cleanSinkVar() + cleanAgentPortVar() + cleanPromServerVar() + } +} + +func setEnvVariable(t *testing.T, key, value string) func() { + require.NoError(t, os.Setenv(key, value)) + return func() { + require.NoError(t, os.Unsetenv(key)) + } +} diff --git a/cmd/internal/pipelinetests/testdata/empty.river b/cmd/internal/pipelinetests/testdata/empty.river new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/cmd/internal/pipelinetests/testdata/invalid.river b/cmd/internal/pipelinetests/testdata/invalid.river new file mode 100644 index 000000000000..cb17c232e6b0 --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/invalid.river @@ -0,0 +1,3 @@ +not.exists "wrong" { + that_is = "broken" +} diff --git a/cmd/internal/pipelinetests/testdata/scrape_and_write.river b/cmd/internal/pipelinetests/testdata/scrape_and_write.river new file mode 100644 index 000000000000..06360928e409 --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/scrape_and_write.river @@ -0,0 +1,26 @@ +logging { + level = "debug" + format = "logfmt" +} + +prometheus.scrape "agent_self" { + targets = [ + {"__address__" = "127.0.0.1:"+env("AGENT_SELF_HTTP_PORT"), "job" = "agent"}, + ] + forward_to = [prometheus.remote_write.default.receiver] + + // Scrape settings for the impatient + scrape_interval = "250ms" + scrape_timeout = "250ms" +} + +prometheus.remote_write "default" { + endpoint { + url = env("PROM_SERVER_URL") + + // Queue settings for the impatient + queue_config { + batch_send_deadline = "250ms" + } + } +} diff --git a/cmd/internal/pipelinetests/testprom.go b/cmd/internal/pipelinetests/testprom.go new file mode 100644 index 000000000000..9e845ecd164d --- /dev/null +++ b/cmd/internal/pipelinetests/testprom.go @@ -0,0 +1,20 @@ +package pipelinetests + +import ( + "net/http" + "net/http/httptest" + + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" +) + +func newTestPromServer(onWrite func(*prompb.WriteRequest)) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req, err := remote.DecodeWriteRequest(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + onWrite(req) + })) +} diff --git a/internal/component/prometheus/remotewrite/remote_write.go b/internal/component/prometheus/remotewrite/remote_write.go index b5b00b78b86e..b757fb9ea4f6 100644 --- a/internal/component/prometheus/remotewrite/remote_write.go +++ b/internal/component/prometheus/remotewrite/remote_write.go @@ -129,6 +129,9 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + //TODO(thampiotr): We can expose this as `notify_on_append` option - it can be used to reduce latency in + // the pipeline if users want to, with a trade-off being more CPU and I/O. + remoteStore.Notify() return globalRef, nextErr }), prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) { @@ -141,6 +144,9 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + //TODO(thampiotr): We can expose this as `notify_on_append` option - it can be used to reduce latency in + // the pipeline if users want to, with a trade-off being more CPU and I/O. + remoteStore.Notify() return globalRef, nextErr }), prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { @@ -153,6 +159,9 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + //TODO(thampiotr): We can expose this as `notify_on_append` option - it can be used to reduce latency in + // the pipeline if users want to, with a trade-off being more CPU and I/O. + remoteStore.Notify() return globalRef, nextErr }), ) diff --git a/internal/flowmode/cmd_run.go b/internal/flowmode/cmd_run.go index 3bbd9889a32c..4ffca3032f62 100644 --- a/internal/flowmode/cmd_run.go +++ b/internal/flowmode/cmd_run.go @@ -95,7 +95,7 @@ depending on the nature of the reload error. SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - return r.Run(args[0]) + return r.Run(cmd.Context(), args[0]) }, } @@ -162,11 +162,11 @@ type flowRun struct { configExtraArgs string } -func (fr *flowRun) Run(configPath string) error { +func (fr *flowRun) Run(ctx context.Context, configPath string) error { var wg sync.WaitGroup defer wg.Wait() - ctx, cancel := interruptContext() + ctx, cancel := interruptContext(ctx) defer cancel() if configPath == "" { @@ -451,8 +451,8 @@ func loadFlowSource(path string, converterSourceFormat string, converterBypassEr return flow.ParseSource(path, bb) } -func interruptContext() (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(context.Background()) +func interruptContext(parent context.Context) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(parent) go func() { defer cancel() diff --git a/internal/flowmode/flowmode.go b/internal/flowmode/flowmode.go index d5edcb48b41c..31fd0e951f61 100644 --- a/internal/flowmode/flowmode.go +++ b/internal/flowmode/flowmode.go @@ -12,6 +12,14 @@ import ( // Run is the entrypoint to Flow mode. It is expected to be called // directly from the main function. func Run() { + cmd := Command() + if err := cmd.Execute(); err != nil { + os.Exit(1) + } +} + +// Command returns the root command for Flow mode. +func Command() *cobra.Command { var cmd = &cobra.Command{ Use: fmt.Sprintf("%s [global options] ", os.Args[0]), Short: "Grafana Agent Flow", @@ -29,8 +37,5 @@ func Run() { runCommand(), toolsCommand(), ) - - if err := cmd.Execute(); err != nil { - os.Exit(1) - } + return cmd } diff --git a/internal/static/metrics/wal/wal.go b/internal/static/metrics/wal/wal.go index 513a43df5da1..65ce869f5845 100644 --- a/internal/static/metrics/wal/wal.go +++ b/internal/static/metrics/wal/wal.go @@ -208,7 +208,9 @@ func (w *Storage) replayWAL() error { return ErrWALClosed } + startTime := time.Now() level.Info(w.logger).Log("msg", "replaying WAL, this may take a while", "dir", w.wal.Dir()) + defer level.Info(w.logger).Log("msg", "finished replaying WAL", "dir", w.wal.Dir(), "duration", time.Since(startTime)) dir, startFrom, err := wlog.LastCheckpoint(w.wal.Dir()) if err != nil && err != record.ErrNotFound { return fmt.Errorf("find last checkpoint: %w", err) From 72d7957cd3c367f1427fb868fdc3de1b53b78029 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Wed, 18 Oct 2023 17:39:19 +0100 Subject: [PATCH 02/11] wip --- cmd/internal/pipelinetests/httpsink.go | 58 ------------------- cmd/internal/pipelinetests/pipeline_test.go | 33 +++++++++-- cmd/internal/pipelinetests/promdata.go | 19 +++++- cmd/internal/pipelinetests/runtime_context.go | 6 +- 4 files changed, 47 insertions(+), 69 deletions(-) delete mode 100644 cmd/internal/pipelinetests/httpsink.go diff --git a/cmd/internal/pipelinetests/httpsink.go b/cmd/internal/pipelinetests/httpsink.go deleted file mode 100644 index d65552d5fdda..000000000000 --- a/cmd/internal/pipelinetests/httpsink.go +++ /dev/null @@ -1,58 +0,0 @@ -package pipelinetests - -import ( - "context" - "fmt" - "log" - "net/http" - "time" -) - -type RequestsSink interface { - AllRequestsReceived() []http.Request -} - -type httpSinkHandler struct { - requests []http.Request -} - -func (h *httpSinkHandler) AllRequestsReceived() []http.Request { - return h.requests -} - -func newHttpSink(ctx context.Context, port int) RequestsSink { - sink := &httpSinkHandler{} - server := &http.Server{ - Addr: fmt.Sprintf(":%d", port), - Handler: sink, - } - - go func() { - log.Printf("http sink listening on port %d", port) - err := server.ListenAndServe() - if err != nil { - log.Printf("http sink stopped with error: %s", err) - } - }() - - go func() { - <-ctx.Done() - log.Println("shutting down http sink") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := server.Shutdown(ctx) - if err != nil { - log.Printf("error shutting down http sink: %s", err) - } - }() - return sink -} - -func (h *httpSinkHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - log.Printf("http sink got request: %v", request) - h.requests = append(h.requests, *request) - _, err := writer.Write([]byte("got it, thanks!")) - if err != nil { - log.Printf("http sink warning: error writing response: %v", err) - } -} diff --git a/cmd/internal/pipelinetests/pipeline_test.go b/cmd/internal/pipelinetests/pipeline_test.go index 24f80caa25b3..4e3d7f501703 100644 --- a/cmd/internal/pipelinetests/pipeline_test.go +++ b/cmd/internal/pipelinetests/pipeline_test.go @@ -54,9 +54,34 @@ func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { eventuallyAssert: func(t *assert.CollectT, context *runtimeContext) { writes := context.promData.getPromWrites() assert.NotEmptyf(t, writes, "must receive at least one prom write request") - assert.Greater(t, context.promData.sampleValueForSeries("agent_prometheus_forwarded_samples_total"), float64(1000)) - assert.Greater(t, context.promData.sampleValueForSeries("agent_wal_samples_appended_total"), float64(1000)) - assert.Equal(t, context.promData.sampleValueForSeries("agent_prometheus_scrape_targets_gauge"), float64(1)) + // One target expected + assert.Equal(t, float64(1), context.promData.findLastSampleMatching("agent_prometheus_scrape_targets_gauge")) + // Fanned out at least one target + assert.GreaterOrEqual(t, context.promData.findLastSampleMatching( + "agent_prometheus_fanout_latency_count", + "component_id", + "prometheus.scrape.agent_self", + ), float64(1)) + + // Received at least `count` samples + count := 1000 + assert.Greater(t, context.promData.findLastSampleMatching( + "agent_prometheus_forwarded_samples_total", + "component_id", + "prometheus.scrape.agent_self", + ), float64(count)) + assert.Greater(t, context.promData.findLastSampleMatching( + "agent_wal_samples_appended_total", + "component_id", + "prometheus.remote_write.default", + ), float64(count)) + + // At least 100 active series should be present + assert.Greater(t, context.promData.findLastSampleMatching( + "agent_wal_storage_active_series", + "component_id", + "prometheus.remote_write.default", + ), float64(100)) }, }) } @@ -67,7 +92,7 @@ func runTestCase(t *testing.T, testCase pipelineTest) { cleanUp := setUpGlobalRegistryForTesting(prometheus.NewRegistry()) defer cleanUp() - agentRuntimeCtx, cleanUpAgent := newAgentRuntimeContext(t, ctx) + agentRuntimeCtx, cleanUpAgent := newAgentRuntimeContext(t) defer cleanUpAgent() cmd := flowmode.Command() diff --git a/cmd/internal/pipelinetests/promdata.go b/cmd/internal/pipelinetests/promdata.go index d82a90a87c81..ca84b11d1751 100644 --- a/cmd/internal/pipelinetests/promdata.go +++ b/cmd/internal/pipelinetests/promdata.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/prometheus/prometheus/prompb" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) @@ -25,13 +26,27 @@ func (r *promData) getPromWrites() []*prompb.WriteRequest { return slices.Clone(r.promWrites) } -func (r *promData) sampleValueForSeries(name string) float64 { +func (r *promData) findLastSampleMatching(name string, labelsKV ...string) float64 { + labelsMap := make(map[string]string) + for i := 0; i < len(labelsKV); i += 2 { + labelsMap[labelsKV[i]] = labelsKV[i+1] + } + labelsMap["__name__"] = name r.mut.Lock() defer r.mut.Unlock() // start from the end to find the most recent Timeseries for i := len(r.promWrites) - 1; i >= 0; i-- { for _, ts := range r.promWrites[i].Timeseries { - if ts.Labels[0].Name == "__name__" && ts.Labels[0].Value == name { + // toMatch is a copy of labelsMap that we will remove labels from as we find matches + toMatch := maps.Clone(labelsMap) + for _, label := range ts.Labels { + val, ok := toMatch[label.Name] + if ok && val == label.Value { + delete(toMatch, label.Name) + } + } + foundMatch := len(toMatch) == 0 + if foundMatch && len(ts.Samples) > 0 { return ts.Samples[len(ts.Samples)-1].Value } } diff --git a/cmd/internal/pipelinetests/runtime_context.go b/cmd/internal/pipelinetests/runtime_context.go index d90e7cfee83b..cb16401a4382 100644 --- a/cmd/internal/pipelinetests/runtime_context.go +++ b/cmd/internal/pipelinetests/runtime_context.go @@ -1,7 +1,6 @@ package pipelinetests import ( - "context" "fmt" "os" "testing" @@ -12,14 +11,12 @@ import ( type runtimeContext struct { agentPort int - sink RequestsSink promData *promData } -func newAgentRuntimeContext(t *testing.T, ctx context.Context) (*runtimeContext, func()) { +func newAgentRuntimeContext(t *testing.T) (*runtimeContext, func()) { sinkPort, err := freeport.GetFreePort() require.NoError(t, err) - sink := newHttpSink(ctx, sinkPort) cleanSinkVar := setEnvVariable(t, "HTTP_SINK_URL", fmt.Sprintf("http://127.0.0.1:%d", sinkPort)) agentPort, err := freeport.GetFreePort() @@ -28,7 +25,6 @@ func newAgentRuntimeContext(t *testing.T, ctx context.Context) (*runtimeContext, agentRuntimeCtx := &runtimeContext{ agentPort: agentPort, - sink: sink, promData: &promData{}, } From 6f41e067e2007bf2fb55f64a530ccdd45bdb020b Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Wed, 18 Oct 2023 17:45:09 +0100 Subject: [PATCH 03/11] wip --- cmd/internal/pipelinetests/pipeline_test.go | 13 ++++++------- cmd/internal/pipelinetests/promdata.go | 5 ++--- cmd/internal/pipelinetests/runtime_context.go | 15 +++++---------- 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/cmd/internal/pipelinetests/pipeline_test.go b/cmd/internal/pipelinetests/pipeline_test.go index 4e3d7f501703..38a8e3c1d295 100644 --- a/cmd/internal/pipelinetests/pipeline_test.go +++ b/cmd/internal/pipelinetests/pipeline_test.go @@ -52,12 +52,11 @@ func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { runTestCase(topT, pipelineTest{ configFile: "testdata/scrape_and_write.river", eventuallyAssert: func(t *assert.CollectT, context *runtimeContext) { - writes := context.promData.getPromWrites() - assert.NotEmptyf(t, writes, "must receive at least one prom write request") + assert.NotEmptyf(t, context.dataSentToProm.writesCount(), "must receive at least one prom write request") // One target expected - assert.Equal(t, float64(1), context.promData.findLastSampleMatching("agent_prometheus_scrape_targets_gauge")) + assert.Equal(t, float64(1), context.dataSentToProm.findLastSampleMatching("agent_prometheus_scrape_targets_gauge")) // Fanned out at least one target - assert.GreaterOrEqual(t, context.promData.findLastSampleMatching( + assert.GreaterOrEqual(t, context.dataSentToProm.findLastSampleMatching( "agent_prometheus_fanout_latency_count", "component_id", "prometheus.scrape.agent_self", @@ -65,19 +64,19 @@ func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { // Received at least `count` samples count := 1000 - assert.Greater(t, context.promData.findLastSampleMatching( + assert.Greater(t, context.dataSentToProm.findLastSampleMatching( "agent_prometheus_forwarded_samples_total", "component_id", "prometheus.scrape.agent_self", ), float64(count)) - assert.Greater(t, context.promData.findLastSampleMatching( + assert.Greater(t, context.dataSentToProm.findLastSampleMatching( "agent_wal_samples_appended_total", "component_id", "prometheus.remote_write.default", ), float64(count)) // At least 100 active series should be present - assert.Greater(t, context.promData.findLastSampleMatching( + assert.Greater(t, context.dataSentToProm.findLastSampleMatching( "agent_wal_storage_active_series", "component_id", "prometheus.remote_write.default", diff --git a/cmd/internal/pipelinetests/promdata.go b/cmd/internal/pipelinetests/promdata.go index ca84b11d1751..98d771c94fbd 100644 --- a/cmd/internal/pipelinetests/promdata.go +++ b/cmd/internal/pipelinetests/promdata.go @@ -6,7 +6,6 @@ import ( "github.com/prometheus/prometheus/prompb" "golang.org/x/exp/maps" - "golang.org/x/exp/slices" ) type promData struct { @@ -20,10 +19,10 @@ func (r *promData) appendPromWrite(req *prompb.WriteRequest) { r.promWrites = append(r.promWrites, req) } -func (r *promData) getPromWrites() []*prompb.WriteRequest { +func (r *promData) writesCount() int { r.mut.Lock() defer r.mut.Unlock() - return slices.Clone(r.promWrites) + return len(r.promWrites) } func (r *promData) findLastSampleMatching(name string, labelsKV ...string) float64 { diff --git a/cmd/internal/pipelinetests/runtime_context.go b/cmd/internal/pipelinetests/runtime_context.go index cb16401a4382..837a5abfbbcc 100644 --- a/cmd/internal/pipelinetests/runtime_context.go +++ b/cmd/internal/pipelinetests/runtime_context.go @@ -10,30 +10,25 @@ import ( ) type runtimeContext struct { - agentPort int - promData *promData + agentPort int + dataSentToProm *promData } func newAgentRuntimeContext(t *testing.T) (*runtimeContext, func()) { - sinkPort, err := freeport.GetFreePort() - require.NoError(t, err) - cleanSinkVar := setEnvVariable(t, "HTTP_SINK_URL", fmt.Sprintf("http://127.0.0.1:%d", sinkPort)) - agentPort, err := freeport.GetFreePort() require.NoError(t, err) cleanAgentPortVar := setEnvVariable(t, "AGENT_SELF_HTTP_PORT", fmt.Sprintf("%d", agentPort)) agentRuntimeCtx := &runtimeContext{ - agentPort: agentPort, - promData: &promData{}, + agentPort: agentPort, + dataSentToProm: &promData{}, } - promServer := newTestPromServer(agentRuntimeCtx.promData.appendPromWrite) + promServer := newTestPromServer(agentRuntimeCtx.dataSentToProm.appendPromWrite) cleanPromServerVar := setEnvVariable(t, "PROM_SERVER_URL", fmt.Sprintf("%s/api/v1/write", promServer.URL)) return agentRuntimeCtx, func() { promServer.Close() - cleanSinkVar() cleanAgentPortVar() cleanPromServerVar() } From a5733dd4617e120e4fb0a53f286e2dc6ee369f58 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Wed, 18 Oct 2023 17:52:16 +0100 Subject: [PATCH 04/11] todo --- cmd/internal/pipelinetests/pipeline_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cmd/internal/pipelinetests/pipeline_test.go b/cmd/internal/pipelinetests/pipeline_test.go index 38a8e3c1d295..e03600a7f82f 100644 --- a/cmd/internal/pipelinetests/pipeline_test.go +++ b/cmd/internal/pipelinetests/pipeline_test.go @@ -25,6 +25,16 @@ type pipelineTest struct { requireCleanShutdown bool } +/** +//TODO(thampiotr): +- Move the framework to own internal package to separate from tests +- Provide fake scrape target that can be scraped? +- Think how to make this low-code and easier to use +- Make a test with logging pipeline +- Make a test with OTEL pipeline +- Make a test with loki.process +- Make a test with relabel rules +**/ func TestPipeline_WithEmptyConfig(t *testing.T) { runTestCase(t, pipelineTest{ configFile: "testdata/empty.river", From 5f1190926e65ada3cdfa3c2f9f30b0cce39c2349 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 20 Oct 2023 14:00:16 +0100 Subject: [PATCH 05/11] move framework to own package --- .../internal/framework/framework.go | 97 ++++++++++++ .../{ => internal/framework}/promdata.go | 10 +- .../framework}/runtime_context.go | 18 +-- .../{ => internal/framework}/testprom.go | 2 +- cmd/internal/pipelinetests/pipeline_test.go | 142 ++++-------------- 5 files changed, 138 insertions(+), 131 deletions(-) create mode 100644 cmd/internal/pipelinetests/internal/framework/framework.go rename cmd/internal/pipelinetests/{ => internal/framework}/promdata.go (83%) rename cmd/internal/pipelinetests/{ => internal/framework}/runtime_context.go (67%) rename cmd/internal/pipelinetests/{ => internal/framework}/testprom.go (95%) diff --git a/cmd/internal/pipelinetests/internal/framework/framework.go b/cmd/internal/pipelinetests/internal/framework/framework.go new file mode 100644 index 000000000000..cdb1f80a13af --- /dev/null +++ b/cmd/internal/pipelinetests/internal/framework/framework.go @@ -0,0 +1,97 @@ +package framework + +import ( + "context" + "fmt" + "github.com/grafana/agent/cmd/internal/flowmode" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +const ( + defaultTimeout = 1 * time.Minute + assertionCheckInterval = 100 * time.Millisecond + shutdownTimeout = 5 * time.Second +) + +type PipelineTest struct { + ConfigFile string + EventuallyAssert func(t *assert.CollectT, context *RuntimeContext) + CmdErrContains string + RequireCleanShutdown bool +} + +func RunPipelineTest(t *testing.T, testCase PipelineTest) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + + cleanUp := setUpGlobalRegistryForTesting(prometheus.NewRegistry()) + defer cleanUp() + + agentRuntimeCtx, cleanUpAgent := newAgentRuntimeContext(t) + defer cleanUpAgent() + + cmd := flowmode.Command() + cmd.SetArgs([]string{ + "run", + testCase.ConfigFile, + "--server.http.listen-addr", + fmt.Sprintf("127.0.0.1:%d", agentRuntimeCtx.AgentPort), + "--storage.path", + t.TempDir(), + }) + + doneErr := make(chan error) + go func() { doneErr <- cmd.ExecuteContext(ctx) }() + + assertionsDone := make(chan struct{}) + go func() { + if testCase.EventuallyAssert != nil { + require.EventuallyWithT(t, func(t *assert.CollectT) { + testCase.EventuallyAssert(t, agentRuntimeCtx) + }, defaultTimeout, assertionCheckInterval) + } + assertionsDone <- struct{}{} + }() + + select { + case <-ctx.Done(): + t.Fatalf("test case failed to complete within deadline") + case <-assertionsDone: + case err := <-doneErr: + verifyDoneError(t, testCase, err) + cancel() + return + } + + t.Log("assertion checks done, shutting down agent") + cancel() + select { + case <-time.After(shutdownTimeout): + if testCase.RequireCleanShutdown { + t.Fatalf("agent failed to shut down within deadline") + } else { + t.Log("agent failed to shut down within deadline") + } + case err := <-doneErr: + verifyDoneError(t, testCase, err) + } +} + +func verifyDoneError(t *testing.T, testCase PipelineTest, err error) { + if testCase.CmdErrContains != "" { + require.ErrorContains(t, err, testCase.CmdErrContains, "command must return error containing the string specified in test case") + } else { + require.NoError(t, err) + } +} + +func setUpGlobalRegistryForTesting(registry *prometheus.Registry) func() { + prevRegisterer, prevGatherer := prometheus.DefaultRegisterer, prometheus.DefaultGatherer + prometheus.DefaultRegisterer, prometheus.DefaultGatherer = registry, registry + return func() { + prometheus.DefaultRegisterer, prometheus.DefaultGatherer = prevRegisterer, prevGatherer + } +} diff --git a/cmd/internal/pipelinetests/promdata.go b/cmd/internal/pipelinetests/internal/framework/promdata.go similarity index 83% rename from cmd/internal/pipelinetests/promdata.go rename to cmd/internal/pipelinetests/internal/framework/promdata.go index 98d771c94fbd..9885cd5d587f 100644 --- a/cmd/internal/pipelinetests/promdata.go +++ b/cmd/internal/pipelinetests/internal/framework/promdata.go @@ -1,4 +1,4 @@ -package pipelinetests +package framework import ( "math" @@ -8,24 +8,24 @@ import ( "golang.org/x/exp/maps" ) -type promData struct { +type PromData struct { mut sync.Mutex promWrites []*prompb.WriteRequest } -func (r *promData) appendPromWrite(req *prompb.WriteRequest) { +func (r *PromData) appendPromWrite(req *prompb.WriteRequest) { r.mut.Lock() defer r.mut.Unlock() r.promWrites = append(r.promWrites, req) } -func (r *promData) writesCount() int { +func (r *PromData) WritesCount() int { r.mut.Lock() defer r.mut.Unlock() return len(r.promWrites) } -func (r *promData) findLastSampleMatching(name string, labelsKV ...string) float64 { +func (r *PromData) FindLastSampleMatching(name string, labelsKV ...string) float64 { labelsMap := make(map[string]string) for i := 0; i < len(labelsKV); i += 2 { labelsMap[labelsKV[i]] = labelsKV[i+1] diff --git a/cmd/internal/pipelinetests/runtime_context.go b/cmd/internal/pipelinetests/internal/framework/runtime_context.go similarity index 67% rename from cmd/internal/pipelinetests/runtime_context.go rename to cmd/internal/pipelinetests/internal/framework/runtime_context.go index 837a5abfbbcc..ea2966d3c469 100644 --- a/cmd/internal/pipelinetests/runtime_context.go +++ b/cmd/internal/pipelinetests/internal/framework/runtime_context.go @@ -1,4 +1,4 @@ -package pipelinetests +package framework import ( "fmt" @@ -9,22 +9,22 @@ import ( "github.com/stretchr/testify/require" ) -type runtimeContext struct { - agentPort int - dataSentToProm *promData +type RuntimeContext struct { + AgentPort int + DataSentToProm *PromData } -func newAgentRuntimeContext(t *testing.T) (*runtimeContext, func()) { +func newAgentRuntimeContext(t *testing.T) (*RuntimeContext, func()) { agentPort, err := freeport.GetFreePort() require.NoError(t, err) cleanAgentPortVar := setEnvVariable(t, "AGENT_SELF_HTTP_PORT", fmt.Sprintf("%d", agentPort)) - agentRuntimeCtx := &runtimeContext{ - agentPort: agentPort, - dataSentToProm: &promData{}, + agentRuntimeCtx := &RuntimeContext{ + AgentPort: agentPort, + DataSentToProm: &PromData{}, } - promServer := newTestPromServer(agentRuntimeCtx.dataSentToProm.appendPromWrite) + promServer := newTestPromServer(agentRuntimeCtx.DataSentToProm.appendPromWrite) cleanPromServerVar := setEnvVariable(t, "PROM_SERVER_URL", fmt.Sprintf("%s/api/v1/write", promServer.URL)) return agentRuntimeCtx, func() { diff --git a/cmd/internal/pipelinetests/testprom.go b/cmd/internal/pipelinetests/internal/framework/testprom.go similarity index 95% rename from cmd/internal/pipelinetests/testprom.go rename to cmd/internal/pipelinetests/internal/framework/testprom.go index 9e845ecd164d..5006ff7eab45 100644 --- a/cmd/internal/pipelinetests/testprom.go +++ b/cmd/internal/pipelinetests/internal/framework/testprom.go @@ -1,4 +1,4 @@ -package pipelinetests +package framework import ( "net/http" diff --git a/cmd/internal/pipelinetests/pipeline_test.go b/cmd/internal/pipelinetests/pipeline_test.go index e03600a7f82f..4c90c2d006f0 100644 --- a/cmd/internal/pipelinetests/pipeline_test.go +++ b/cmd/internal/pipelinetests/pipeline_test.go @@ -1,72 +1,54 @@ package pipelinetests import ( - "context" - "fmt" - "testing" - "time" - - "github.com/grafana/agent/cmd/internal/flowmode" - "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - defaultTimeout = 1 * time.Minute - assertionCheckInterval = 100 * time.Millisecond - shutdownTimeout = 5 * time.Second + "testing" ) -type pipelineTest struct { - configFile string - eventuallyAssert func(t *assert.CollectT, context *runtimeContext) - cmdErrContains string - requireCleanShutdown bool -} - -/** +/* +* //TODO(thampiotr): -- Move the framework to own internal package to separate from tests - Provide fake scrape target that can be scraped? - Think how to make this low-code and easier to use - Make a test with logging pipeline - Make a test with OTEL pipeline - Make a test with loki.process - Make a test with relabel rules -**/ +* +*/ func TestPipeline_WithEmptyConfig(t *testing.T) { - runTestCase(t, pipelineTest{ - configFile: "testdata/empty.river", - requireCleanShutdown: true, + framework.RunPipelineTest(t, framework.PipelineTest{ + ConfigFile: "testdata/empty.river", + RequireCleanShutdown: true, }) } func TestPipeline_FileNotExists(t *testing.T) { - runTestCase(t, pipelineTest{ - configFile: "does_not_exist.river", - cmdErrContains: "does_not_exist.river: no such file or directory", - requireCleanShutdown: true, + framework.RunPipelineTest(t, framework.PipelineTest{ + ConfigFile: "does_not_exist.river", + CmdErrContains: "does_not_exist.river: no such file or directory", + RequireCleanShutdown: true, }) } func TestPipeline_FileInvalid(t *testing.T) { - runTestCase(t, pipelineTest{ - configFile: "testdata/invalid.river", - cmdErrContains: "could not perform the initial load successfully", - requireCleanShutdown: true, + framework.RunPipelineTest(t, framework.PipelineTest{ + ConfigFile: "testdata/invalid.river", + CmdErrContains: "could not perform the initial load successfully", + RequireCleanShutdown: true, }) } func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { - runTestCase(topT, pipelineTest{ - configFile: "testdata/scrape_and_write.river", - eventuallyAssert: func(t *assert.CollectT, context *runtimeContext) { - assert.NotEmptyf(t, context.dataSentToProm.writesCount(), "must receive at least one prom write request") + framework.RunPipelineTest(topT, framework.PipelineTest{ + ConfigFile: "testdata/scrape_and_write.river", + EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { + assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") // One target expected - assert.Equal(t, float64(1), context.dataSentToProm.findLastSampleMatching("agent_prometheus_scrape_targets_gauge")) + assert.Equal(t, float64(1), context.DataSentToProm.FindLastSampleMatching("agent_prometheus_scrape_targets_gauge")) // Fanned out at least one target - assert.GreaterOrEqual(t, context.dataSentToProm.findLastSampleMatching( + assert.GreaterOrEqual(t, context.DataSentToProm.FindLastSampleMatching( "agent_prometheus_fanout_latency_count", "component_id", "prometheus.scrape.agent_self", @@ -74,19 +56,19 @@ func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { // Received at least `count` samples count := 1000 - assert.Greater(t, context.dataSentToProm.findLastSampleMatching( + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( "agent_prometheus_forwarded_samples_total", "component_id", "prometheus.scrape.agent_self", ), float64(count)) - assert.Greater(t, context.dataSentToProm.findLastSampleMatching( + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( "agent_wal_samples_appended_total", "component_id", "prometheus.remote_write.default", ), float64(count)) // At least 100 active series should be present - assert.Greater(t, context.dataSentToProm.findLastSampleMatching( + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( "agent_wal_storage_active_series", "component_id", "prometheus.remote_write.default", @@ -94,75 +76,3 @@ func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { }, }) } - -func runTestCase(t *testing.T, testCase pipelineTest) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) - - cleanUp := setUpGlobalRegistryForTesting(prometheus.NewRegistry()) - defer cleanUp() - - agentRuntimeCtx, cleanUpAgent := newAgentRuntimeContext(t) - defer cleanUpAgent() - - cmd := flowmode.Command() - cmd.SetArgs([]string{ - "run", - testCase.configFile, - "--server.http.listen-addr", - fmt.Sprintf("127.0.0.1:%d", agentRuntimeCtx.agentPort), - "--storage.path", - t.TempDir(), - }) - - doneErr := make(chan error) - go func() { doneErr <- cmd.ExecuteContext(ctx) }() - - assertionsDone := make(chan struct{}) - go func() { - if testCase.eventuallyAssert != nil { - require.EventuallyWithT(t, func(t *assert.CollectT) { - testCase.eventuallyAssert(t, agentRuntimeCtx) - }, defaultTimeout, assertionCheckInterval) - } - assertionsDone <- struct{}{} - }() - - select { - case <-ctx.Done(): - t.Fatalf("test case failed to complete within deadline") - case <-assertionsDone: - case err := <-doneErr: - verifyDoneError(t, testCase, err) - cancel() - return - } - - t.Log("assertion checks done, shutting down agent") - cancel() - select { - case <-time.After(shutdownTimeout): - if testCase.requireCleanShutdown { - t.Fatalf("agent failed to shut down within deadline") - } else { - t.Log("agent failed to shut down within deadline") - } - case err := <-doneErr: - verifyDoneError(t, testCase, err) - } -} - -func verifyDoneError(t *testing.T, testCase pipelineTest, err error) { - if testCase.cmdErrContains != "" { - require.ErrorContains(t, err, testCase.cmdErrContains, "command must return error containing the string specified in test case") - } else { - require.NoError(t, err) - } -} - -func setUpGlobalRegistryForTesting(registry *prometheus.Registry) func() { - prevRegisterer, prevGatherer := prometheus.DefaultRegisterer, prometheus.DefaultGatherer - prometheus.DefaultRegisterer, prometheus.DefaultGatherer = registry, registry - return func() { - prometheus.DefaultRegisterer, prometheus.DefaultGatherer = prevRegisterer, prevGatherer - } -} From f3fc02569b76f62b17bfa9f08fe271aae78b2155 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 20 Oct 2023 15:06:48 +0100 Subject: [PATCH 06/11] More prom tests --- cmd/internal/pipelinetests/edge_cases_test.go | 38 +++++ .../internal/framework/runtime_context.go | 10 +- .../internal/framework/testtarget.go | 31 +++++ cmd/internal/pipelinetests/pipeline_test.go | 78 ----------- cmd/internal/pipelinetests/prometheus_test.go | 131 ++++++++++++++++++ ...rite.river => self_scrape_and_write.river} | 0 .../testdata/target_scrape_and_write.river | 26 ++++ ...get_scrape_and_write_otel_conversion.river | 47 +++++++ 8 files changed, 282 insertions(+), 79 deletions(-) create mode 100644 cmd/internal/pipelinetests/edge_cases_test.go create mode 100644 cmd/internal/pipelinetests/internal/framework/testtarget.go delete mode 100644 cmd/internal/pipelinetests/pipeline_test.go create mode 100644 cmd/internal/pipelinetests/prometheus_test.go rename cmd/internal/pipelinetests/testdata/{scrape_and_write.river => self_scrape_and_write.river} (100%) create mode 100644 cmd/internal/pipelinetests/testdata/target_scrape_and_write.river create mode 100644 cmd/internal/pipelinetests/testdata/target_scrape_and_write_otel_conversion.river diff --git a/cmd/internal/pipelinetests/edge_cases_test.go b/cmd/internal/pipelinetests/edge_cases_test.go new file mode 100644 index 000000000000..128147d41b92 --- /dev/null +++ b/cmd/internal/pipelinetests/edge_cases_test.go @@ -0,0 +1,38 @@ +package pipelinetests + +import ( + "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" + "testing" +) + +/* +* +//TODO(thampiotr): +- Make a test with logging pipeline +- Make a test with OTEL pipeline +- Make a test with loki.process +- Make a test with relabel rules +* +*/ +func TestPipeline_WithEmptyConfig(t *testing.T) { + framework.RunPipelineTest(t, framework.PipelineTest{ + ConfigFile: "testdata/empty.river", + RequireCleanShutdown: true, + }) +} + +func TestPipeline_FileNotExists(t *testing.T) { + framework.RunPipelineTest(t, framework.PipelineTest{ + ConfigFile: "does_not_exist.river", + CmdErrContains: "does_not_exist.river: no such file or directory", + RequireCleanShutdown: true, + }) +} + +func TestPipeline_FileInvalid(t *testing.T) { + framework.RunPipelineTest(t, framework.PipelineTest{ + ConfigFile: "testdata/invalid.river", + CmdErrContains: "could not perform the initial load successfully", + RequireCleanShutdown: true, + }) +} diff --git a/cmd/internal/pipelinetests/internal/framework/runtime_context.go b/cmd/internal/pipelinetests/internal/framework/runtime_context.go index ea2966d3c469..043a574d2256 100644 --- a/cmd/internal/pipelinetests/internal/framework/runtime_context.go +++ b/cmd/internal/pipelinetests/internal/framework/runtime_context.go @@ -2,6 +2,7 @@ package framework import ( "fmt" + "net" "os" "testing" @@ -12,6 +13,7 @@ import ( type RuntimeContext struct { AgentPort int DataSentToProm *PromData + TestTarget *TestTarget } func newAgentRuntimeContext(t *testing.T) (*RuntimeContext, func()) { @@ -19,17 +21,23 @@ func newAgentRuntimeContext(t *testing.T) (*RuntimeContext, func()) { require.NoError(t, err) cleanAgentPortVar := setEnvVariable(t, "AGENT_SELF_HTTP_PORT", fmt.Sprintf("%d", agentPort)) + testTarget := newTestTarget() + cleanTestTargetVar := setEnvVariable(t, "TEST_TARGET_ADDRESS", fmt.Sprintf("127.0.0.1:%d", testTarget.server.Listener.Addr().(*net.TCPAddr).Port)) + agentRuntimeCtx := &RuntimeContext{ AgentPort: agentPort, DataSentToProm: &PromData{}, + TestTarget: testTarget, } promServer := newTestPromServer(agentRuntimeCtx.DataSentToProm.appendPromWrite) cleanPromServerVar := setEnvVariable(t, "PROM_SERVER_URL", fmt.Sprintf("%s/api/v1/write", promServer.URL)) return agentRuntimeCtx, func() { - promServer.Close() cleanAgentPortVar() + testTarget.server.Close() + cleanTestTargetVar() + promServer.Close() cleanPromServerVar() } } diff --git a/cmd/internal/pipelinetests/internal/framework/testtarget.go b/cmd/internal/pipelinetests/internal/framework/testtarget.go new file mode 100644 index 000000000000..1eee7c8d518a --- /dev/null +++ b/cmd/internal/pipelinetests/internal/framework/testtarget.go @@ -0,0 +1,31 @@ +package framework + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "net/http/httptest" +) + +type TestTarget struct { + registry *prometheus.Registry + server *httptest.Server +} + +func newTestTarget() *TestTarget { + target := &TestTarget{ + registry: prometheus.NewRegistry(), + } + + server := httptest.NewServer(promhttp.InstrumentMetricHandler( + target.registry, promhttp.HandlerFor(target.registry, promhttp.HandlerOpts{ + Registry: target.registry, + }), + )) + target.server = server + + return target +} + +func (t *TestTarget) Register(collector prometheus.Collector) { + t.registry.MustRegister(collector) +} diff --git a/cmd/internal/pipelinetests/pipeline_test.go b/cmd/internal/pipelinetests/pipeline_test.go deleted file mode 100644 index 4c90c2d006f0..000000000000 --- a/cmd/internal/pipelinetests/pipeline_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package pipelinetests - -import ( - "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" - "github.com/stretchr/testify/assert" - "testing" -) - -/* -* -//TODO(thampiotr): -- Provide fake scrape target that can be scraped? -- Think how to make this low-code and easier to use -- Make a test with logging pipeline -- Make a test with OTEL pipeline -- Make a test with loki.process -- Make a test with relabel rules -* -*/ -func TestPipeline_WithEmptyConfig(t *testing.T) { - framework.RunPipelineTest(t, framework.PipelineTest{ - ConfigFile: "testdata/empty.river", - RequireCleanShutdown: true, - }) -} - -func TestPipeline_FileNotExists(t *testing.T) { - framework.RunPipelineTest(t, framework.PipelineTest{ - ConfigFile: "does_not_exist.river", - CmdErrContains: "does_not_exist.river: no such file or directory", - RequireCleanShutdown: true, - }) -} - -func TestPipeline_FileInvalid(t *testing.T) { - framework.RunPipelineTest(t, framework.PipelineTest{ - ConfigFile: "testdata/invalid.river", - CmdErrContains: "could not perform the initial load successfully", - RequireCleanShutdown: true, - }) -} - -func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { - framework.RunPipelineTest(topT, framework.PipelineTest{ - ConfigFile: "testdata/scrape_and_write.river", - EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { - assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") - // One target expected - assert.Equal(t, float64(1), context.DataSentToProm.FindLastSampleMatching("agent_prometheus_scrape_targets_gauge")) - // Fanned out at least one target - assert.GreaterOrEqual(t, context.DataSentToProm.FindLastSampleMatching( - "agent_prometheus_fanout_latency_count", - "component_id", - "prometheus.scrape.agent_self", - ), float64(1)) - - // Received at least `count` samples - count := 1000 - assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( - "agent_prometheus_forwarded_samples_total", - "component_id", - "prometheus.scrape.agent_self", - ), float64(count)) - assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( - "agent_wal_samples_appended_total", - "component_id", - "prometheus.remote_write.default", - ), float64(count)) - - // At least 100 active series should be present - assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( - "agent_wal_storage_active_series", - "component_id", - "prometheus.remote_write.default", - ), float64(100)) - }, - }) -} diff --git a/cmd/internal/pipelinetests/prometheus_test.go b/cmd/internal/pipelinetests/prometheus_test.go new file mode 100644 index 000000000000..07117cc46aff --- /dev/null +++ b/cmd/internal/pipelinetests/prometheus_test.go @@ -0,0 +1,131 @@ +package pipelinetests + +import ( + "testing" + + "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" +) + +func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { + framework.RunPipelineTest(topT, framework.PipelineTest{ + ConfigFile: "testdata/self_scrape_and_write.river", + EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { + assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") + // One target expected + assert.Equal(t, float64(1), context.DataSentToProm.FindLastSampleMatching("agent_prometheus_scrape_targets_gauge")) + // Fanned out at least one target + assert.GreaterOrEqual(t, context.DataSentToProm.FindLastSampleMatching( + "agent_prometheus_fanout_latency_count", + "component_id", + "prometheus.scrape.agent_self", + ), float64(1)) + + // Received at least `count` samples + count := 1000 + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( + "agent_prometheus_forwarded_samples_total", + "component_id", + "prometheus.scrape.agent_self", + ), float64(count)) + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( + "agent_wal_samples_appended_total", + "component_id", + "prometheus.remote_write.default", + ), float64(count)) + + // At least 100 active series should be present + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( + "agent_wal_storage_active_series", + "component_id", "prometheus.remote_write.default", + "job", "agent", + ), float64(100)) + }, + }) +} + +func TestPipeline_Prometheus_TargetScrapeAndWrite(topT *testing.T) { + framework.RunPipelineTest(topT, framework.PipelineTest{ + ConfigFile: "testdata/target_scrape_and_write.river", + EventuallyAssert: verifyDifferentTypesOfMetricsWithTestTarget(), + }) +} + +func TestPipeline_Prometheus_TargetScrapeAndWrite_WithOTELConversion(topT *testing.T) { + framework.RunPipelineTest(topT, framework.PipelineTest{ + ConfigFile: "testdata/target_scrape_and_write_otel_conversion.river", + EventuallyAssert: verifyDifferentTypesOfMetricsWithTestTarget(), + }) +} + +// verifyDifferentTypesOfMetricsWithTestTarget exposes different metrics using the context.TestTarget and then +// verifies that they all arrived eventually to context.DataSentToProm. This test can be used to verify various +// pipelines that are expected to ship the metrics from a test target to the prometheus remote write endpoint. +func verifyDifferentTypesOfMetricsWithTestTarget() func(t *assert.CollectT, context *framework.RuntimeContext) { + registered := false + gauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "test_target_gauge", + ConstLabels: map[string]string{ + "foo": "bar", + }, + }) + counter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_target_counter", + ConstLabels: map[string]string{ + "owner": "count_von_count", + }, + }) + hist := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "test_target_histogram", + ConstLabels: map[string]string{ + "type": "histogram", + }, + }) + + return func(t *assert.CollectT, context *framework.RuntimeContext) { + if !registered { + // Register and set the test gauge only once + context.TestTarget.Register(gauge) + gauge.Set(123) + + context.TestTarget.Register(counter) + counter.Add(321) + + context.TestTarget.Register(hist) + for i := 0; i < 100; i++ { + hist.Observe(float64(i)) + } + + registered = true + } + + assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") + // Check the gauge as expected value and has const labels + assert.Equal(t, float64(123), context.DataSentToProm.FindLastSampleMatching( + "test_target_gauge", + "foo", "bar", + )) + + // Check the counter labels + assert.Equal(t, float64(321), context.DataSentToProm.FindLastSampleMatching( + "test_target_counter", + "owner", "count_von_count", + )) + + // Check the histogram metrics + assert.Equal(t, float64(100), context.DataSentToProm.FindLastSampleMatching( + "test_target_histogram_count", + "type", "histogram", + )) + assert.Equal(t, float64(4950), context.DataSentToProm.FindLastSampleMatching( + "test_target_histogram_sum", + "type", "histogram", + )) + assert.Equal(t, float64(11), context.DataSentToProm.FindLastSampleMatching( + "test_target_histogram_bucket", + "type", "histogram", + "le", "10", + )) + } +} diff --git a/cmd/internal/pipelinetests/testdata/scrape_and_write.river b/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river similarity index 100% rename from cmd/internal/pipelinetests/testdata/scrape_and_write.river rename to cmd/internal/pipelinetests/testdata/self_scrape_and_write.river diff --git a/cmd/internal/pipelinetests/testdata/target_scrape_and_write.river b/cmd/internal/pipelinetests/testdata/target_scrape_and_write.river new file mode 100644 index 000000000000..2df61570335d --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/target_scrape_and_write.river @@ -0,0 +1,26 @@ +logging { + level = "debug" + format = "logfmt" +} + +prometheus.scrape "agent_self" { + targets = [ + {"__address__" = env("TEST_TARGET_ADDRESS"), "job" = "test_target"}, + ] + forward_to = [prometheus.remote_write.default.receiver] + + // Scrape settings for the impatient + scrape_interval = "250ms" + scrape_timeout = "250ms" +} + +prometheus.remote_write "default" { + endpoint { + url = env("PROM_SERVER_URL") + + // Queue settings for the impatient + queue_config { + batch_send_deadline = "250ms" + } + } +} diff --git a/cmd/internal/pipelinetests/testdata/target_scrape_and_write_otel_conversion.river b/cmd/internal/pipelinetests/testdata/target_scrape_and_write_otel_conversion.river new file mode 100644 index 000000000000..94a40c45bee7 --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/target_scrape_and_write_otel_conversion.river @@ -0,0 +1,47 @@ +logging { + level = "debug" + format = "logfmt" +} + +// Scrape test target and send to OTEL converter +prometheus.scrape "agent_self" { + targets = [ + {"__address__" = env("TEST_TARGET_ADDRESS"), "job" = "test_target"}, + ] + forward_to = [otelcol.receiver.prometheus.to_otel.receiver] + + // Scrape settings for the impatient + scrape_interval = "250ms" + scrape_timeout = "250ms" +} + +// converts prom metrics and sends to OTEL batch processor +otelcol.receiver.prometheus "to_otel" { + output { + metrics = [otelcol.processor.batch.default.input] + } +} + +// Batches metrics and sends to OTEL exporter +otelcol.processor.batch "default" { + output { + metrics = [otelcol.exporter.prometheus.default.input] + } +} + +// Converts metrics to prom and sends to remote_write +otelcol.exporter.prometheus "default" { + forward_to = [prometheus.remote_write.default.receiver] +} + +// Sends to test prom server +prometheus.remote_write "default" { + endpoint { + url = env("PROM_SERVER_URL") + + // Queue settings for the impatient + queue_config { + batch_send_deadline = "250ms" + } + } +} From 178ab13edc8263082fc22dab03f2d259162b87a5 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 20 Oct 2023 15:15:14 +0100 Subject: [PATCH 07/11] comments --- .../pipelinetests/internal/framework/framework.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/cmd/internal/pipelinetests/internal/framework/framework.go b/cmd/internal/pipelinetests/internal/framework/framework.go index cdb1f80a13af..c893a793e95c 100644 --- a/cmd/internal/pipelinetests/internal/framework/framework.go +++ b/cmd/internal/pipelinetests/internal/framework/framework.go @@ -18,9 +18,17 @@ const ( ) type PipelineTest struct { - ConfigFile string - EventuallyAssert func(t *assert.CollectT, context *RuntimeContext) - CmdErrContains string + // ConfigFile is the path to the config file to be used for the test. + ConfigFile string + // EventuallyAssert is a function that will be called after the agent has started, repeatedly until all assertions + // are satisfied or the default timeout is reached. The provided context contains all the extra information that + // the framework has collected, such as data received by the fake prometheus server. + EventuallyAssert func(t *assert.CollectT, context *RuntimeContext) + // CmdErrContains is a string that must be contained in the error returned by the command. If empty, no error is + // expected. + CmdErrContains string + // RequireCleanShutdown indicates whether the test framework should verify that the agent shut down cleanly after + // the test case has completed. RequireCleanShutdown bool } From e51ac841d3137e9adef5fb6e9c3151c0471bc891 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 20 Oct 2023 15:26:35 +0100 Subject: [PATCH 08/11] goimports --- .../internal/framework/framework.go | 5 ++- .../internal/framework/testtarget.go | 3 +- cmd/internal/pipelinetests/otel_test.go | 45 +++++++++++++++++++ .../testdata/otel_receive_and_write.river | 26 +++++++++++ 4 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 cmd/internal/pipelinetests/otel_test.go create mode 100644 cmd/internal/pipelinetests/testdata/otel_receive_and_write.river diff --git a/cmd/internal/pipelinetests/internal/framework/framework.go b/cmd/internal/pipelinetests/internal/framework/framework.go index c893a793e95c..8c9cca9eac66 100644 --- a/cmd/internal/pipelinetests/internal/framework/framework.go +++ b/cmd/internal/pipelinetests/internal/framework/framework.go @@ -3,12 +3,13 @@ package framework import ( "context" "fmt" + "testing" + "time" + "github.com/grafana/agent/cmd/internal/flowmode" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" - "time" ) const ( diff --git a/cmd/internal/pipelinetests/internal/framework/testtarget.go b/cmd/internal/pipelinetests/internal/framework/testtarget.go index 1eee7c8d518a..d7fbba30e5b8 100644 --- a/cmd/internal/pipelinetests/internal/framework/testtarget.go +++ b/cmd/internal/pipelinetests/internal/framework/testtarget.go @@ -1,9 +1,10 @@ package framework import ( + "net/http/httptest" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http/httptest" ) type TestTarget struct { diff --git a/cmd/internal/pipelinetests/otel_test.go b/cmd/internal/pipelinetests/otel_test.go new file mode 100644 index 000000000000..d1b267a33546 --- /dev/null +++ b/cmd/internal/pipelinetests/otel_test.go @@ -0,0 +1,45 @@ +package pipelinetests + +import ( + "testing" + + "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" + "github.com/stretchr/testify/assert" +) + +func TestPipeline_OTEL_TestScrapeAndWrite(topT *testing.T) { + framework.RunPipelineTest(topT, framework.PipelineTest{ + ConfigFile: "testdata/self_scrape_and_write.river", + EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { + assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") + // One target expected + assert.Equal(t, float64(1), context.DataSentToProm.FindLastSampleMatching("agent_prometheus_scrape_targets_gauge")) + // Fanned out at least one target + assert.GreaterOrEqual(t, context.DataSentToProm.FindLastSampleMatching( + "agent_prometheus_fanout_latency_count", + "component_id", + "prometheus.scrape.agent_self", + ), float64(1)) + + // Received at least `count` samples + count := 1000 + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( + "agent_prometheus_forwarded_samples_total", + "component_id", + "prometheus.scrape.agent_self", + ), float64(count)) + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( + "agent_wal_samples_appended_total", + "component_id", + "prometheus.remote_write.default", + ), float64(count)) + + // At least 100 active series should be present + assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( + "agent_wal_storage_active_series", + "component_id", "prometheus.remote_write.default", + "job", "agent", + ), float64(100)) + }, + }) +} diff --git a/cmd/internal/pipelinetests/testdata/otel_receive_and_write.river b/cmd/internal/pipelinetests/testdata/otel_receive_and_write.river new file mode 100644 index 000000000000..2df61570335d --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/otel_receive_and_write.river @@ -0,0 +1,26 @@ +logging { + level = "debug" + format = "logfmt" +} + +prometheus.scrape "agent_self" { + targets = [ + {"__address__" = env("TEST_TARGET_ADDRESS"), "job" = "test_target"}, + ] + forward_to = [prometheus.remote_write.default.receiver] + + // Scrape settings for the impatient + scrape_interval = "250ms" + scrape_timeout = "250ms" +} + +prometheus.remote_write "default" { + endpoint { + url = env("PROM_SERVER_URL") + + // Queue settings for the impatient + queue_config { + batch_send_deadline = "250ms" + } + } +} From e991a3e2872b59f4a38f7a03c463a70306afd34b Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 20 Oct 2023 16:47:52 +0100 Subject: [PATCH 09/11] Refactor tests for syntax sugar --- cmd/internal/pipelinetests/edge_cases_test.go | 16 ++-- .../internal/framework/framework.go | 35 +++++--- .../internal/framework/lokidata.go | 40 +++++++++ .../internal/framework/runtime_context.go | 7 ++ .../internal/framework/testloki.go | 23 +++++ cmd/internal/pipelinetests/loki_test.go | 85 +++++++++++++++++++ cmd/internal/pipelinetests/otel_test.go | 45 ---------- cmd/internal/pipelinetests/prometheus_test.go | 16 ++-- .../testdata/loki_source_api_write.river | 24 ++++++ .../testdata/self_logs_write.river | 12 +++ .../testdata/self_scrape_and_write.river | 22 ++++- 11 files changed, 252 insertions(+), 73 deletions(-) create mode 100644 cmd/internal/pipelinetests/internal/framework/lokidata.go create mode 100644 cmd/internal/pipelinetests/internal/framework/testloki.go create mode 100644 cmd/internal/pipelinetests/loki_test.go delete mode 100644 cmd/internal/pipelinetests/otel_test.go create mode 100644 cmd/internal/pipelinetests/testdata/loki_source_api_write.river create mode 100644 cmd/internal/pipelinetests/testdata/self_logs_write.river diff --git a/cmd/internal/pipelinetests/edge_cases_test.go b/cmd/internal/pipelinetests/edge_cases_test.go index 128147d41b92..a07d9b036297 100644 --- a/cmd/internal/pipelinetests/edge_cases_test.go +++ b/cmd/internal/pipelinetests/edge_cases_test.go @@ -1,38 +1,38 @@ package pipelinetests import ( - "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" "testing" + + "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" ) /* * //TODO(thampiotr): -- Make a test with logging pipeline - Make a test with OTEL pipeline - Make a test with loki.process - Make a test with relabel rules * */ func TestPipeline_WithEmptyConfig(t *testing.T) { - framework.RunPipelineTest(t, framework.PipelineTest{ + framework.PipelineTest{ ConfigFile: "testdata/empty.river", RequireCleanShutdown: true, - }) + }.RunTest(t) } func TestPipeline_FileNotExists(t *testing.T) { - framework.RunPipelineTest(t, framework.PipelineTest{ + framework.PipelineTest{ ConfigFile: "does_not_exist.river", CmdErrContains: "does_not_exist.river: no such file or directory", RequireCleanShutdown: true, - }) + }.RunTest(t) } func TestPipeline_FileInvalid(t *testing.T) { - framework.RunPipelineTest(t, framework.PipelineTest{ + framework.PipelineTest{ ConfigFile: "testdata/invalid.river", CmdErrContains: "could not perform the initial load successfully", RequireCleanShutdown: true, - }) + }.RunTest(t) } diff --git a/cmd/internal/pipelinetests/internal/framework/framework.go b/cmd/internal/pipelinetests/internal/framework/framework.go index 8c9cca9eac66..7f7062791e99 100644 --- a/cmd/internal/pipelinetests/internal/framework/framework.go +++ b/cmd/internal/pipelinetests/internal/framework/framework.go @@ -13,7 +13,7 @@ import ( ) const ( - defaultTimeout = 1 * time.Minute + defaultTimeout = 10 * time.Second assertionCheckInterval = 100 * time.Millisecond shutdownTimeout = 5 * time.Second ) @@ -31,10 +31,25 @@ type PipelineTest struct { // RequireCleanShutdown indicates whether the test framework should verify that the agent shut down cleanly after // the test case has completed. RequireCleanShutdown bool + // Timeout is the maximum amount of time the test case is allowed to run. If 0, defaultTimeout is used. + Timeout time.Duration + // Environment is a map of environment variables to be set before running the test. It will be automatically + // cleaned. The values can be used inside the config files using the `env("ENV_VAR")` syntax. + Environment map[string]string } -func RunPipelineTest(t *testing.T, testCase PipelineTest) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) +func (p PipelineTest) RunTest(t *testing.T) { + if p.Timeout == 0 { + p.Timeout = defaultTimeout + } + // Main context has some padding added to the timeout to allow for assertions error message to surface first. + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout+2*assertionCheckInterval) + + for k, v := range p.Environment { + cleanUp := setEnvVariable(t, k, v) + //goland:noinspection GoDeferInLoop + defer cleanUp() + } cleanUp := setUpGlobalRegistryForTesting(prometheus.NewRegistry()) defer cleanUp() @@ -45,7 +60,7 @@ func RunPipelineTest(t *testing.T, testCase PipelineTest) { cmd := flowmode.Command() cmd.SetArgs([]string{ "run", - testCase.ConfigFile, + p.ConfigFile, "--server.http.listen-addr", fmt.Sprintf("127.0.0.1:%d", agentRuntimeCtx.AgentPort), "--storage.path", @@ -57,10 +72,10 @@ func RunPipelineTest(t *testing.T, testCase PipelineTest) { assertionsDone := make(chan struct{}) go func() { - if testCase.EventuallyAssert != nil { + if p.EventuallyAssert != nil { require.EventuallyWithT(t, func(t *assert.CollectT) { - testCase.EventuallyAssert(t, agentRuntimeCtx) - }, defaultTimeout, assertionCheckInterval) + p.EventuallyAssert(t, agentRuntimeCtx) + }, p.Timeout, assertionCheckInterval) } assertionsDone <- struct{}{} }() @@ -70,7 +85,7 @@ func RunPipelineTest(t *testing.T, testCase PipelineTest) { t.Fatalf("test case failed to complete within deadline") case <-assertionsDone: case err := <-doneErr: - verifyDoneError(t, testCase, err) + verifyDoneError(t, p, err) cancel() return } @@ -79,13 +94,13 @@ func RunPipelineTest(t *testing.T, testCase PipelineTest) { cancel() select { case <-time.After(shutdownTimeout): - if testCase.RequireCleanShutdown { + if p.RequireCleanShutdown { t.Fatalf("agent failed to shut down within deadline") } else { t.Log("agent failed to shut down within deadline") } case err := <-doneErr: - verifyDoneError(t, testCase, err) + verifyDoneError(t, p, err) } } diff --git a/cmd/internal/pipelinetests/internal/framework/lokidata.go b/cmd/internal/pipelinetests/internal/framework/lokidata.go new file mode 100644 index 000000000000..1b4abb109d29 --- /dev/null +++ b/cmd/internal/pipelinetests/internal/framework/lokidata.go @@ -0,0 +1,40 @@ +package framework + +import ( + "strings" + "sync" + + "github.com/grafana/loki/pkg/logproto" +) + +type LokiData struct { + mut sync.Mutex + lokiWrites []*logproto.PushRequest +} + +func (r *LokiData) appendLokiWrite(req *logproto.PushRequest) { + r.mut.Lock() + defer r.mut.Unlock() + r.lokiWrites = append(r.lokiWrites, req) +} + +func (r *LokiData) WritesCount() int { + r.mut.Lock() + defer r.mut.Unlock() + return len(r.lokiWrites) +} + +func (r *LokiData) FindLineContaining(contents string) (*logproto.Entry, string) { + r.mut.Lock() + defer r.mut.Unlock() + for i := len(r.lokiWrites) - 1; i >= 0; i-- { + for _, stream := range r.lokiWrites[i].Streams { + for _, entry := range stream.Entries { + if strings.Contains(entry.Line, contents) { + return &entry, stream.Labels + } + } + } + } + return nil, "" +} diff --git a/cmd/internal/pipelinetests/internal/framework/runtime_context.go b/cmd/internal/pipelinetests/internal/framework/runtime_context.go index 043a574d2256..5fa1519d1f33 100644 --- a/cmd/internal/pipelinetests/internal/framework/runtime_context.go +++ b/cmd/internal/pipelinetests/internal/framework/runtime_context.go @@ -13,6 +13,7 @@ import ( type RuntimeContext struct { AgentPort int DataSentToProm *PromData + DataSentToLoki *LokiData TestTarget *TestTarget } @@ -27,18 +28,24 @@ func newAgentRuntimeContext(t *testing.T) (*RuntimeContext, func()) { agentRuntimeCtx := &RuntimeContext{ AgentPort: agentPort, DataSentToProm: &PromData{}, + DataSentToLoki: &LokiData{}, TestTarget: testTarget, } promServer := newTestPromServer(agentRuntimeCtx.DataSentToProm.appendPromWrite) cleanPromServerVar := setEnvVariable(t, "PROM_SERVER_URL", fmt.Sprintf("%s/api/v1/write", promServer.URL)) + lokiServer := newTestLokiServer(agentRuntimeCtx.DataSentToLoki.appendLokiWrite) + cleanLokiServerVar := setEnvVariable(t, "LOKI_SERVER_URL", lokiServer.URL) + return agentRuntimeCtx, func() { cleanAgentPortVar() testTarget.server.Close() cleanTestTargetVar() promServer.Close() cleanPromServerVar() + lokiServer.Close() + cleanLokiServerVar() } } diff --git a/cmd/internal/pipelinetests/internal/framework/testloki.go b/cmd/internal/pipelinetests/internal/framework/testloki.go new file mode 100644 index 000000000000..04b9f7907b52 --- /dev/null +++ b/cmd/internal/pipelinetests/internal/framework/testloki.go @@ -0,0 +1,23 @@ +package framework + +import ( + "context" + "math" + "net/http" + "net/http/httptest" + + "github.com/grafana/loki/pkg/logproto" + loki_util "github.com/grafana/loki/pkg/util" +) + +func newTestLokiServer(onWrite func(*logproto.PushRequest)) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var pushReq logproto.PushRequest + err := loki_util.ParseProtoReader(context.Background(), r.Body, int(r.ContentLength), math.MaxInt32, &pushReq, loki_util.RawSnappy) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + onWrite(&pushReq) + })) +} diff --git a/cmd/internal/pipelinetests/loki_test.go b/cmd/internal/pipelinetests/loki_test.go new file mode 100644 index 000000000000..84a41a0e6d0f --- /dev/null +++ b/cmd/internal/pipelinetests/loki_test.go @@ -0,0 +1,85 @@ +package pipelinetests + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/common/loki/client" + "github.com/grafana/dskit/flagext" + "github.com/grafana/loki/pkg/logproto" + "github.com/phayes/freeport" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipeline_Loki_SelfLogsWrite(topT *testing.T) { + framework.PipelineTest{ + ConfigFile: "testdata/self_logs_write.river", + EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { + // Reload the config in order to generate some logs + _, _ = http.Get(fmt.Sprintf("http://localhost:%d/-/reload", context.AgentPort)) + line, labels := context.DataSentToLoki.FindLineContaining("config reloaded") + assert.NotNil(t, line) + assert.Equal(t, "{component=\"agent\"}", labels) + }, + }.RunTest(topT) +} + +func TestPipeline_Loki_APILogsWrite(topT *testing.T) { + apiServerPort, err := freeport.GetFreePort() + assert.NoError(topT, err) + + lokiClient := newTestLokiClient(topT, fmt.Sprintf("http://127.0.0.1:%d/loki/api/v1/push", apiServerPort)) + defer lokiClient.Stop() + + testLogEntry := loki.Entry{ + Labels: map[model.LabelName]model.LabelValue{"source": "test"}, + Entry: logproto.Entry{Timestamp: time.Now(), Line: "hello world!"}, + } + + logLineSent := false + + framework.PipelineTest{ + ConfigFile: "testdata/loki_source_api_write.river", + Environment: map[string]string{ + "API_SERVER_PORT": fmt.Sprintf("%d", apiServerPort), + }, + EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { + // Send the line if not yet sent + if !logLineSent { + lokiClient.Chan() <- testLogEntry + logLineSent = true + } + // Verify we have received the line at the other end of the pipeline + line, labels := context.DataSentToLoki.FindLineContaining("hello world!") + assert.NotNil(t, line) + assert.Equal(t, "{forwarded=\"true\", source=\"test\"}", labels) + }, + }.RunTest(topT) +} + +func newTestLokiClient(t *testing.T, url string) client.Client { + fUrl := flagext.URLValue{} + err := fUrl.Set(url) + require.NoError(t, err) + + lokiClient, err := client.New( + client.NewMetrics(nil), + client.Config{ + URL: fUrl, + Timeout: 5 * time.Second, + }, + 0, + 0, + false, + log.NewNopLogger(), + ) + require.NoError(t, err) + return lokiClient +} diff --git a/cmd/internal/pipelinetests/otel_test.go b/cmd/internal/pipelinetests/otel_test.go deleted file mode 100644 index d1b267a33546..000000000000 --- a/cmd/internal/pipelinetests/otel_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package pipelinetests - -import ( - "testing" - - "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" - "github.com/stretchr/testify/assert" -) - -func TestPipeline_OTEL_TestScrapeAndWrite(topT *testing.T) { - framework.RunPipelineTest(topT, framework.PipelineTest{ - ConfigFile: "testdata/self_scrape_and_write.river", - EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { - assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") - // One target expected - assert.Equal(t, float64(1), context.DataSentToProm.FindLastSampleMatching("agent_prometheus_scrape_targets_gauge")) - // Fanned out at least one target - assert.GreaterOrEqual(t, context.DataSentToProm.FindLastSampleMatching( - "agent_prometheus_fanout_latency_count", - "component_id", - "prometheus.scrape.agent_self", - ), float64(1)) - - // Received at least `count` samples - count := 1000 - assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( - "agent_prometheus_forwarded_samples_total", - "component_id", - "prometheus.scrape.agent_self", - ), float64(count)) - assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( - "agent_wal_samples_appended_total", - "component_id", - "prometheus.remote_write.default", - ), float64(count)) - - // At least 100 active series should be present - assert.Greater(t, context.DataSentToProm.FindLastSampleMatching( - "agent_wal_storage_active_series", - "component_id", "prometheus.remote_write.default", - "job", "agent", - ), float64(100)) - }, - }) -} diff --git a/cmd/internal/pipelinetests/prometheus_test.go b/cmd/internal/pipelinetests/prometheus_test.go index 07117cc46aff..0d79a9bbfd56 100644 --- a/cmd/internal/pipelinetests/prometheus_test.go +++ b/cmd/internal/pipelinetests/prometheus_test.go @@ -2,6 +2,7 @@ package pipelinetests import ( "testing" + "time" "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" "github.com/prometheus/client_golang/prometheus" @@ -9,8 +10,9 @@ import ( ) func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { - framework.RunPipelineTest(topT, framework.PipelineTest{ + framework.PipelineTest{ ConfigFile: "testdata/self_scrape_and_write.river", + Timeout: 1 * time.Minute, // prometheus tests are slower due to remote_write/wal issues EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { assert.NotEmptyf(t, context.DataSentToProm.WritesCount(), "must receive at least one prom write request") // One target expected @@ -42,21 +44,23 @@ func TestPipeline_Prometheus_SelfScrapeAndWrite(topT *testing.T) { "job", "agent", ), float64(100)) }, - }) + }.RunTest(topT) } func TestPipeline_Prometheus_TargetScrapeAndWrite(topT *testing.T) { - framework.RunPipelineTest(topT, framework.PipelineTest{ + framework.PipelineTest{ ConfigFile: "testdata/target_scrape_and_write.river", + Timeout: 1 * time.Minute, // prometheus tests are slower due to remote_write/wal issues EventuallyAssert: verifyDifferentTypesOfMetricsWithTestTarget(), - }) + }.RunTest(topT) } func TestPipeline_Prometheus_TargetScrapeAndWrite_WithOTELConversion(topT *testing.T) { - framework.RunPipelineTest(topT, framework.PipelineTest{ + framework.PipelineTest{ ConfigFile: "testdata/target_scrape_and_write_otel_conversion.river", + Timeout: 1 * time.Minute, // prometheus tests are slower due to remote_write/wal issues EventuallyAssert: verifyDifferentTypesOfMetricsWithTestTarget(), - }) + }.RunTest(topT) } // verifyDifferentTypesOfMetricsWithTestTarget exposes different metrics using the context.TestTarget and then diff --git a/cmd/internal/pipelinetests/testdata/loki_source_api_write.river b/cmd/internal/pipelinetests/testdata/loki_source_api_write.river new file mode 100644 index 000000000000..291740875ce6 --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/loki_source_api_write.river @@ -0,0 +1,24 @@ +logging { + level = "debug" + format = "logfmt" +} + +loki.source.api "loki_push_api" { + http { + listen_address = "127.0.0.1" + listen_port = env("API_SERVER_PORT") + } + forward_to = [ + loki.write.default.receiver, + ] + labels = { + forwarded = "true", + } +} + +loki.write "default" { + endpoint { + url = env("LOKI_SERVER_URL") + batch_wait = "100ms" + } +} diff --git a/cmd/internal/pipelinetests/testdata/self_logs_write.river b/cmd/internal/pipelinetests/testdata/self_logs_write.river new file mode 100644 index 000000000000..1f832d2a2303 --- /dev/null +++ b/cmd/internal/pipelinetests/testdata/self_logs_write.river @@ -0,0 +1,12 @@ +logging { + level = "debug" + format = "logfmt" + write_to = [loki.write.default.receiver] +} + +loki.write "default" { + endpoint { + url = env("LOKI_SERVER_URL") + batch_wait = "100ms" + } +} diff --git a/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river b/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river index 06360928e409..3a9080077e4f 100644 --- a/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river +++ b/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river @@ -3,17 +3,31 @@ logging { format = "logfmt" } +// Self discover the agent +prometheus.exporter.agent "agent" { +} + +// Add a job label +discovery.relabel "add_job" { + targets = prometheus.exporter.agent.agent.targets + + rule { + target_label = "job" + replacement = "agent" + } +} + +// Scrape the agent prometheus.scrape "agent_self" { - targets = [ - {"__address__" = "127.0.0.1:"+env("AGENT_SELF_HTTP_PORT"), "job" = "agent"}, - ] - forward_to = [prometheus.remote_write.default.receiver] + targets = discovery.relabel.add_job.output + forward_to = [prometheus.remote_write.default.receiver] // Scrape settings for the impatient scrape_interval = "250ms" scrape_timeout = "250ms" } +// Send metrics to test server prometheus.remote_write "default" { endpoint { url = env("PROM_SERVER_URL") From 1c420f0a39693462a7c3e1269266e6152c0265c3 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Fri, 20 Oct 2023 16:57:30 +0100 Subject: [PATCH 10/11] simplify with before fn --- .../pipelinetests/internal/framework/framework.go | 7 +++++++ .../internal/framework/runtime_context.go | 8 ++++++-- cmd/internal/pipelinetests/loki_test.go | 11 ++++------- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/cmd/internal/pipelinetests/internal/framework/framework.go b/cmd/internal/pipelinetests/internal/framework/framework.go index 7f7062791e99..04f136989c8f 100644 --- a/cmd/internal/pipelinetests/internal/framework/framework.go +++ b/cmd/internal/pipelinetests/internal/framework/framework.go @@ -21,6 +21,9 @@ const ( type PipelineTest struct { // ConfigFile is the path to the config file to be used for the test. ConfigFile string + // Before is a function that will be called once, right after the agent is started, but before we start checking + // for the assertions. Can be nil. + Before func(context *RuntimeContext) // EventuallyAssert is a function that will be called after the agent has started, repeatedly until all assertions // are satisfied or the default timeout is reached. The provided context contains all the extra information that // the framework has collected, such as data received by the fake prometheus server. @@ -70,6 +73,10 @@ func (p PipelineTest) RunTest(t *testing.T) { doneErr := make(chan error) go func() { doneErr <- cmd.ExecuteContext(ctx) }() + if p.Before != nil { + p.Before(agentRuntimeCtx) + } + assertionsDone := make(chan struct{}) go func() { if p.EventuallyAssert != nil { diff --git a/cmd/internal/pipelinetests/internal/framework/runtime_context.go b/cmd/internal/pipelinetests/internal/framework/runtime_context.go index 5fa1519d1f33..4a5ba65e17bd 100644 --- a/cmd/internal/pipelinetests/internal/framework/runtime_context.go +++ b/cmd/internal/pipelinetests/internal/framework/runtime_context.go @@ -11,10 +11,14 @@ import ( ) type RuntimeContext struct { - AgentPort int + // AgentPort is the port the agent's HTTP server is listening on. + AgentPort int + // DataSentToProm is a collection of data sent to the fake test Prometheus server. DataSentToProm *PromData + // DataSentToLoki is a collection of data sent to the fake test Loki server. DataSentToLoki *LokiData - TestTarget *TestTarget + // TestTarget is a fake test target that can be used to expose metrics that can be scraped by the agent. + TestTarget *TestTarget } func newAgentRuntimeContext(t *testing.T) (*RuntimeContext, func()) { diff --git a/cmd/internal/pipelinetests/loki_test.go b/cmd/internal/pipelinetests/loki_test.go index 84a41a0e6d0f..839ddb76b264 100644 --- a/cmd/internal/pipelinetests/loki_test.go +++ b/cmd/internal/pipelinetests/loki_test.go @@ -43,19 +43,16 @@ func TestPipeline_Loki_APILogsWrite(topT *testing.T) { Entry: logproto.Entry{Timestamp: time.Now(), Line: "hello world!"}, } - logLineSent := false - framework.PipelineTest{ ConfigFile: "testdata/loki_source_api_write.river", Environment: map[string]string{ "API_SERVER_PORT": fmt.Sprintf("%d", apiServerPort), }, + Before: func(context *framework.RuntimeContext) { + // Send the line first + lokiClient.Chan() <- testLogEntry + }, EventuallyAssert: func(t *assert.CollectT, context *framework.RuntimeContext) { - // Send the line if not yet sent - if !logLineSent { - lokiClient.Chan() <- testLogEntry - logLineSent = true - } // Verify we have received the line at the other end of the pipeline line, labels := context.DataSentToLoki.FindLineContaining("hello world!") assert.NotNil(t, line) From 08160c6fd3b19a106a624c60633bb437ecd81c80 Mon Sep 17 00:00:00 2001 From: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> Date: Tue, 19 Mar 2024 14:07:47 +0000 Subject: [PATCH 11/11] fix after rebase --- cmd/internal/pipelinetests/internal/framework/framework.go | 2 +- cmd/internal/pipelinetests/loki_test.go | 4 ++-- .../pipelinetests/testdata/self_scrape_and_write.river | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/internal/pipelinetests/internal/framework/framework.go b/cmd/internal/pipelinetests/internal/framework/framework.go index 04f136989c8f..c8d3d2c3c809 100644 --- a/cmd/internal/pipelinetests/internal/framework/framework.go +++ b/cmd/internal/pipelinetests/internal/framework/framework.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/grafana/agent/cmd/internal/flowmode" + "github.com/grafana/agent/internal/flowmode" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/cmd/internal/pipelinetests/loki_test.go b/cmd/internal/pipelinetests/loki_test.go index 839ddb76b264..e8c2402b9fd2 100644 --- a/cmd/internal/pipelinetests/loki_test.go +++ b/cmd/internal/pipelinetests/loki_test.go @@ -8,8 +8,8 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework" - "github.com/grafana/agent/component/common/loki" - "github.com/grafana/agent/component/common/loki/client" + "github.com/grafana/agent/internal/component/common/loki" + "github.com/grafana/agent/internal/component/common/loki/client" "github.com/grafana/dskit/flagext" "github.com/grafana/loki/pkg/logproto" "github.com/phayes/freeport" diff --git a/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river b/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river index 3a9080077e4f..58543aa8f00a 100644 --- a/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river +++ b/cmd/internal/pipelinetests/testdata/self_scrape_and_write.river @@ -4,12 +4,12 @@ logging { } // Self discover the agent -prometheus.exporter.agent "agent" { +prometheus.exporter.self "agent" { } // Add a job label discovery.relabel "add_job" { - targets = prometheus.exporter.agent.agent.targets + targets = prometheus.exporter.self.agent.targets rule { target_label = "job"