Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/querytee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

"github.com/grafana/loki/v3/tools/querytee/comparator"

loki_tracing "github.com/grafana/loki/v3/pkg/tracing"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/tools/querytee"
Expand Down Expand Up @@ -85,7 +87,7 @@ func exit(code int) {
}

func lokiReadRoutes(cfg Config) []querytee.Route {
samplesComparator := querytee.NewSamplesComparator(querytee.SampleComparisonOptions{
samplesComparator := comparator.NewSamplesComparator(comparator.SampleComparisonOptions{
Tolerance: cfg.ProxyConfig.ValueComparisonTolerance,
UseRelativeError: cfg.ProxyConfig.UseRelativeError,
SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package querytee
package comparator

import (
"encoding/json"
Expand All @@ -8,6 +8,7 @@ import (
"time"

"github.com/go-kit/log/level"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
Expand All @@ -16,6 +17,15 @@ import (
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

// ResponsesComparator compares two raw response bodies against each other.
type ResponsesComparator interface {
// Compare takes the expected and actual response bodies together with the
// query evaluation time, and returns a summary of the comparison plus an error.
// NOTE(review): presumably a non-nil error means the responses differ or could
// not be compared — confirm against the implementations.
Compare(expected, actual []byte, queryEvaluationTime time.Time) (*ComparisonSummary, error)
}

// ComparisonSummary carries details about a comparison in addition to the
// error returned by the comparison functions.
type ComparisonSummary struct {
// Skipped is set when the comparison was skipped, e.g. when both results are
// empty after filtering or both scalar samples are skipped by the comparison options.
Skipped bool
// MissingMetrics is the number of expected metrics that were missing from the
// actual response (populated by the vector comparison).
MissingMetrics int
}

// SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes.
// It receives the expected and actual results as raw JSON, the evaluation time and the
// comparison options, and returns a ComparisonSummary along with any error.
type SamplesComparatorFunc func(expected, actual json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error)

Expand Down Expand Up @@ -122,7 +132,7 @@ func compareMatrix(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T

// If both matrices are empty after filtering, we can skip comparison
if len(expected) == 0 && len(actual) == 0 {
return &ComparisonSummary{skipped: true}, nil
return &ComparisonSummary{Skipped: true}, nil
}

if len(expected) != len(actual) {
Expand Down Expand Up @@ -233,7 +243,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T
}

if len(expected) == 0 && len(actual) == 0 {
return &ComparisonSummary{skipped: true}, nil
return &ComparisonSummary{Skipped: true}, nil
}

if len(expected) != len(actual) {
Expand Down Expand Up @@ -279,7 +289,7 @@ func compareVector(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T
err = fmt.Errorf("expected metric(s) [%s] missing from actual response", b.String())
}

return &ComparisonSummary{missingMetrics: len(missingMetrics)}, err
return &ComparisonSummary{MissingMetrics: len(missingMetrics)}, err
}

func compareScalar(expectedRaw, actualRaw json.RawMessage, evaluationTime time.Time, opts SampleComparisonOptions) (*ComparisonSummary, error) {
Expand All @@ -295,7 +305,7 @@ func compareScalar(expectedRaw, actualRaw json.RawMessage, evaluationTime time.T
}

if opts.SkipSample(expected.Timestamp.Time(), evaluationTime) && opts.SkipSample(actual.Timestamp.Time(), evaluationTime) {
return &ComparisonSummary{skipped: true}, nil
return &ComparisonSummary{Skipped: true}, nil
}

return nil, compareSamplePair(model.SamplePair{
Expand Down Expand Up @@ -359,7 +369,7 @@ func compareStreams(expectedRaw, actualRaw json.RawMessage, evaluationTime time.

// If both streams are empty after filtering, we can skip comparison
if len(expected) == 0 && len(actual) == 0 {
return &ComparisonSummary{skipped: true}, nil
return &ComparisonSummary{Skipped: true}, nil
}

if len(expected) != len(actual) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package querytee
package comparator

import (
"encoding/json"
Expand Down Expand Up @@ -693,7 +693,7 @@ func TestCompareStreams_SamplesOutsideComparableWindow(t *testing.T) {
if tc.err == nil {
require.NoError(t, err)
if summary != nil {
require.True(t, summary.skipped)
require.True(t, summary.Skipped)
}
return
}
Expand Down
22 changes: 20 additions & 2 deletions tools/querytee/goldfish/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/tools/querytee/comparator"
)

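// CompareResponses compares performance statistics and hashes from QuerySample.
// When the response hashes differ and both response bodies and a comparator are
// provided, the bodies are compared with the comparator; if it returns no error
// the result is reported as a match and flagged with "tolerance_match".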
func CompareResponses(sample *goldfish.QuerySample, performanceTolerance float64) goldfish.ComparisonResult {
func CompareResponses(sample *goldfish.QuerySample, cellAResp, cellBResp *ResponseData, performanceTolerance float64, comparator comparator.ResponsesComparator) goldfish.ComparisonResult {
result := goldfish.ComparisonResult{
CorrelationID: sample.CorrelationID,
DifferenceDetails: make(map[string]any),
Expand Down Expand Up @@ -48,11 +49,28 @@ func CompareResponses(sample *goldfish.QuerySample, performanceTolerance float64

case sample.CellAResponseHash != sample.CellBResponseHash:
// Both returned 200 but with different content
result.ComparisonStatus = goldfish.ComparisonStatusMismatch

result.DifferenceDetails["content_hash"] = map[string]any{
"cell_a": sample.CellAResponseHash,
"cell_b": sample.CellBResponseHash,
}

if cellAResp != nil && cellBResp != nil && comparator != nil {
// We don't know the structure or the data type of the response here.
// The data may be floating-point numbers that differ only within tolerance,
// which yields different hashes even though the values are "equivalent".
// Other unexpected cases may also match, where the hashes differ but the
// data is equivalent within tolerance (empty matrix?).

_, err := comparator.Compare(cellAResp.Body, cellBResp.Body, time.Time{})
if err == nil {
result.ComparisonStatus = goldfish.ComparisonStatusMatch
result.DifferenceDetails["tolerance_match"] = true
return result
}
}

result.ComparisonStatus = goldfish.ComparisonStatusMismatch
return result

default:
Expand Down
6 changes: 3 additions & 3 deletions tools/querytee/goldfish/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestComparator_CompareResponses(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := CompareResponses(tt.sample, testTolerance)
result := CompareResponses(tt.sample, nil, nil, testTolerance, nil)
assert.Equal(t, tt.expectedStatus, result.ComparisonStatus)

for _, expectedDiff := range tt.expectedDiffs {
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestCompareResponses_StatusCodes(t *testing.T) {
}

testTolerance := 0.1 // 10% tolerance for tests
result := CompareResponses(sample, testTolerance)
result := CompareResponses(sample, nil, nil, testTolerance, nil)

if tt.cellAStatus == 200 && tt.cellBStatus == 200 {
// For 200 status codes, result depends on hash comparison
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestCompareResponses_ConfigurableTolerance(t *testing.T) {
},
}

result := CompareResponses(sample, tt.tolerance)
result := CompareResponses(sample, nil, nil, tt.tolerance, nil)

// The comparison should always be a match (same hash)
assert.Equal(t, goldfish.ComparisonStatusMatch, result.ComparisonStatus)
Expand Down
116 changes: 113 additions & 3 deletions tools/querytee/goldfish/end_to_end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ import (

"github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/tools/querytee/comparator"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// testComparator returns a SamplesComparator configured with a small fixed
// tolerance (0.000001) for use in tests.
func testComparator() comparator.SamplesComparator {
	opts := comparator.SampleComparisonOptions{Tolerance: 0.000001}
	samples := comparator.NewSamplesComparator(opts)
	return *samples
}

func TestGoldfishEndToEnd(t *testing.T) {
// This test demonstrates the full flow of Goldfish functionality

Expand All @@ -32,7 +38,7 @@ func TestGoldfishEndToEnd(t *testing.T) {
storage := &mockStorage{}

// Create manager
manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
manager, err := NewManager(config, testComparator(), storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
defer manager.Close()

Expand Down Expand Up @@ -177,7 +183,7 @@ func TestGoldfishMismatchDetection(t *testing.T) {
}

storage := &mockStorage{}
manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
manager, err := NewManager(config, testComparator(), storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
defer manager.Close()

Expand Down Expand Up @@ -278,6 +284,110 @@ func TestGoldfishMismatchDetection(t *testing.T) {
assert.Contains(t, result.DifferenceDetails, "content_hash")
}

// TestGoldfishFloatingPointMismatchDetection verifies that two responses whose
// bodies hash differently, but whose scalar values differ by less than the
// comparator tolerance, are stored as a match flagged with "tolerance_match".
func TestGoldfishFloatingPointMismatchDetection(t *testing.T) {
	config := Config{
		Enabled: true,
		SamplingConfig: SamplingConfig{
			DefaultRate: 1.0,
		},
	}

	storage := &mockStorage{}
	manager, err := NewManager(config, testComparator(), storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
	require.NoError(t, err)
	defer manager.Close()

	req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query={job=\"test\"}", nil)
	req.Header.Set("X-Scope-OrgID", "tenant1")

	// The two cells return scalar values that differ only in their trailing
	// digits, so the bodies produce different hashes while the values stay
	// within the comparator tolerance.
	responseBodyA := []byte(`{
"status": "success",
"data": {
"resultType": "scalar",
"result": [1, "0.003333333333"],
"stats": {
"summary": {
"execTime": 0.05,
"queueTime": 0.01,
"totalBytesProcessed": 500,
"totalLinesProcessed": 1,
"bytesProcessedPerSecond": 10000,
"linesProcessedPerSecond": 20,
"totalEntriesReturned": 1,
"splits": 1,
"shards": 1
}
}
}
}`)

	responseBodyB := []byte(`{
"status": "success",
"data": {
"resultType": "scalar",
"result": [1, "0.0033333333335"],
"stats": {
"summary": {
"execTime": 0.05,
"queueTime": 0.01,
"totalBytesProcessed": 500,
"totalLinesProcessed": 1,
"bytesProcessedPerSecond": 10000,
"linesProcessedPerSecond": 20,
"totalEntriesReturned": 1,
"splits": 1,
"shards": 1
}
}
}
}`)

	// Extract stats for both responses
	extractor := NewStatsExtractor()

	statsA, hashA, sizeA, usedNewEngineA, err := extractor.ExtractResponseData(responseBodyA, 50)
	require.NoError(t, err)

	statsB, hashB, sizeB, usedNewEngineB, err := extractor.ExtractResponseData(responseBodyB, 50)
	require.NoError(t, err)

	cellAResp := &ResponseData{
		Body:          responseBodyA,
		StatusCode:    200,
		Duration:      50 * time.Millisecond,
		Stats:         statsA,
		Hash:          hashA,
		Size:          sizeA,
		UsedNewEngine: usedNewEngineA,
	}

	cellBResp := &ResponseData{
		Body:          responseBodyB,
		StatusCode:    200,
		Duration:      50 * time.Millisecond,
		Stats:         statsB,
		Hash:          hashB,
		Size:          sizeB,
		UsedNewEngine: usedNewEngineB,
	}

	ctx := context.Background()
	manager.ProcessQueryPair(ctx, req, cellAResp, cellBResp)

	// NOTE(review): presumably the result is stored asynchronously, hence the
	// wait before inspecting the storage — confirm against ProcessQueryPair.
	time.Sleep(100 * time.Millisecond)

	// require (not assert) so the test stops here instead of panicking on the
	// index below when no result was stored.
	require.Len(t, storage.results, 1)
	result := storage.results[0]

	// Verify that different content produces different hashes
	assert.NotEqual(t, hashA, hashB, "Different content should produce different hashes")

	// Verify that floating-point difference within tolerance is considered a match
	assert.Equal(t, goldfish.ComparisonStatusMatch, result.ComparisonStatus, "Floating-point difference within tolerance should be a match")
	// Expected value goes first so failure output labels the values correctly.
	assert.Equal(t, true, result.DifferenceDetails["tolerance_match"], "A flag indicating this was a tolerance based match should be present")
}

func TestGoldfishNewEngineDetection(t *testing.T) {
config := Config{
Enabled: true,
Expand All @@ -287,7 +397,7 @@ func TestGoldfishNewEngineDetection(t *testing.T) {
}

storage := &mockStorage{}
manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
manager, err := NewManager(config, testComparator(), storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
defer manager.Close()

Expand Down
7 changes: 5 additions & 2 deletions tools/querytee/goldfish/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/tools/querytee/comparator"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand All @@ -36,6 +37,7 @@ type Manager struct {
resultStore ResultStore
logger log.Logger
metrics *metrics
comparator comparator.SamplesComparator
}

type metrics struct {
Expand All @@ -48,7 +50,7 @@ type metrics struct {

// NewManager creates a new Goldfish manager with the provided configuration.
// Returns an error if the configuration is invalid.
func NewManager(config Config, storage goldfish.Storage, resultStore ResultStore, logger log.Logger, registerer prometheus.Registerer) (*Manager, error) {
func NewManager(config Config, comparator comparator.SamplesComparator, storage goldfish.Storage, resultStore ResultStore, logger log.Logger, registerer prometheus.Registerer) (*Manager, error) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
Expand Down Expand Up @@ -82,6 +84,7 @@ func NewManager(config Config, storage goldfish.Storage, resultStore ResultStore
Buckets: prometheus.DefBuckets,
}),
},
comparator: comparator,
}

return m, nil
Expand Down Expand Up @@ -159,7 +162,7 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA
m.metrics.sampledQueries.Inc()

comparisonStart := time.Now()
result := CompareResponses(sample, m.config.PerformanceTolerance)
result := CompareResponses(sample, cellAResp, cellBResp, m.config.PerformanceTolerance, &m.comparator)
m.metrics.comparisonDuration.Observe(time.Since(comparisonStart).Seconds())
m.metrics.comparisonResults.WithLabelValues(string(result.ComparisonStatus)).Inc()

Expand Down
Loading