Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline tests #6726

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions cmd/internal/pipelinetests/edge_cases_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pipelinetests

import (
"testing"

"github.com/grafana/agent/cmd/internal/pipelinetests/internal/framework"
)

/*
*
//TODO(thampiotr):
- Make a test with OTEL pipeline
- Make a test with loki.process
- Make a test with relabel rules
*
*/
func TestPipeline_WithEmptyConfig(t *testing.T) {
framework.PipelineTest{
ConfigFile: "testdata/empty.river",
RequireCleanShutdown: true,
}.RunTest(t)
}

func TestPipeline_FileNotExists(t *testing.T) {
framework.PipelineTest{
ConfigFile: "does_not_exist.river",
CmdErrContains: "does_not_exist.river: no such file or directory",
RequireCleanShutdown: true,
}.RunTest(t)
}

func TestPipeline_FileInvalid(t *testing.T) {
framework.PipelineTest{
ConfigFile: "testdata/invalid.river",
CmdErrContains: "could not perform the initial load successfully",
RequireCleanShutdown: true,
}.RunTest(t)
}
128 changes: 128 additions & 0 deletions cmd/internal/pipelinetests/internal/framework/framework.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package framework

import (
"context"
"fmt"
"testing"
"time"

"github.com/grafana/agent/internal/flowmode"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
defaultTimeout = 10 * time.Second
assertionCheckInterval = 100 * time.Millisecond
shutdownTimeout = 5 * time.Second
)

type PipelineTest struct {
// ConfigFile is the path to the config file to be used for the test.
ConfigFile string
// Before is a function that will be called once, right after the agent is started, but before we start checking
// for the assertions. Can be nil.
Before func(context *RuntimeContext)
// EventuallyAssert is a function that will be called after the agent has started, repeatedly until all assertions
// are satisfied or the default timeout is reached. The provided context contains all the extra information that
// the framework has collected, such as data received by the fake prometheus server.
EventuallyAssert func(t *assert.CollectT, context *RuntimeContext)
// CmdErrContains is a string that must be contained in the error returned by the command. If empty, no error is
// expected.
CmdErrContains string
// RequireCleanShutdown indicates whether the test framework should verify that the agent shut down cleanly after
// the test case has completed.
RequireCleanShutdown bool
// Timeout is the maximum amount of time the test case is allowed to run. If 0, defaultTimeout is used.
Timeout time.Duration
// Environment is a map of environment variables to be set before running the test. It will be automatically
// cleaned. The values can be used inside the config files using the `env("ENV_VAR")` syntax.
Environment map[string]string
}

func (p PipelineTest) RunTest(t *testing.T) {
if p.Timeout == 0 {
p.Timeout = defaultTimeout
}
// Main context has some padding added to the timeout to allow for assertions error message to surface first.
ctx, cancel := context.WithTimeout(context.Background(), p.Timeout+2*assertionCheckInterval)

for k, v := range p.Environment {
cleanUp := setEnvVariable(t, k, v)
//goland:noinspection GoDeferInLoop
defer cleanUp()
}

cleanUp := setUpGlobalRegistryForTesting(prometheus.NewRegistry())
defer cleanUp()

agentRuntimeCtx, cleanUpAgent := newAgentRuntimeContext(t)
defer cleanUpAgent()

cmd := flowmode.Command()
cmd.SetArgs([]string{
"run",
p.ConfigFile,
"--server.http.listen-addr",
fmt.Sprintf("127.0.0.1:%d", agentRuntimeCtx.AgentPort),
"--storage.path",
t.TempDir(),
})

doneErr := make(chan error)
go func() { doneErr <- cmd.ExecuteContext(ctx) }()

if p.Before != nil {
p.Before(agentRuntimeCtx)
}

assertionsDone := make(chan struct{})
go func() {
if p.EventuallyAssert != nil {
require.EventuallyWithT(t, func(t *assert.CollectT) {
p.EventuallyAssert(t, agentRuntimeCtx)
}, p.Timeout, assertionCheckInterval)
}
assertionsDone <- struct{}{}
}()

select {
case <-ctx.Done():
t.Fatalf("test case failed to complete within deadline")
case <-assertionsDone:
case err := <-doneErr:
verifyDoneError(t, p, err)
cancel()
return
}

t.Log("assertion checks done, shutting down agent")
cancel()
select {
case <-time.After(shutdownTimeout):
if p.RequireCleanShutdown {
t.Fatalf("agent failed to shut down within deadline")
} else {
t.Log("agent failed to shut down within deadline")
}
case err := <-doneErr:
verifyDoneError(t, p, err)
}
}

func verifyDoneError(t *testing.T, testCase PipelineTest, err error) {
if testCase.CmdErrContains != "" {
require.ErrorContains(t, err, testCase.CmdErrContains, "command must return error containing the string specified in test case")
} else {
require.NoError(t, err)
}
}

func setUpGlobalRegistryForTesting(registry *prometheus.Registry) func() {
prevRegisterer, prevGatherer := prometheus.DefaultRegisterer, prometheus.DefaultGatherer
prometheus.DefaultRegisterer, prometheus.DefaultGatherer = registry, registry
return func() {
prometheus.DefaultRegisterer, prometheus.DefaultGatherer = prevRegisterer, prevGatherer
}
}
40 changes: 40 additions & 0 deletions cmd/internal/pipelinetests/internal/framework/lokidata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package framework

import (
"strings"
"sync"

"github.com/grafana/loki/pkg/logproto"
)

type LokiData struct {
mut sync.Mutex
lokiWrites []*logproto.PushRequest
}

func (r *LokiData) appendLokiWrite(req *logproto.PushRequest) {
r.mut.Lock()
defer r.mut.Unlock()
r.lokiWrites = append(r.lokiWrites, req)
}

func (r *LokiData) WritesCount() int {
r.mut.Lock()
defer r.mut.Unlock()
return len(r.lokiWrites)
}

func (r *LokiData) FindLineContaining(contents string) (*logproto.Entry, string) {
r.mut.Lock()
defer r.mut.Unlock()
for i := len(r.lokiWrites) - 1; i >= 0; i-- {
for _, stream := range r.lokiWrites[i].Streams {
for _, entry := range stream.Entries {
if strings.Contains(entry.Line, contents) {
return &entry, stream.Labels
}
}
}
}
return nil, ""
}
54 changes: 54 additions & 0 deletions cmd/internal/pipelinetests/internal/framework/promdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package framework

import (
"math"
"sync"

"github.com/prometheus/prometheus/prompb"
"golang.org/x/exp/maps"
)

type PromData struct {
mut sync.Mutex
promWrites []*prompb.WriteRequest
}

func (r *PromData) appendPromWrite(req *prompb.WriteRequest) {
r.mut.Lock()
defer r.mut.Unlock()
r.promWrites = append(r.promWrites, req)
}

func (r *PromData) WritesCount() int {
r.mut.Lock()
defer r.mut.Unlock()
return len(r.promWrites)
}

func (r *PromData) FindLastSampleMatching(name string, labelsKV ...string) float64 {
labelsMap := make(map[string]string)
for i := 0; i < len(labelsKV); i += 2 {
labelsMap[labelsKV[i]] = labelsKV[i+1]
}
labelsMap["__name__"] = name
r.mut.Lock()
defer r.mut.Unlock()
// start from the end to find the most recent Timeseries
for i := len(r.promWrites) - 1; i >= 0; i-- {
for _, ts := range r.promWrites[i].Timeseries {
// toMatch is a copy of labelsMap that we will remove labels from as we find matches
toMatch := maps.Clone(labelsMap)
for _, label := range ts.Labels {
val, ok := toMatch[label.Name]
if ok && val == label.Value {
delete(toMatch, label.Name)
}
}
foundMatch := len(toMatch) == 0
if foundMatch && len(ts.Samples) > 0 {
return ts.Samples[len(ts.Samples)-1].Value
}
}
}
return math.NaN()
}
61 changes: 61 additions & 0 deletions cmd/internal/pipelinetests/internal/framework/runtime_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package framework

import (
"fmt"
"net"
"os"
"testing"

"github.com/phayes/freeport"
"github.com/stretchr/testify/require"
)

type RuntimeContext struct {
// AgentPort is the port the agent's HTTP server is listening on.
AgentPort int
// DataSentToProm is a collection of data sent to the fake test Prometheus server.
DataSentToProm *PromData
// DataSentToLoki is a collection of data sent to the fake test Loki server.
DataSentToLoki *LokiData
// TestTarget is a fake test target that can be used to expose metrics that can be scraped by the agent.
TestTarget *TestTarget
}

func newAgentRuntimeContext(t *testing.T) (*RuntimeContext, func()) {
agentPort, err := freeport.GetFreePort()
require.NoError(t, err)
cleanAgentPortVar := setEnvVariable(t, "AGENT_SELF_HTTP_PORT", fmt.Sprintf("%d", agentPort))

testTarget := newTestTarget()
cleanTestTargetVar := setEnvVariable(t, "TEST_TARGET_ADDRESS", fmt.Sprintf("127.0.0.1:%d", testTarget.server.Listener.Addr().(*net.TCPAddr).Port))

agentRuntimeCtx := &RuntimeContext{
AgentPort: agentPort,
DataSentToProm: &PromData{},
DataSentToLoki: &LokiData{},
TestTarget: testTarget,
}

promServer := newTestPromServer(agentRuntimeCtx.DataSentToProm.appendPromWrite)
cleanPromServerVar := setEnvVariable(t, "PROM_SERVER_URL", fmt.Sprintf("%s/api/v1/write", promServer.URL))

lokiServer := newTestLokiServer(agentRuntimeCtx.DataSentToLoki.appendLokiWrite)
cleanLokiServerVar := setEnvVariable(t, "LOKI_SERVER_URL", lokiServer.URL)

return agentRuntimeCtx, func() {
cleanAgentPortVar()
testTarget.server.Close()
cleanTestTargetVar()
promServer.Close()
cleanPromServerVar()
lokiServer.Close()
cleanLokiServerVar()
}
}

func setEnvVariable(t *testing.T, key, value string) func() {
require.NoError(t, os.Setenv(key, value))
return func() {
require.NoError(t, os.Unsetenv(key))
}
}
23 changes: 23 additions & 0 deletions cmd/internal/pipelinetests/internal/framework/testloki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package framework

import (
"context"
"math"
"net/http"
"net/http/httptest"

"github.com/grafana/loki/pkg/logproto"
loki_util "github.com/grafana/loki/pkg/util"
)

func newTestLokiServer(onWrite func(*logproto.PushRequest)) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var pushReq logproto.PushRequest
err := loki_util.ParseProtoReader(context.Background(), r.Body, int(r.ContentLength), math.MaxInt32, &pushReq, loki_util.RawSnappy)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
onWrite(&pushReq)
}))
}
20 changes: 20 additions & 0 deletions cmd/internal/pipelinetests/internal/framework/testprom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package framework

import (
"net/http"
"net/http/httptest"

"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
)

func newTestPromServer(onWrite func(*prompb.WriteRequest)) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
req, err := remote.DecodeWriteRequest(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
onWrite(req)
}))
}
Loading
Loading