diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go
index 70224131ecb..53d48517fd5 100644
--- a/pkg/frontend/querymiddleware/codec.go
+++ b/pkg/frontend/querymiddleware/codec.go
@@ -554,9 +554,9 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric
 		u = &url.URL{
 			Path: r.GetPath(),
 			RawQuery: url.Values{
-				"start": []string{encodeTime(r.GetStart())},
-				"end":   []string{encodeTime(r.GetEnd())},
-				"step":  []string{encodeDurationMs(r.GetStep())},
+				"start": []string{EncodeTime(r.GetStart())},
+				"end":   []string{EncodeTime(r.GetEnd())},
+				"step":  []string{EncodeDurationMs(r.GetStep())},
 				"query": []string{r.GetQuery()},
 			}.Encode(),
 		}
@@ -564,7 +564,7 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric
 		u = &url.URL{
 			Path: r.GetPath(),
 			RawQuery: url.Values{
-				"time":  []string{encodeTime(r.GetTime())},
+				"time":  []string{EncodeTime(r.GetTime())},
 				"query": []string{r.GetQuery()},
 			}.Encode(),
 		}
@@ -617,10 +617,10 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label
 	case *PrometheusLabelNamesQueryRequest:
 		urlValues := url.Values{}
 		if req.GetStart() != 0 {
-			urlValues["start"] = []string{encodeTime(req.Start)}
+			urlValues["start"] = []string{EncodeTime(req.Start)}
 		}
 		if req.GetEnd() != 0 {
-			urlValues["end"] = []string{encodeTime(req.End)}
+			urlValues["end"] = []string{EncodeTime(req.End)}
 		}
 		if len(req.GetLabelMatcherSets()) > 0 {
 			urlValues["match[]"] = req.GetLabelMatcherSets()
@@ -637,10 +637,10 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label
 		// does not support accessing struct members on a typeA|typeB switch
 		urlValues := url.Values{}
 		if req.GetStart() != 0 {
-			urlValues["start"] = []string{encodeTime(req.Start)}
+			urlValues["start"] = []string{EncodeTime(req.Start)}
 		}
 		if req.GetEnd() != 0 {
-			urlValues["end"] = []string{encodeTime(req.End)}
+			urlValues["end"] = []string{EncodeTime(req.End)}
 		}
 		if len(req.GetLabelMatcherSets()) > 0 {
 			urlValues["match[]"] = req.GetLabelMatcherSets()
@@ -958,12 +958,12 @@ func parseDurationMs(s string) (int64, error) {
 	return 0, apierror.Newf(apierror.TypeBadData, "cannot parse %q to a valid duration", s)
 }
 
-func encodeTime(t int64) string {
+func EncodeTime(t int64) string {
 	f := float64(t) / 1.0e3
 	return strconv.FormatFloat(f, 'f', -1, 64)
 }
 
-func encodeDurationMs(d int64) string {
+func EncodeDurationMs(d int64) string {
 	return strconv.FormatFloat(float64(d)/float64(time.Second/time.Millisecond), 'f', -1, 64)
 }
 
diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go
index f6d2ed82bb6..6d61caf875e 100644
--- a/pkg/frontend/querymiddleware/roundtrip.go
+++ b/pkg/frontend/querymiddleware/roundtrip.go
@@ -30,8 +30,8 @@ import (
 const (
 	day = 24 * time.Hour
 
-	queryRangePathSuffix              = "/api/v1/query_range"
-	instantQueryPathSuffix            = "/api/v1/query"
+	QueryRangePathSuffix              = "/api/v1/query_range"
+	InstantQueryPathSuffix            = "/api/v1/query"
 	cardinalityLabelNamesPathSuffix   = "/api/v1/cardinality/label_names"
 	cardinalityLabelValuesPathSuffix  = "/api/v1/cardinality/label_values"
 	cardinalityActiveSeriesPathSuffix = "/api/v1/cardinality/active_series"
@@ -499,11 +499,11 @@ func newQueryCountTripperware(registerer prometheus.Registerer) Tripperware {
 }
 
 func IsRangeQuery(path string) bool {
-	return strings.HasSuffix(path, queryRangePathSuffix)
+	return strings.HasSuffix(path, QueryRangePathSuffix)
 }
 
 func IsInstantQuery(path string) bool {
-	return strings.HasSuffix(path, instantQueryPathSuffix)
+	return strings.HasSuffix(path, InstantQueryPathSuffix)
 }
 
 func IsCardinalityQuery(path string) bool {
diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go
index 679c67b01fd..bfb785ed42f 100644
--- a/pkg/frontend/querymiddleware/roundtrip_test.go
+++ b/pkg/frontend/querymiddleware/roundtrip_test.go
@@ -815,12 +815,12 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T)
 	}{
 		"range query": {
 			makeRequest: func() *http.Request {
-				return httptest.NewRequest("GET", queryRangePathSuffix+"?start=1536673680&end=1536716880&step=120&query=up", nil)
+				return httptest.NewRequest("GET", QueryRangePathSuffix+"?start=1536673680&end=1536716880&step=120&query=up", nil)
 			},
 		},
 		"instant query": {
 			makeRequest: func() *http.Request {
-				return httptest.NewRequest("GET", instantQueryPathSuffix+"?time=1536673680&query=up", nil)
+				return httptest.NewRequest("GET", InstantQueryPathSuffix+"?time=1536673680&query=up", nil)
 			},
 		},
 		"cardinality label names": {
diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go
index 02d1e9f43a8..75dc171cedf 100644
--- a/tools/querytee/proxy.go
+++ b/tools/querytee/proxy.go
@@ -34,6 +34,7 @@ type ProxyConfig struct {
 	PreferredBackend              string
 	BackendReadTimeout            time.Duration
 	CompareResponses              bool
+	ShiftComparisonQueriesBy      time.Duration
 	LogSlowQueryResponseThreshold time.Duration
 	ValueComparisonTolerance      float64
 	UseRelativeError              bool
@@ -43,6 +44,7 @@ type ProxyConfig struct {
 	BackendSkipTLSVerify                bool
 	AddMissingTimeParamToInstantQueries bool
 	SecondaryBackendsRequestProportion  float64
+	ShiftComparisonSamplingRatio        float64
 }
 
 func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
@@ -69,6 +71,9 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
 	f.BoolVar(&cfg.AddMissingTimeParamToInstantQueries, "proxy.add-missing-time-parameter-to-instant-queries", true, "Add a 'time' parameter to proxied instant query requests if they do not have one.")
 	f.Float64Var(&cfg.SecondaryBackendsRequestProportion, "proxy.secondary-backends-request-proportion", 1.0, "Proportion of requests to send to secondary backends. Must be between 0 and 1 (inclusive), and if not 1, then -backend.preferred must be set.")
+	f.DurationVar(&cfg.ShiftComparisonQueriesBy, "proxy.shift-comparison-queries-by", 0, "Shift the timestamps of the queries by the given duration before querying and comparing them. This will still do the query for the preferred backend with the original timestamps but do another query with shifted timestamps for comparison.")
+	// Defaulted to 0 to avoid mistakes of not setting this correctly and overloading the store-gateways with shifted queries.
+	f.Float64Var(&cfg.ShiftComparisonSamplingRatio, "proxy.shift-comparison-sampling-ratio", 0, "Ratio of queries for which query times are shifted based on -proxy.shift-comparison-queries-by config, sampled randomly. Must be between 0 and 1 (inclusive).")
 }
 
 type Route struct {
@@ -232,7 +237,7 @@ func (p *Proxy) Start() error {
 		if p.cfg.CompareResponses {
 			comparator = route.ResponseComparator
 		}
-		router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold, p.cfg.SecondaryBackendsRequestProportion))
+		router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg))
 	}
 
 	if p.cfg.PassThroughNonRegisteredRoutes {
diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go
index 51ddecb1740..243ca44a5be 100644
--- a/tools/querytee/proxy_endpoint.go
+++ b/tools/querytee/proxy_endpoint.go
@@ -20,6 +20,8 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/opentracing/opentracing-go/ext"
 
+	"github.com/grafana/mimir/pkg/frontend/querymiddleware"
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
@@ -28,12 +30,11 @@ type ResponsesComparator interface {
 }
 
 type ProxyEndpoint struct {
-	backends                          []ProxyBackendInterface
-	metrics                           *ProxyMetrics
-	logger                            log.Logger
-	comparator                        ResponsesComparator
-	slowResponseThreshold             time.Duration
-	secondaryBackendRequestProportion float64
+	backends   []ProxyBackendInterface
+	metrics    *ProxyMetrics
+	logger     log.Logger
+	comparator ResponsesComparator
+	cfg        ProxyConfig
 
 	// The preferred backend, if any.
 	preferredBackend ProxyBackendInterface
@@ -41,7 +42,7 @@ type ProxyEndpoint struct {
 	route Route
 }
 
-func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration, secondaryBackendRequestProportion float64) *ProxyEndpoint {
+func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, cfg ProxyConfig) *ProxyEndpoint {
 	var preferredBackend ProxyBackendInterface
 	for _, backend := range backends {
 		if backend.Preferred() {
@@ -51,14 +52,13 @@ func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *Pr
 	}
 
 	return &ProxyEndpoint{
-		backends:                          backends,
-		route:                             route,
-		metrics:                           metrics,
-		logger:                            logger,
-		comparator:                        comparator,
-		slowResponseThreshold:             slowResponseThreshold,
-		secondaryBackendRequestProportion: secondaryBackendRequestProportion,
-		preferredBackend:                  preferredBackend,
+		backends:         backends,
+		route:            route,
+		metrics:          metrics,
+		logger:           logger,
+		comparator:       comparator,
+		cfg:              cfg,
+		preferredBackend: preferredBackend,
 	}
 }
 
@@ -85,15 +85,15 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {
-	if len(p.backends) == 1 || p.secondaryBackendRequestProportion == 1.0 {
+	if len(p.backends) == 1 || p.cfg.SecondaryBackendsRequestProportion == 1.0 {
 		return p.backends
 	}
 
-	if p.secondaryBackendRequestProportion == 0.0 {
+	if p.cfg.SecondaryBackendsRequestProportion == 0.0 {
 		return []ProxyBackendInterface{p.preferredBackend}
 	}
 
-	if rand.Float64() > p.secondaryBackendRequestProportion {
+	if rand.Float64() > p.cfg.SecondaryBackendsRequestProportion {
 		return []ProxyBackendInterface{p.preferredBackend}
 	}
 
@@ -102,14 +102,11 @@ func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {
 
 func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []ProxyBackendInterface, resCh chan *backendResponse) {
 	var (
-		wg           = sync.WaitGroup{}
-		err          error
-		body         []byte
-		responses    = make([]*backendResponse, 0, len(backends))
-		responsesMtx = sync.Mutex{}
-		timingMtx    = sync.Mutex{}
-		query        = req.URL.RawQuery
-		logger, ctx  = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
+		err         error
+		body        []byte
+		timingMtx   = sync.Mutex{}
+		query       = req.URL.RawQuery
+		logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
 	)
 
 	defer logger.Finish()
@@ -168,10 +165,12 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
 		fastestBackend  ProxyBackendInterface
 		slowestDuration time.Duration
 		slowestBackend  ProxyBackendInterface
+		responses       = make([]*backendResponse, 0, len(backends))
+		responsesMtx    = sync.Mutex{}
+		wg              = sync.WaitGroup{}
 	)
 
-	wg.Add(len(backends))
-	for _, b := range backends {
+	spawnRequest := func(b ProxyBackendInterface, req *http.Request, body []byte, recordResponse bool) {
 		go func() {
 			defer wg.Done()
 
@@ -191,7 +190,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
 			elapsed, status, body, resp, err := b.ForwardRequest(ctx, req, bodyReader)
 			contentType := ""
 
-			if p.slowResponseThreshold > 0 {
+			if p.cfg.LogSlowQueryResponseThreshold > 0 {
 				timingMtx.Lock()
 				if elapsed > slowestDuration {
 					slowestDuration = elapsed
@@ -236,7 +235,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
 			logger.SetTag("status", status)
 
 			// Keep track of the response if required.
-			if p.comparator != nil {
+			if p.comparator != nil && recordResponse {
 				responsesMtx.Lock()
 				responses = append(responses, res)
 				responsesMtx.Unlock()
@@ -246,52 +245,141 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
 		}()
 	}
 
+	if p.comparator == nil || len(backends) != 2 {
+		// If we're not comparing responses, we can spawn all requests at once and simply return.
+		wg.Add(len(backends))
+		for _, b := range backends {
+			spawnRequest(b, req, body, false)
+		}
+		wg.Wait()
+		close(resCh)
+		return
+	}
+
+	// We have only 2 backends when comparing responses.
+	preferred, secondary := backends[0], backends[1]
+	if secondary.Preferred() {
+		preferred, secondary = secondary, preferred
+	}
+
+	var (
+		shiftedReq  *http.Request
+		shiftedBody []byte
+	)
+	if p.cfg.ShiftComparisonQueriesBy > 0 && p.cfg.ShiftComparisonSamplingRatio > 0 &&
+		rand.Float64() < p.cfg.ShiftComparisonSamplingRatio {
+		shiftedReq, shiftedBody = p.shiftQueryRequest(req, p.cfg.ShiftComparisonQueriesBy)
+	}
+
+	if shiftedReq == nil {
+		wg.Add(2)
+		spawnRequest(preferred, req, body, true)
+		spawnRequest(secondary, req, body, true)
+	} else {
+		// We do one request with original timings for the primary backend.
+		// We then do the shifted request for both backends again and compare those responses.
+		wg.Add(1)
+		spawnRequest(preferred, req, body, false)
+
+		// We wait for the original request to finish so that we do not send back
+		// the result from shifted queries due to any race.
+		wg.Wait()
+
+		wg.Add(2)
+		spawnRequest(preferred, shiftedReq, shiftedBody, true)
+		spawnRequest(secondary, shiftedReq, shiftedBody, true)
+
+	}
+
 	// Wait until all backend requests completed.
 	wg.Wait()
 	close(resCh)
 
-	// Compare responses, but only if comparison is enabled and we ran this request against two backends.
-	if p.comparator != nil && len(backends) == 2 {
-		expectedResponse := responses[0]
-		actualResponse := responses[1]
-		if responses[1].backend.Preferred() {
-			expectedResponse, actualResponse = actualResponse, expectedResponse
-		}
+	// Compare responses.
+	expectedResponse := responses[0]
+	actualResponse := responses[1]
+	if responses[1].backend.Preferred() {
+		expectedResponse, actualResponse = actualResponse, expectedResponse
+	}
 
-		result, err := p.compareResponses(expectedResponse, actualResponse)
-		if result == ComparisonFailed {
-			level.Error(logger).Log(
-				"msg", "response comparison failed",
-				"err", err,
-				"expected_response_duration", expectedResponse.elapsedTime,
-				"actual_response_duration", actualResponse.elapsedTime,
-			)
-		} else if result == ComparisonSkipped {
-			level.Warn(logger).Log(
-				"msg", "response comparison skipped",
-				"err", err,
-				"expected_response_duration", expectedResponse.elapsedTime,
-				"actual_response_duration", actualResponse.elapsedTime,
-			)
-		}
+	result, err := p.compareResponses(expectedResponse, actualResponse)
+	if result == ComparisonFailed {
+		level.Error(logger).Log(
+			"msg", "response comparison failed",
+			"err", err,
+			"expected_response_duration", expectedResponse.elapsedTime,
+			"actual_response_duration", actualResponse.elapsedTime,
+		)
+	} else if result == ComparisonSkipped {
+		level.Warn(logger).Log(
+			"msg", "response comparison skipped",
+			"err", err,
+			"expected_response_duration", expectedResponse.elapsedTime,
+			"actual_response_duration", actualResponse.elapsedTime,
+		)
+	}
 
-		// Log queries that are slower in some backends than others
-		if p.slowResponseThreshold > 0 && slowestDuration-fastestDuration >= p.slowResponseThreshold {
-			level.Warn(logger).Log(
-				"msg", "response time difference between backends exceeded threshold",
-				"slowest_duration", slowestDuration,
-				"slowest_backend", slowestBackend.Name(),
-				"fastest_duration", fastestDuration,
-				"fastest_backend", fastestBackend.Name(),
-			)
-		}
+	// Log queries that are slower in some backends than others
+	if p.cfg.LogSlowQueryResponseThreshold > 0 && slowestDuration-fastestDuration >= p.cfg.LogSlowQueryResponseThreshold {
+		level.Warn(logger).Log(
+			"msg", "response time difference between backends exceeded threshold",
+			"slowest_duration", slowestDuration,
+			"slowest_backend", slowestBackend.Name(),
+			"fastest_duration", fastestDuration,
+			"fastest_backend", fastestBackend.Name(),
+		)
+	}
+
+	relativeDuration := actualResponse.elapsedTime - expectedResponse.elapsedTime
+	proportionalDurationDifference := relativeDuration.Seconds() / expectedResponse.elapsedTime.Seconds()
+	p.metrics.relativeDuration.WithLabelValues(p.route.RouteName).Observe(relativeDuration.Seconds())
+	p.metrics.proportionalDuration.WithLabelValues(p.route.RouteName).Observe(proportionalDurationDifference)
+	p.metrics.responsesComparedTotal.WithLabelValues(p.route.RouteName, string(result)).Inc()
+	if shiftedReq != nil {
+		p.metrics.shiftedComparisonsTotal.WithLabelValues(p.route.RouteName, string(result)).Inc()
+	}
+}
+
+// shiftQueryRequest shifts the query times of the request for instant and range queries.
+// If there was any error, the error is just logged and a nil request is returned.
+func (p *ProxyEndpoint) shiftQueryRequest(req *http.Request, d time.Duration) (shiftedRequest *http.Request, shiftedBody []byte) {
+	if !querymiddleware.IsRangeQuery(req.URL.Path) && !querymiddleware.IsInstantQuery(req.URL.Path) {
+		return
+	}
 
-		relativeDuration := actualResponse.elapsedTime - expectedResponse.elapsedTime
-		proportionalDurationDifference := relativeDuration.Seconds() / expectedResponse.elapsedTime.Seconds()
-		p.metrics.relativeDuration.WithLabelValues(p.route.RouteName).Observe(relativeDuration.Seconds())
-		p.metrics.proportionalDuration.WithLabelValues(p.route.RouteName).Observe(proportionalDurationDifference)
-		p.metrics.responsesComparedTotal.WithLabelValues(p.route.RouteName, string(result)).Inc()
+	codec := querymiddleware.NewPrometheusCodec(nil, 5*time.Minute, "json")
+	decodedRequest, err := codec.DecodeMetricsQueryRequest(req.Context(), req)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "Unable to decode request when shifting query", "err", err)
+		return nil, nil
+	}
+	start := decodedRequest.GetStart() - d.Milliseconds()
+	end := decodedRequest.GetEnd() - d.Milliseconds()
+	decodedRequest, err = decodedRequest.WithStartEnd(start, end)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "Unable to change times when shifting query", "err", err)
+		return nil, nil
+	}
+	shiftedRequest, err = codec.EncodeMetricsQueryRequest(req.Context(), decodedRequest)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "Unable to encode request when shifting query", "err", err)
+		return nil, nil
+	}
+
+	if shiftedRequest == nil || shiftedRequest.Body == nil {
+		return shiftedRequest, nil
+	}
+
+	shiftedBody, err = util.ReadRequestBodyWithoutConsuming(shiftedRequest)
+	if err != nil {
+		level.Warn(p.logger).Log("msg", "Unable to read shifted request body", "err", err)
+		shiftedRequest = nil
+	} else {
+		if err := shiftedRequest.ParseForm(); err != nil {
+			level.Warn(p.logger).Log("msg", "Unable to parse shifted request form", "err", err)
+		}
 	}
+	return shiftedRequest, shiftedBody
 }
 
 func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResponse) *backendResponse {
diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go
index 86f59834ee9..bb6a0f777fb 100644
--- a/tools/querytee/proxy_endpoint_test.go
+++ b/tools/querytee/proxy_endpoint_test.go
@@ -23,9 +23,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
+
+	"github.com/grafana/mimir/pkg/frontend/querymiddleware"
+	"github.com/grafana/mimir/pkg/util/test"
 )
 
 func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
@@ -115,7 +119,7 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
 		testData := testData
 
 		t.Run(testName, func(t *testing.T) {
-			endpoint := NewProxyEndpoint(testData.backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, 0, 1.0)
+			endpoint := NewProxyEndpoint(testData.backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, testConfig(0, 1))
 
 			// Send the responses from a dedicated goroutine.
 			resCh := make(chan *backendResponse)
@@ -160,7 +164,7 @@ func Test_ProxyEndpoint_Requests(t *testing.T) {
 		NewProxyBackend("backend-1", backendURL1, time.Second, true, false),
 		NewProxyBackend("backend-2", backendURL2, time.Second, false, false),
 	}
-	endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, 0, 1.0)
+	endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, testConfig(0, 1))
 
 	for _, tc := range []struct {
 		name    string
@@ -336,7 +340,7 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) {
 				comparisonError:  scenario.comparatorError,
 			}
 
-			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, 0, 1.0)
+			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, testConfig(0, 1))
 			resp := httptest.NewRecorder()
 			req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
@@ -428,8 +432,8 @@ func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) {
 	for name, scenario := range scenarios {
 		t.Run(name, func(t *testing.T) {
 			backends := []ProxyBackendInterface{
-				newMockProxyBackend("preferred-backend", time.Second, true, []time.Duration{scenario.preferredResponseLatency}),
-				newMockProxyBackend("secondary-backend", time.Second, false, []time.Duration{scenario.secondaryResponseLatency}),
+				newMockProxyBackend("preferred-backend", time.Second, true, []time.Duration{scenario.preferredResponseLatency}, nil),
+				newMockProxyBackend("secondary-backend", time.Second, false, []time.Duration{scenario.secondaryResponseLatency}, nil),
 			}
 
 			logger := newMockLogger()
@@ -438,7 +442,7 @@ func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) {
 				comparisonResult: ComparisonSuccess,
 			}
 
-			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, scenario.slowResponseThreshold, 1.0)
+			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, testConfig(scenario.slowResponseThreshold, 1))
 			resp := httptest.NewRecorder()
 			req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
@@ -497,8 +501,8 @@ func Test_ProxyEndpoint_RelativeDurationMetric(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			preferredLatencies, secondaryLatencies := splitLatencyPairs(scenario.latencyPairs)
 			backends := []ProxyBackendInterface{
-				newMockProxyBackend("preferred-backend", time.Second, true, preferredLatencies),
-				newMockProxyBackend("secondary-backend", time.Second, false, secondaryLatencies),
+				newMockProxyBackend("preferred-backend", time.Second, true, preferredLatencies, nil),
+				newMockProxyBackend("secondary-backend", time.Second, false, secondaryLatencies, nil),
 			}
 
 			logger := newMockLogger()
@@ -507,7 +511,7 @@ func Test_ProxyEndpoint_RelativeDurationMetric(t *testing.T) {
 				comparisonResult: ComparisonSuccess,
 			}
 
-			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, 0, 1.0)
+			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, testConfig(0, 1))
 			resp := httptest.NewRecorder()
 			req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
@@ -538,6 +542,140 @@ func Test_ProxyEndpoint_RelativeDurationMetric(t *testing.T) {
 	}
 }
 
+func Test_ProxyEndpoint_ShiftedQueriesForComparison(t *testing.T) {
+	testRoute := Route{RouteName: "test"}
+
+	scenarios := map[string]struct {
+		route      string
+		start, end int64
+		shiftBy    time.Duration
+	}{
+		"no shifts for range query": {
+			route: querymiddleware.QueryRangePathSuffix,
+			end:   time.Now().UnixMilli(),
+			start: time.Now().Add(-time.Hour).UnixMilli(),
+		},
+		"no shifts for instant query": {
+			route: querymiddleware.InstantQueryPathSuffix,
+			start: time.Now().UnixMilli(),
+		},
+		"no shifts non-promql query": {
+			route: "/api/v1/test",
+		},
+		"shifted range query": {
+			route:   querymiddleware.QueryRangePathSuffix,
+			end:     time.Now().UnixMilli(),
+			start:   time.Now().Add(-time.Hour).UnixMilli(),
+			shiftBy: 4 * time.Hour,
+		},
+		"shifted instant query": {
+			route:   querymiddleware.InstantQueryPathSuffix,
+			start:   time.Now().UnixMilli(),
+			shiftBy: 4 * time.Hour,
+		},
+	}
+
+	codec := querymiddleware.NewPrometheusCodec(nil, 5*time.Minute, "json")
+
+	for name, scenario := range scenarios {
+		t.Run(name, func(t *testing.T) {
+			latencies := []time.Duration{time.Second, time.Second, time.Second}
+			var preferredReqs, secondaryReqs []*http.Request
+			backends := []ProxyBackendInterface{
+				newMockProxyBackend("preferred-backend", time.Second, true, latencies, func(r *http.Request) {
+					preferredReqs = append(preferredReqs, r)
+				}),
+				newMockProxyBackend("secondary-backend", time.Second, false, latencies, func(r *http.Request) {
+					secondaryReqs = append(secondaryReqs, r)
+				}),
+			}
+
+			logger := test.NewTestingLogger(t)
+			reg := prometheus.NewPedanticRegistry()
+			comparator := &mockComparator{
+				comparisonResult: ComparisonSuccess,
+			}
+
+			cfg := testConfig(0, 1)
+			cfg.ShiftComparisonQueriesBy = scenario.shiftBy
+			if cfg.ShiftComparisonQueriesBy > 0 {
+				cfg.ShiftComparisonSamplingRatio = 1
+			}
+			endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, cfg)
+
+			resp := httptest.NewRecorder()
+			var req *http.Request
+
+			expr, err := parser.ParseExpr("up")
+			require.NoError(t, err)
+			switch scenario.route {
+			case querymiddleware.QueryRangePathSuffix:
+				queryReq := querymiddleware.NewPrometheusRangeQueryRequest(
+					"http://test"+scenario.route, nil,
+					scenario.start, scenario.end, time.Second.Milliseconds(),
+					time.Second, expr, querymiddleware.Options{}, nil)
+				req, err = codec.EncodeMetricsQueryRequest(context.Background(), queryReq)
+				require.NoError(t, err)
+			case querymiddleware.InstantQueryPathSuffix:
+				queryReq := querymiddleware.NewPrometheusInstantQueryRequest(
+					"http://test"+scenario.route, nil, scenario.start,
+					time.Second, expr, querymiddleware.Options{}, nil)
+				req, err = codec.EncodeMetricsQueryRequest(context.Background(), queryReq)
+				require.NoError(t, err)
+				scenario.end = scenario.start
+			default:
+				req, err = http.NewRequest("GET", "http://test"+scenario.route, http.NoBody)
+				require.NoError(t, err)
+			}
+
+			// Do the request.
+			endpoint.ServeHTTP(resp, req)
+			// The HTTP request above will return as soon as the primary response is received, but this doesn't guarantee that the response comparison has been completed.
+			// Wait for the response comparison to complete before checking the logged messages.
+			waitForResponseComparisonMetric(t, reg, ComparisonSuccess, 1)
+
+			got, done, err := prometheus.ToTransactionalGatherer(reg).Gather()
+			defer done()
+			require.NoError(t, err, "Failed to gather metrics from registry")
+			comparisons := filterMetrics(got, []string{"cortex_querytee_shifted_comparisons_total"})
+
+			require.Len(t, secondaryReqs, 1)
+			if scenario.shiftBy == 0 || (scenario.route != querymiddleware.QueryRangePathSuffix && scenario.route != querymiddleware.InstantQueryPathSuffix) {
+				require.Len(t, preferredReqs, 1)
+				require.Len(t, comparisons, 0, "Expect no comparison metric")
+				return
+			}
+
+			require.Len(t, preferredReqs, 2)
+			require.Len(t, comparisons, 1, "Expect only one metric after filtering")
+			require.Equal(t, float64(1), comparisons[0].Metric[0].Counter.GetValue())
+
+			compareRequest := func(req querymiddleware.MetricsQueryRequest, shift time.Duration) {
+				require.Equal(t, scenario.start-shift.Milliseconds(), req.GetStart())
+				require.Equal(t, scenario.end-shift.Milliseconds(), req.GetEnd())
+				require.Equal(t, "up", req.GetQuery())
+			}
+
+			pr1, err := codec.DecodeMetricsQueryRequest(context.Background(), preferredReqs[0])
+			require.NoError(t, err)
+			pr2, err := codec.DecodeMetricsQueryRequest(context.Background(), preferredReqs[1])
+			require.NoError(t, err)
+
+			if pr1.GetStart() == scenario.start {
+				compareRequest(pr1, 0)
+				compareRequest(pr2, scenario.shiftBy)
+			} else {
+				compareRequest(pr1, scenario.shiftBy)
+				compareRequest(pr2, 0)
+			}
+
+			sr, err := codec.DecodeMetricsQueryRequest(context.Background(), secondaryReqs[0])
+			require.NoError(t, err)
+			compareRequest(sr, scenario.shiftBy)
+		})
+	}
+}
+
 func filterMetrics(metrics []*dto.MetricFamily, names []string) []*dto.MetricFamily {
 	var filtered []*dto.MetricFamily
 	for _, m := range metrics {
@@ -756,14 +894,16 @@ type mockProxyBackend struct {
 	preferred             bool
 	fakeResponseLatencies []time.Duration
 	responseIndex         int
+	requestCallback       func(r *http.Request)
 }
 
-func newMockProxyBackend(name string, timeout time.Duration, preferred bool, fakeResponseLatencies []time.Duration) ProxyBackendInterface {
+func newMockProxyBackend(name string, timeout time.Duration, preferred bool, fakeResponseLatencies []time.Duration, requestCallback func(r *http.Request)) ProxyBackendInterface {
 	return &mockProxyBackend{
 		name:                  name,
 		timeout:               timeout,
 		preferred:             preferred,
 		fakeResponseLatencies: fakeResponseLatencies,
+		requestCallback:       requestCallback,
 	}
 }
 
@@ -779,7 +919,10 @@ func (b *mockProxyBackend) Preferred() bool {
 	return b.preferred
 }
 
-func (b *mockProxyBackend) ForwardRequest(_ context.Context, _ *http.Request, _ io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) {
+func (b *mockProxyBackend) ForwardRequest(_ context.Context, req *http.Request, _ io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) {
+	if b.requestCallback != nil {
+		b.requestCallback(req)
+	}
 	resp := &http.Response{
 		StatusCode: 200,
 		Header:     make(http.Header),
@@ -804,37 +947,37 @@ func TestProxyEndpoint_BackendSelection(t *testing.T) {
 		expectedPreferredOnlySelectionCount int // Out of 1000 runs
 	}{
 		"single preferred backend, secondary request proportion 0.0": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil, nil)},
 			secondaryBackendRequestProportion:   0.0,
 			expectedPreferredOnlySelectionCount: runCount,
 		},
 		"single preferred backend, secondary request proportion 1.0": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil, nil)},
 			secondaryBackendRequestProportion:   1.0,
 			expectedPreferredOnlySelectionCount: runCount,
 		},
 		"single non-preferred backend, secondary request proportion 0.0": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, false, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, false, nil, nil)},
 			secondaryBackendRequestProportion:   0.0,
 			expectedPreferredOnlySelectionCount: runCount,
 		},
 		"single non-preferred backend, secondary request proportion 1.0": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, false, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, false, nil, nil)},
 			secondaryBackendRequestProportion:   1.0,
 			expectedPreferredOnlySelectionCount: runCount,
 		},
 		"multiple backends, secondary request proportion 0.0": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil), newMockProxyBackend("non-preferred-backend", 0, false, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil, nil), newMockProxyBackend("non-preferred-backend", 0, false, nil, nil)},
 			secondaryBackendRequestProportion:   0.0,
 			expectedPreferredOnlySelectionCount: runCount,
 		},
 		"multiple backends, secondary request proportion 0.2": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil), newMockProxyBackend("non-preferred-backend", 0, false, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil, nil), newMockProxyBackend("non-preferred-backend", 0, false, nil, nil)},
 			secondaryBackendRequestProportion:   0.2,
 			expectedPreferredOnlySelectionCount: 800,
 		},
 		"multiple backends, secondary request proportion 1.0": {
-			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil), newMockProxyBackend("non-preferred-backend", 0, false, nil)},
+			backends:                            []ProxyBackendInterface{newMockProxyBackend("preferred-backend", 0, true, nil, nil), newMockProxyBackend("non-preferred-backend", 0, false, nil, nil)},
 			secondaryBackendRequestProportion:   1.0,
 			expectedPreferredOnlySelectionCount: 0,
 		},
@@ -842,7 +985,7 @@ func TestProxyEndpoint_BackendSelection(t *testing.T) {
 
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			proxyEndpoint := NewProxyEndpoint(testCase.backends, Route{}, nil, nil, nil, 0, testCase.secondaryBackendRequestProportion)
+			proxyEndpoint := NewProxyEndpoint(testCase.backends, Route{}, nil, nil, nil, testConfig(0, testCase.secondaryBackendRequestProportion))
 
 			preferredOnlySelectionCount := 0
 			for i := 0; i < runCount; i++ {
@@ -866,3 +1009,10 @@ func TestProxyEndpoint_BackendSelection(t *testing.T) {
 		})
 	}
 }
+
+func testConfig(slowQueryResponseThreshold time.Duration, secondaryBackendsRequestProportion float64) ProxyConfig {
+	return ProxyConfig{
+		LogSlowQueryResponseThreshold:      slowQueryResponseThreshold,
+		SecondaryBackendsRequestProportion: secondaryBackendsRequestProportion,
+	}
+}
diff --git a/tools/querytee/proxy_metrics.go b/tools/querytee/proxy_metrics.go
index 7fec69c2890..33cf13751d4 100644
--- a/tools/querytee/proxy_metrics.go
+++ b/tools/querytee/proxy_metrics.go
@@ -20,11 +20,12 @@ const (
 type ComparisonResult string
 
 type ProxyMetrics struct {
-	requestDuration        *prometheus.HistogramVec
-	responsesTotal         *prometheus.CounterVec
-	responsesComparedTotal *prometheus.CounterVec
-	relativeDuration       *prometheus.HistogramVec
-	proportionalDuration   *prometheus.HistogramVec
+	requestDuration         *prometheus.HistogramVec
+	responsesTotal          *prometheus.CounterVec
+	responsesComparedTotal  *prometheus.CounterVec
+	shiftedComparisonsTotal *prometheus.CounterVec
+	relativeDuration        *prometheus.HistogramVec
+	proportionalDuration    *prometheus.HistogramVec
 }
 
 func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
@@ -45,6 +46,11 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
 			Name:      "responses_compared_total",
 			Help:      "Total number of responses compared per route name by result.",
 		}, []string{"route", "result"}),
+		shiftedComparisonsTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+			Namespace: queryTeeMetricsNamespace,
+			Name:      "shifted_comparisons_total",
+			Help:      "Total number of responses compared using shifted query times per route name by result.",
+		}, []string{"route", "result"}),
 		relativeDuration: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: queryTeeMetricsNamespace,
 			Name:      "backend_response_relative_duration_seconds",