Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query-tee: Add an option to shift the query times for comparison #9319

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ProxyConfig struct {
PreferredBackend string
BackendReadTimeout time.Duration
CompareResponses bool
ShiftComparisonQueriesBy time.Duration
LogSlowQueryResponseThreshold time.Duration
ValueComparisonTolerance float64
UseRelativeError bool
Expand All @@ -43,6 +44,7 @@ type ProxyConfig struct {
BackendSkipTLSVerify bool
AddMissingTimeParamToInstantQueries bool
SecondaryBackendsRequestProportion float64
ShiftComparisonSamplingRatio float64
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -69,6 +71,9 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
f.BoolVar(&cfg.AddMissingTimeParamToInstantQueries, "proxy.add-missing-time-parameter-to-instant-queries", true, "Add a 'time' parameter to proxied instant query requests if they do not have one.")
f.Float64Var(&cfg.SecondaryBackendsRequestProportion, "proxy.secondary-backends-request-proportion", 1.0, "Proportion of requests to send to secondary backends. Must be between 0 and 1 (inclusive), and if not 1, then -backend.preferred must be set.")
f.DurationVar(&cfg.ShiftComparisonQueriesBy, "proxy.shift-comparison-queries-by", 0, "Shift the timestamps of the queries by the given duration before querying and comparing them. This will still do the query for the preferred backend with the original timestamps but do another query with shifted timestamps for comparison.")
// Defaults to 0 so that shifted queries are only sent when explicitly enabled; this avoids accidentally overloading the store-gateways with shifted queries.
f.Float64Var(&cfg.ShiftComparisonSamplingRatio, "proxy.shift-comparison-sampling-ratio", 0, "Ratio of queries for which query times are shifted based on proxy.shift-comparison-queries-by config, sampled randomly. Must be between 0 and 1 (inclusive).")
codesome marked this conversation as resolved.
Show resolved Hide resolved
}

type Route struct {
Expand Down Expand Up @@ -232,7 +237,7 @@ func (p *Proxy) Start() error {
if p.cfg.CompareResponses {
comparator = route.ResponseComparator
}
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold, p.cfg.SecondaryBackendsRequestProportion))
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg))
}

if p.cfg.PassThroughNonRegisteredRoutes {
Expand Down
222 changes: 154 additions & 68 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go/ext"

"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

Expand All @@ -28,20 +29,19 @@ type ResponsesComparator interface {
}

type ProxyEndpoint struct {
backends []ProxyBackendInterface
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
slowResponseThreshold time.Duration
secondaryBackendRequestProportion float64
backends []ProxyBackendInterface
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
cfg ProxyConfig

// The preferred backend, if any.
preferredBackend ProxyBackendInterface

route Route
}

func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration, secondaryBackendRequestProportion float64) *ProxyEndpoint {
func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, cfg ProxyConfig) *ProxyEndpoint {
var preferredBackend ProxyBackendInterface
for _, backend := range backends {
if backend.Preferred() {
Expand All @@ -51,14 +51,13 @@ func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *Pr
}

return &ProxyEndpoint{
backends: backends,
route: route,
metrics: metrics,
logger: logger,
comparator: comparator,
slowResponseThreshold: slowResponseThreshold,
secondaryBackendRequestProportion: secondaryBackendRequestProportion,
preferredBackend: preferredBackend,
backends: backends,
route: route,
metrics: metrics,
logger: logger,
comparator: comparator,
cfg: cfg,
preferredBackend: preferredBackend,
}
}

Expand All @@ -85,15 +84,15 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {
if len(p.backends) == 1 || p.secondaryBackendRequestProportion == 1.0 {
if len(p.backends) == 1 || p.cfg.SecondaryBackendsRequestProportion == 1.0 {
return p.backends
}

if p.secondaryBackendRequestProportion == 0.0 {
if p.cfg.SecondaryBackendsRequestProportion == 0.0 {
return []ProxyBackendInterface{p.preferredBackend}
}

if rand.Float64() > p.secondaryBackendRequestProportion {
if rand.Float64() > p.cfg.SecondaryBackendsRequestProportion {
return []ProxyBackendInterface{p.preferredBackend}
}

Expand All @@ -102,14 +101,11 @@ func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {

func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []ProxyBackendInterface, resCh chan *backendResponse) {
var (
wg = sync.WaitGroup{}
err error
body []byte
responses = make([]*backendResponse, 0, len(backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
err error
body []byte
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request")
)

defer logger.Finish()
Expand Down Expand Up @@ -168,10 +164,12 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
fastestBackend ProxyBackendInterface
slowestDuration time.Duration
slowestBackend ProxyBackendInterface
responses = make([]*backendResponse, 0, len(backends))
responsesMtx = sync.Mutex{}
wg = sync.WaitGroup{}
)

wg.Add(len(backends))
for _, b := range backends {
spawnRequest := func(b ProxyBackendInterface, req *http.Request, body []byte, recordResponse bool) {
go func() {
defer wg.Done()

Expand All @@ -191,7 +189,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
elapsed, status, body, resp, err := b.ForwardRequest(ctx, req, bodyReader)
contentType := ""

if p.slowResponseThreshold > 0 {
if p.cfg.LogSlowQueryResponseThreshold > 0 {
timingMtx.Lock()
if elapsed > slowestDuration {
slowestDuration = elapsed
Expand Down Expand Up @@ -236,7 +234,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
logger.SetTag("status", status)

// Keep track of the response if required.
if p.comparator != nil {
if p.comparator != nil && recordResponse {
responsesMtx.Lock()
responses = append(responses, res)
responsesMtx.Unlock()
Expand All @@ -246,52 +244,140 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
}()
}

if p.comparator == nil || len(backends) != 2 {
// If we're not comparing responses, we can spawn all requests at once and simply return.
wg.Add(len(backends))
for _, b := range backends {
spawnRequest(b, req, body, false)
}
wg.Wait()
close(resCh)
return
}

// We have only 2 backends when comparing responses.

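// When a shifted request is produced, the preferred backend still receives the request with the original timings, and its response is not recorded for comparison.
// The shifted request is then sent to both backends, and only those two shifted responses are compared.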
codesome marked this conversation as resolved.
Show resolved Hide resolved
preferred, secondary := backends[0], backends[1]
if secondary.Preferred() {
preferred, secondary = secondary, preferred
}

var (
shiftedReq *http.Request
shiftedBody []byte
)
if p.cfg.ShiftComparisonQueriesBy > 0 && rand.Float64() < p.cfg.ShiftComparisonSamplingRatio {
codesome marked this conversation as resolved.
Show resolved Hide resolved
shiftedReq = p.shiftQueryRequest(req, p.cfg.ShiftComparisonQueriesBy)
if shiftedReq != nil && shiftedReq.Body != nil {
shiftedBody, err = io.ReadAll(shiftedReq.Body)
if err != nil {
level.Warn(logger).Log("msg", "Unable to read shifted request body", "err", err)
shiftedReq = nil
} else {
if err := shiftedReq.Body.Close(); err != nil {
level.Warn(logger).Log("msg", "Unable to close request body", "err", err)
}

shiftedReq.Body = io.NopCloser(bytes.NewReader(shiftedBody))
if err := shiftedReq.ParseForm(); err != nil {
level.Warn(logger).Log("msg", "Unable to parse form", "err", err)
}
}
}
codesome marked this conversation as resolved.
Show resolved Hide resolved
}

if shiftedReq == nil {
wg.Add(2)
spawnRequest(preferred, req, body, true)
spawnRequest(secondary, req, body, true)
} else {
wg.Add(3)
spawnRequest(preferred, req, body, false)
// TODO: verify that the user gets the response from the above query
// and the below duplicate query to the preferred backend does not
// change anything user facing.
spawnRequest(preferred, shiftedReq, shiftedBody, true)
spawnRequest(secondary, shiftedReq, shiftedBody, true)

}

// Wait until all backend requests completed.
wg.Wait()
close(resCh)

// Compare responses, but only if comparison is enabled and we ran this request against two backends.
if p.comparator != nil && len(backends) == 2 {
expectedResponse := responses[0]
actualResponse := responses[1]
if responses[1].backend.Preferred() {
expectedResponse, actualResponse = actualResponse, expectedResponse
}
// Compare responses.
expectedResponse := responses[0]
actualResponse := responses[1]
if responses[1].backend.Preferred() {
expectedResponse, actualResponse = actualResponse, expectedResponse
}

result, err := p.compareResponses(expectedResponse, actualResponse)
if result == ComparisonFailed {
level.Error(logger).Log(
"msg", "response comparison failed",
"err", err,
"expected_response_duration", expectedResponse.elapsedTime,
"actual_response_duration", actualResponse.elapsedTime,
)
} else if result == ComparisonSkipped {
level.Warn(logger).Log(
"msg", "response comparison skipped",
"err", err,
"expected_response_duration", expectedResponse.elapsedTime,
"actual_response_duration", actualResponse.elapsedTime,
)
}
result, err := p.compareResponses(expectedResponse, actualResponse)
if result == ComparisonFailed {
level.Error(logger).Log(
"msg", "response comparison failed",
"err", err,
"expected_response_duration", expectedResponse.elapsedTime,
"actual_response_duration", actualResponse.elapsedTime,
)
} else if result == ComparisonSkipped {
level.Warn(logger).Log(
"msg", "response comparison skipped",
"err", err,
"expected_response_duration", expectedResponse.elapsedTime,
"actual_response_duration", actualResponse.elapsedTime,
)
}

// Log queries that are slower in some backends than others
if p.slowResponseThreshold > 0 && slowestDuration-fastestDuration >= p.slowResponseThreshold {
level.Warn(logger).Log(
"msg", "response time difference between backends exceeded threshold",
"slowest_duration", slowestDuration,
"slowest_backend", slowestBackend.Name(),
"fastest_duration", fastestDuration,
"fastest_backend", fastestBackend.Name(),
)
}
// Log queries that are slower in some backends than others
if p.cfg.LogSlowQueryResponseThreshold > 0 && slowestDuration-fastestDuration >= p.cfg.LogSlowQueryResponseThreshold {
level.Warn(logger).Log(
"msg", "response time difference between backends exceeded threshold",
"slowest_duration", slowestDuration,
"slowest_backend", slowestBackend.Name(),
"fastest_duration", fastestDuration,
"fastest_backend", fastestBackend.Name(),
)
}

relativeDuration := actualResponse.elapsedTime - expectedResponse.elapsedTime
proportionalDurationDifference := relativeDuration.Seconds() / expectedResponse.elapsedTime.Seconds()
p.metrics.relativeDuration.WithLabelValues(p.route.RouteName).Observe(relativeDuration.Seconds())
p.metrics.proportionalDuration.WithLabelValues(p.route.RouteName).Observe(proportionalDurationDifference)
p.metrics.responsesComparedTotal.WithLabelValues(p.route.RouteName, string(result)).Inc()
if shiftedReq != nil {
p.metrics.shiftedComparisonsTotal.WithLabelValues(p.route.RouteName).Inc()
}
}

relativeDuration := actualResponse.elapsedTime - expectedResponse.elapsedTime
proportionalDurationDifference := relativeDuration.Seconds() / expectedResponse.elapsedTime.Seconds()
p.metrics.relativeDuration.WithLabelValues(p.route.RouteName).Observe(relativeDuration.Seconds())
p.metrics.proportionalDuration.WithLabelValues(p.route.RouteName).Observe(proportionalDurationDifference)
p.metrics.responsesComparedTotal.WithLabelValues(p.route.RouteName, string(result)).Inc()
// shiftQueryRequest shifts the query times of the request for instant and range queries.
// If there was any error, the error is just logged and a nil request is returned.
func (p *ProxyEndpoint) shiftQueryRequest(req *http.Request, d time.Duration) (shiftedRequest *http.Request) {
if !querymiddleware.IsRangeQuery(req.URL.Path) && !querymiddleware.IsInstantQuery(req.URL.Path) {
return
}

codec := querymiddleware.NewPrometheusCodec(nil, 5*time.Minute, "protobuf")
codesome marked this conversation as resolved.
Show resolved Hide resolved
decodedRequest, err := codec.DecodeMetricsQueryRequest(req.Context(), req)
if err != nil {
level.Error(p.logger).Log("msg", "Unable to decode request when shifting query", "err", err)
return nil
}
start := decodedRequest.GetStart() - d.Milliseconds()
end := decodedRequest.GetEnd() - d.Milliseconds()
decodedRequest, err = decodedRequest.WithStartEnd(start, end)
if err != nil {
level.Error(p.logger).Log("msg", "Unable to change times when shifting query", "err", err)
return nil
}
shiftedRequest, err = codec.EncodeMetricsQueryRequest(req.Context(), decodedRequest)
if err != nil {
level.Error(p.logger).Log("msg", "Unable to encode request when shifting query", "err", err)
return nil
}
return shiftedRequest
}

func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResponse) *backendResponse {
Expand Down
Loading
Loading