From 3f8bb1b571a141a6e0da98baf1168514dc35c232 Mon Sep 17 00:00:00 2001 From: "Marcelo E. Magallon" Date: Wed, 3 Jul 2024 15:26:05 -0600 Subject: [PATCH] Try to align runs with frequency When the agent restarts, we might end up in a situation where we are running a check too often. For example, if a check is configured to run once every 10 minutes, and the check ran 1 minute ago, if we run immediately, we will have two executions within that 10-minute window. However, since we have to publish samples once every two minutes, we cannot wait for 9 minutes, because we would end up with a gap in the data. Instead, we accept the extra execution and run immediately to avoid that gap. But if the check ran 9 minutes ago, we can align with the expectation by waiting for 1 minute instead of a random value. Presumably the check ran 9 minutes ago, and the sample was replicated 7, 5, 3 and 1 minutes ago. If we wait for 1 minute, we would end up running the check when it was expected to run. In order to actually fix this issue, the agents would have to persist data across runs. It might be possible to do this by offloading publishing to another service. Fixes: #739 Signed-off-by: Marcelo E. Magallon --- internal/scraper/scraper.go | 52 ++++++++++++--- internal/scraper/scraper_test.go | 110 +++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 8 deletions(-) diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index c491e61b5..b56c47b36 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -216,14 +216,8 @@ func (s *Scraper) Run(ctx context.Context) { // TODO(mem): keep count of the number of successive errors and // collect logs if threshold is reached.
- var ( - frequency = ms(s.check.Frequency) - offset = ms(s.check.Offset) - ) - - if offset == 0 { - offset = randDuration(min(frequency, maxPublishInterval)) - } + frequency := ms(s.check.Frequency) + offset := computeOffset(ms(s.check.Offset), frequency, timeFromNs(s.check.Created), time.Now()) scrapeHandler := scrapeHandler{scraper: s} @@ -334,6 +328,48 @@ func ms(n int64) time.Duration { return time.Duration(n) * time.Millisecond } +func timeFromNs(ns float64) time.Time { + sec := int64(math.Floor(ns / 1e9)) + nsec := int64(math.Mod(ns, 1e9)) + return time.Unix(sec, nsec) +} + +func computeOffset(offset, frequency time.Duration, t0, now time.Time) time.Duration { + if now.Sub(t0) < frequency { + // The check was created less than one frequency interval ago, so we + // should start running it right away. + if offset != 0 { + return offset + } + + return randDuration(min(frequency, maxPublishInterval)) + } + + // The check was created at least one frequency interval ago, so we need + // to compute the time until the check should run next. + // + // Compute the number of runs since t0, add one for the next run and + // multiply by the frequency in order to obtain its timestamp. Finally, + // compute the remaining time until that timestamp. + + runs := (now.UnixMilli() - t0.UnixMilli()) / frequency.Milliseconds() + + timeUntilNextRun := t0.Add(time.Duration(runs+1) * frequency).Sub(now) + + if timeUntilNextRun <= maxPublishInterval { + return timeUntilNextRun + } + + // The reason why we need to ignore the computed offset is that the + // check ran in the past, and it's possible that it was filling the + // data with repeated samples that we no longer have access to. We + // cannot wait until the next run because that might be a long time + // from now, creating a gap in the data. Instead, we wait for a random + // value that avoids creating gaps (assuming the last published sample + // was recent).
+ return randDuration(maxPublishInterval) +} + func randDuration(d time.Duration) time.Duration { return time.Duration(rand.Int63n(int64(d))) } diff --git a/internal/scraper/scraper_test.go b/internal/scraper/scraper_test.go index 9734a1711..c45f1f602 100644 --- a/internal/scraper/scraper_test.go +++ b/internal/scraper/scraper_test.go @@ -1991,3 +1991,113 @@ func TestTickWithOffset(t *testing.T) { }) } } + +func TestTimeFromNs(t *testing.T) { + testcases := map[string]struct { + ns float64 + expected time.Time + }{ + "zero": { + ns: 0, + expected: time.Unix(0, 0), + }, + "one": { + ns: 1, + expected: time.Unix(0, 1), + }, + "2020-01-01 00:00:00.000000000": { + ns: 1577836800 * 1e9, + expected: time.Unix(1577836800, 0), + }, + "2024-07-02 21:21:50.123456768": { + ns: 1719955310*1e9 + 123456789, + expected: time.Unix(1719955310, 123456789), + }, + "2262-04-11 23:47:15.999999999": { + ns: 9223372035*1e9 + 999999999, // This is close to the largest Unix timestamp whose nanosecond count still fits in an int64 + expected: time.Unix(9223372035, 999999999), + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + actual := timeFromNs(tc.ns) + // Why UnixMicro instead of UnixNano? Because some + // precision is lost during the conversion from int64 + // to float64, and getting this right at the microsecond + // level is good enough.
+ require.InDelta(t, tc.expected.UnixMicro(), actual.UnixMicro(), 1) + }) + } +} + +func TestComputeOffset(t *testing.T) { + t0 := time.Unix(1_000_000, 0) + + testcases := map[string]struct { + frequency time.Duration + offset time.Duration + now time.Time + expected time.Duration + }{ + "zero": { + offset: 0, + frequency: 60 * time.Second, + now: t0.Add(0), + expected: 0, + }, + "1s": { + offset: 1 * time.Second, + frequency: 60 * time.Second, + now: t0.Add(0), + expected: 1 * time.Second, + }, + "30s": { + offset: 30 * time.Second, + frequency: 60 * time.Second, + now: t0.Add(0), + expected: 30 * time.Second, + }, + "created 100 seconds ago": { + offset: 0 * time.Second, + frequency: 60 * time.Second, + now: t0.Add(100 * time.Second), + expected: 20 * time.Second, // 100 - 60 = 40 -> 60 - 40 = 20 + }, + "created 1000 seconds ago": { + offset: 0 * time.Second, + frequency: 60 * time.Second, + now: t0.Add(1000 * time.Second), + expected: 20 * time.Second, // 1000 / 60 = 16 -> 1000 - 60 * 16 = 40 -> 60 - 40 = 20 + }, + "slow check": { + offset: 0 * time.Second, + frequency: 5 * time.Minute, + now: t0.Add(1000 * time.Minute), + expected: 0, + }, + "slow check close to next run": { + offset: 0 * time.Second, + frequency: 5 * time.Minute, + now: t0.Add(999 * time.Minute), + expected: 1 * time.Minute, + }, + "slow check just ran": { + offset: 0 * time.Second, + frequency: 5 * time.Minute, + now: t0.Add(1001 * time.Minute), + expected: 0, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + actual := computeOffset(tc.offset, tc.frequency, t0, tc.now) + if tc.expected != 0 { + require.Equal(t, tc.expected, actual) + } else { + require.LessOrEqual(t, actual, maxPublishInterval) + } + }) + } +}