From b15d6731fdab299c374b650c13823a4169090676 Mon Sep 17 00:00:00 2001 From: Forrest Marshall Date: Mon, 16 Sep 2024 14:07:58 -0700 Subject: [PATCH] add event export helper --- lib/events/export/date_exporter.go | 461 ++++++++++++++++++++++++ lib/events/export/date_exporter_test.go | 452 +++++++++++++++++++++++ lib/events/test/suite.go | 33 ++ 3 files changed, 946 insertions(+) create mode 100644 lib/events/export/date_exporter.go create mode 100644 lib/events/export/date_exporter_test.go diff --git a/lib/events/export/date_exporter.go b/lib/events/export/date_exporter.go new file mode 100644 index 0000000000000..2ccb5d29b6ca3 --- /dev/null +++ b/lib/events/export/date_exporter.go @@ -0,0 +1,461 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import ( + "context" + "log/slog" + "sync" + "sync/atomic" + "time" + + "github.com/gravitational/trace" + "golang.org/x/time/rate" + "google.golang.org/protobuf/types/known/timestamppb" + + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" + "github.com/gravitational/teleport/api/utils/retryutils" + "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/interval" +) + +// Client is the subset of the audit event client that is used by the date exporter. +type Client interface { + ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] + GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] +} + +// DateExporterConfig configures the date exporter. +type DateExporterConfig struct { + // Client is the audit event client used to fetch and export events. + Client Client + // Date is the target date to export events from. + Date time.Time + // Export is the callback used to export events. Must be safe for concurrent use if + // the Concurrency parameter is greater than 1. + Export func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error + // OnIdle is an optional callback that gets invoked periodically when the exporter is idle. Note that it is + // safe to close the exporter or inspect its state from within this callback, but waiting on the exporter's + // Done channel within this callback will deadlock. + OnIdle func(ctx context.Context) + // PreviousState is an optional parameter used to resume from a previous date export run. + PreviousState DateExporterState + // Concurrency sets the maximum number of event chunks that will be processed concurrently (defaults to 1). + Concurrency int + // MaxBackoff optionally overrides the default maximum backoff applied when errors are hit. + MaxBackoff time.Duration + // PollInterval optionally overrides the default poll interval used to fetch event chunks. + PollInterval time.Duration +} + +// CheckAndSetDefaults validates configuration and sets default values for optional parameters. +func (cfg *DateExporterConfig) CheckAndSetDefaults() error { + if cfg.Client == nil { + return trace.BadParameter("missing required parameter Client in DateExporterConfig") + } + if cfg.Export == nil { + return trace.BadParameter("missing required parameter Export in DateExporterConfig") + } + if cfg.Date.IsZero() { + return trace.BadParameter("missing required parameter Date in DateExporterConfig") + } + if cfg.Concurrency == 0 { + cfg.Concurrency = 1 + } + if cfg.MaxBackoff == 0 { + cfg.MaxBackoff = 90 * time.Second + } + if cfg.PollInterval == 0 { + cfg.PollInterval = 16 * time.Second + } + return nil +} + +// chunkEntry represents the state of a single event chunk being processed by the date exporter. Unlike +// the rest of the exporter which uses a basic mutex, the chunk entry uses atomic operations in order to +// minimize the potential for reads to affect event processing perf. +type chunkEntry struct { + cursor atomic.Pointer[string] + done atomic.Bool +} + +func (e *chunkEntry) getCursor() string { + p := e.cursor.Load() + if p == nil { + return "" + } + return *p +} + +func (e *chunkEntry) setCursor(cursor string) { + e.cursor.Store(&cursor) +} + +// DateExporterState represents the current state of the date exporter. State can be used to resume +// export from a previous run using the PreviousState parameter of the DateExporter config. +type DateExporterState struct { + // Completed is an unordered list of the chunks for which all events have been consumed. + Completed []string + // Cursors is a map of chunk to cursor for partially completed chunks. + Cursors map[string]string +} + +// DateExporter is a utility for exporting events for a given date using the chunked event export APIs. Note that +// it is specifically designed to prioritize performance and ensure that events aren't missed. It may not yield events +// in time order, and does not provide a mechanism to decide when export for a given date should be considered complete, +// since there is no 100% reliable way to determine when all events for a given date have been exported. +type DateExporter struct { + cfg DateExporterConfig + log *slog.Logger + mainLogLimiter *rate.Limiter + chunkLogLimiter *rate.Limiter + retry retryutils.Retry + sem chan struct{} + mu sync.Mutex + chunks map[string]*chunkEntry + idle bool + cancel context.CancelFunc + done chan struct{} +} + +// NewDateExporter creates a new date exporter and begin background processing of event chunks. Processing will continue +// until DateExporter.Stop is called, even if no new chunks are showing up. It is the caller's responsibility to decide +// when export for a given date should be considered complete by examining the the exporter's progress. +func NewDateExporter(cfg DateExporterConfig) (*DateExporter, error) { + if err := cfg.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } + + retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ + First: utils.FullJitter(cfg.MaxBackoff / 16), + Driver: retryutils.NewExponentialDriver(cfg.MaxBackoff / 16), + Max: cfg.MaxBackoff, + Jitter: retryutils.NewHalfJitter(), + }) + if err != nil { + return nil, trace.Wrap(err) + } + + // date exporter should always present a correct chunk progress state. if state from + // a previous run was provided as part of the configuration we want to set it up before + // creating the exporter to ensure that any concurrent operations being used to + // monitor/store progress are always shown the correct state. + chunks := make(map[string]*chunkEntry) + + // set up entries for previously completed chunks + for _, chunk := range cfg.PreviousState.Completed { + entry := new(chunkEntry) + entry.done.Store(true) + chunks[chunk] = entry + } + + // set up entries for partially completed chunks + for chunk, cursor := range cfg.PreviousState.Cursors { + entry := new(chunkEntry) + entry.setCursor(cursor) + chunks[chunk] = entry + } + + ctx, cancel := context.WithCancel(context.Background()) + + exporter := &DateExporter{ + cfg: cfg, + log: slog.With("date", cfg.Date.Format(time.DateOnly)), + mainLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 1), + chunkLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 3), + retry: retry, + sem: make(chan struct{}, cfg.Concurrency), + chunks: chunks, + cancel: cancel, + done: make(chan struct{}), + } + + go exporter.run(ctx) + + return exporter, nil +} + +// GetState loads the current state of the date exporter. Note that there may be concurrent export operations +// in progress, meaning that by the time state is observed it may already be outdated. +func (e *DateExporter) GetState() DateExporterState { + e.mu.Lock() + defer e.mu.Unlock() + + var completed []string + cursors := make(map[string]string) + + for chunk, entry := range e.chunks { + if entry.done.Load() { + completed = append(completed, chunk) + } else { + if cursor := entry.getCursor(); cursor != "" { + cursors[chunk] = cursor + } + } + } + + return DateExporterState{ + Completed: completed, + Cursors: cursors, + } +} + +// IsIdle returns true if the date exporter has successfully discovered and processed all currently extant event chunks. Note that +// this does not imply that all events for a given date have been processed, since there may be replication delays and/or ongoing +// activity if the current date is being polled, but it is a strong indicator that all events have been discovered when the exporter +// is processing dates that are significantly in the past. +func (e *DateExporter) IsIdle() bool { + var idle bool + e.withLock(func() { + idle = e.idle + }) + return idle +} + +// Close terminates all event processing. Note that shutdown is asynchronous. Any operation that needs to wait for export to fully +// terminate should wait on Done after calling Close. +func (e *DateExporter) Close() error { + e.cancel() + return nil +} + +// Done provides a channel that will be closed when the date exporter has completed processing all event chunks. When saving the +// final state of the exporter for future resumption, this channel must be waited upon before state is loaded. Note that the date +// exporter never termiantes unless Close is called, so waiting on Done is only meaningful after Close has been called. +func (e *DateExporter) Done() <-chan struct{} { + return e.done +} + +func (e *DateExporter) run(ctx context.Context) { + defer close(e.done) + retry := e.retry.Clone() + + poll := interval.New(interval.Config{ + Duration: e.cfg.PollInterval, + FirstDuration: utils.FullJitter(e.cfg.PollInterval / 2), + Jitter: retryutils.NewSeventhJitter(), + }) + defer poll.Stop() + + var firstFullCycleCompleted bool + + // resume processing of any partially completed chunks prior to fetching new chunks + for chunk, cursor := range e.GetState().Cursors { + e.log.InfoContext(ctx, "resuming processing of partially completed chunk", "chunk", chunk, "cursor", cursor) + if ok := e.startProcessingChunk(ctx, chunk, cursor); !ok { + return + } + } + + for { + n, err := e.fetchAndProcessChunks(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + + if e.mainLogLimiter.Allow() { + e.log.WarnContext(ctx, "fetch and process of event chunks failed", "error", err) + } + + retry.Inc() + select { + case <-retry.After(): + case <-ctx.Done(): + return + } + continue + } + + retry.Reset() + + // if no new chunks were processed, the exporter is considered idle + idle := n == 0 + e.withLock(func() { + e.idle = idle + }) + if idle && e.cfg.OnIdle != nil { + e.cfg.OnIdle(ctx) + } + + // log first success, and periodically log subsequent non-idle cycles + if (n > 0 && e.mainLogLimiter.Allow()) || !firstFullCycleCompleted { + e.log.InfoContext(ctx, "successful fetch and process of event chunks", "chunks", n, "idle", idle) + } + + firstFullCycleCompleted = true + + select { + case <-poll.Next(): + case <-ctx.Done(): + return + } + } +} + +// waitForInflightChunks blocks until all inflight chunks have been processed by acquiring all +// semaphore tokens and then releasing them. note that this operation does not accept a context, +// which is necessary in order to ensure that Done actually waits for all background processing +// to halt. +func (e *DateExporter) waitForInflightChunks() { + // acquire all semaphore tokens to block until all inflight chunks have been processed + for i := 0; i < e.cfg.Concurrency; i++ { + e.sem <- struct{}{} + } + + // release all semaphore tokens + for i := 0; i < e.cfg.Concurrency; i++ { + <-e.sem + } +} + +// fetchAndProcessChunks fetches and processes all chunks for the current date. if the function returns +// without error, all chunks have been successfully processed. note that all *currently known* chunks being +// processed does not necessarily imply that all events for a given date have been processed since there may +// be event replication delays, and/or ongoing activity if the current date is being polled. +func (e *DateExporter) fetchAndProcessChunks(ctx context.Context) (int, error) { + // wait for inflight chunks before returning. in theory it would be fine (and possibly more performant) + // to return immediately, but doing so makes it difficult to reason about when the exporter has fully exited + // and/or how many complete export cycles have been performed. + defer e.waitForInflightChunks() + + chunks := e.cfg.Client.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + Date: timestamppb.New(e.cfg.Date), + }) + + var newChunks int + +Chunks: + for chunks.Next() { + // known chunks should be skipped + var skip bool + e.withLock(func() { + if _, ok := e.chunks[chunks.Item().Chunk]; ok { + skip = true + return + } + + // so long as there is at least one undiscovered chunk, the exporter is not considered idle. + e.idle = false + }) + + if skip { + continue Chunks + } + + if ok := e.startProcessingChunk(ctx, chunks.Item().Chunk, "" /* cursor */); !ok { + return newChunks, trace.Wrap(ctx.Err()) + } + + newChunks++ + } + + if err := chunks.Done(); err != nil { + return newChunks, trace.Wrap(err) + } + + return newChunks, nil +} + +// startProcessingChunk blocks until a semaphore token is acquired, then starts background processing of the +// supplied chunk. returns false if the context is canceled before background processing can be started. +func (e *DateExporter) startProcessingChunk(ctx context.Context, chunk, cursor string) (ok bool) { + // acquire semaphore to start concurrent chunk processing + select { + case e.sem <- struct{}{}: + case <-ctx.Done(): + return false + } + + // set up entry so chunk processing status can be tracked + entry := new(chunkEntry) + if cursor != "" { + entry.setCursor(cursor) + } + + e.withLock(func() { + e.chunks[chunk] = entry + }) + + // process chunk concurrently + go func() { + defer func() { + <-e.sem + }() + + e.processChunk(ctx, chunk, entry) + }() + + return true +} + +// processChunk attempts to export events from a given chunk. it will continuously retry until the context is canceled +// or all events have been successfully exported. +func (e *DateExporter) processChunk(ctx context.Context, chunk string, entry *chunkEntry) { + retry := e.retry.Clone() + var failures int +Outer: + for { + + events := e.cfg.Client.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{ + Date: timestamppb.New(e.cfg.Date), + Chunk: chunk, + Cursor: entry.getCursor(), + }) + + if err := e.exportEvents(ctx, events, entry); err != nil { + failures++ + + if e.chunkLogLimiter.Allow() { + e.log.WarnContext(ctx, "event chunk export failed", "chunk", chunk, "failures", failures, "error", err) + } + retry.Inc() + + select { + case <-retry.After(): + case <-ctx.Done(): + return + } + continue Outer + } + + entry.done.Store(true) + return + } +} + +// exportEvents exports all events from the provided stream, updating the supplied entry on each successful export. +func (e *DateExporter) exportEvents(ctx context.Context, events stream.Stream[*auditlogpb.ExportEventUnstructured], entry *chunkEntry) error { + for events.Next() { + if err := e.cfg.Export(ctx, events.Item()); err != nil { + events.Done() + return trace.Wrap(err) + } + + entry.setCursor(events.Item().Cursor) + } + return trace.Wrap(events.Done()) +} + +func (e *DateExporter) withLock(fn func()) { + e.mu.Lock() + defer e.mu.Unlock() + fn() +} diff --git a/lib/events/export/date_exporter_test.go b/lib/events/export/date_exporter_test.go new file mode 100644 index 0000000000000..a6907ade60015 --- /dev/null +++ b/lib/events/export/date_exporter_test.go @@ -0,0 +1,452 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/gravitational/trace" + "github.com/stretchr/testify/require" + + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/events" +) + +// TestDateExporterBasics tests the basic functionality of the date exporter, with and +// without random flake. +func TestDateExporterBasics(t *testing.T) { + t.Parallel() + for _, randomFlake := range []bool{false, true} { + t.Run(fmt.Sprintf("randomFlake=%v", randomFlake), func(t *testing.T) { + t.Parallel() + testDateExporterBasics(t, randomFlake) + }) + } +} + +func testDateExporterBasics(t *testing.T, randomFlake bool) { + clt := newFakeClient() + clt.setRandomFlake(randomFlake) + + now := time.Now() + + var exportedMu sync.Mutex + var exported []*auditlogpb.ExportEventUnstructured + + exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error { + exportedMu.Lock() + defer exportedMu.Unlock() + exported = append(exported, event) + return nil + } + + getExported := func() []*auditlogpb.ExportEventUnstructured { + exportedMu.Lock() + defer exportedMu.Unlock() + return append([]*auditlogpb.ExportEventUnstructured(nil), exported...) + } + + idleCh := make(chan struct{}) + + onIdleFn := func(ctx context.Context) { + select { + case idleCh <- struct{}{}: + default: + } + } + + waitIdle := func(t *testing.T) { + // wait for two ticks of idleness (first tick may correspond to a cycle that was finishing + // as the new events were being added, second cycle will have a happens-after relationship to + // this function being called). + timeout := time.After(time.Second * 30) + for i := 0; i < 2; i++ { + select { + case <-idleCh: + case <-timeout: + require.FailNow(t, "timeout waiting for exporter to become idle") + } + } + } + + exporter, err := NewDateExporter(DateExporterConfig{ + Client: clt, + Date: now, + Export: exportFn, + OnIdle: onIdleFn, + Concurrency: 3, + MaxBackoff: time.Millisecond * 600, + PollInterval: time.Millisecond * 200, + }) + require.NoError(t, err) + defer exporter.Close() + + // empty event set means the exporter should become idle almost + // immediately. + waitIdle(t) + require.Empty(t, getExported()) + + var allEvents []*auditlogpb.ExportEventUnstructured + var allChunks []string + // quickly add a bunch of chunks + for i := 0; i < 30; i++ { + chunk := makeEventChunk(t, now, 10) + allEvents = append(allEvents, chunk...) + chunkID := uuid.NewString() + allChunks = append(allChunks, chunkID) + clt.addChunk(now.Format(time.DateOnly), chunkID, chunk) + } + + waitIdle(t) + + require.ElementsMatch(t, allChunks, exporter.GetState().Completed) + require.ElementsMatch(t, allEvents, getExported()) + + // process a second round of chunks to cover the case of new chunks being added + // after non-trivial idleness. + + // note that we do a lot more events here just to make absolutely certain + // that we're hitting a decent amout of random flake. + for i := 0; i < 30; i++ { + chunk := makeEventChunk(t, now, 10) + allEvents = append(allEvents, chunk...) + chunkID := uuid.NewString() + allChunks = append(allChunks, chunkID) + clt.addChunk(now.Format(time.DateOnly), chunkID, chunk) + } + + waitIdle(t) + + require.ElementsMatch(t, allChunks, exporter.GetState().Completed) + require.ElementsMatch(t, allEvents, getExported()) + + // close the exporter + exporter.Close() + timeout := time.After(time.Second * 30) + select { + case <-exporter.Done(): + case <-timeout: + require.FailNow(t, "timeout waiting for exporter to close") + } + + // get the final state of the exporter + state := exporter.GetState() + + // recreate exporter with state from previous run + exporter, err = NewDateExporter(DateExporterConfig{ + Client: clt, + Date: now, + Export: exportFn, + OnIdle: onIdleFn, + PreviousState: state, + Concurrency: 3, + MaxBackoff: time.Millisecond * 600, + PollInterval: time.Millisecond * 200, + }) + require.NoError(t, err) + defer exporter.Close() + + waitIdle(t) + + // no additional events should have been exported + require.ElementsMatch(t, allChunks, exporter.GetState().Completed) + require.ElementsMatch(t, allEvents, getExported()) + + // new chunks should be consumed correctly + for i := 0; i < 30; i++ { + chunk := makeEventChunk(t, now, 10) + allEvents = append(allEvents, chunk...) + chunkID := uuid.NewString() + allChunks = append(allChunks, chunkID) + clt.addChunk(now.Format(time.DateOnly), chunkID, chunk) + } + + waitIdle(t) + + require.ElementsMatch(t, allChunks, exporter.GetState().Completed) + require.ElementsMatch(t, allEvents, getExported()) +} + +// TestDateExporterResume verifies non-trivial exporter resumption behavior, with and without +// random flake. +func TestDateExporterResume(t *testing.T) { + t.Parallel() + for _, randomFlake := range []bool{false, true} { + t.Run(fmt.Sprintf("randomFlake=%v", randomFlake), func(t *testing.T) { + t.Parallel() + testDateExporterResume(t, randomFlake) + }) + } +} + +func testDateExporterResume(t *testing.T, randomFlake bool) { + clt := newFakeClient() + clt.setRandomFlake(randomFlake) + + now := time.Now() + + // export via unbuffered channel so that we can easily block/unblock export from + // the main test routine. + exportCH := make(chan *auditlogpb.ExportEventUnstructured) + + exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error { + select { + case exportCH <- event: + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + } + return nil + } + + idleCh := make(chan struct{}) + + onIdleFn := func(ctx context.Context) { + select { + case idleCh <- struct{}{}: + default: + } + } + + waitIdle := func(t *testing.T) { + // wait for two ticks of idleness (first tick may correspond to a cycle that was finishing + // as the new events were being added, second cycle will have a happens-after relationship to + // this function being called). + timeout := time.After(time.Second * 30) + for i := 0; i < 2; i++ { + select { + case <-idleCh: + case <-timeout: + require.FailNow(t, "timeout waiting for exporter to become idle") + } + } + } + + exporter, err := NewDateExporter(DateExporterConfig{ + Client: clt, + Date: now, + Export: exportFn, + OnIdle: onIdleFn, + Concurrency: 3, /* low concurrency to ensure that we have some in progress chunks */ + MaxBackoff: time.Millisecond * 600, + PollInterval: time.Millisecond * 200, + }) + require.NoError(t, err) + defer exporter.Close() + + // empty event set means the exporter should become idle almost + // immediately. + waitIdle(t) + + var allEvents, gotEvents []*auditlogpb.ExportEventUnstructured + // quickly add a bunch of chunks + for i := 0; i < 10; i++ { + chunk := makeEventChunk(t, now, 10) + allEvents = append(allEvents, chunk...) + chunkID := uuid.NewString() + clt.addChunk(now.Format(time.DateOnly), chunkID, chunk) + } + + // consume a large subset of events s.t. we have some completed + // chunks, some in progress, and some not yet started (note that + // to guarantee some in progress chunks, the number consumed must not + // divide evenly by the chunk size). + timeout := time.After(time.Second * 30) + for i := 0; i < 47; i++ { + select { + case evt := <-exportCH: + gotEvents = append(gotEvents, evt) + case <-timeout: + require.FailNowf(t, "timeout waiting for event", "iteration=%d", i) + } + } + + // close the exporter and wait for it to finish so that + // we can get the correct final state. + exporter.Close() + select { + case <-exporter.Done(): + case <-time.After(time.Second * 30): + require.FailNow(t, "timeout waiting for exporter to close") + } + + // get the final state of the exporter + state := exporter.GetState() + + fmt.Printf("cursors=%+v\n", state.Cursors) + + // recreate exporter with state from previous run + exporter, err = NewDateExporter(DateExporterConfig{ + Client: clt, + Date: now, + Export: exportFn, + OnIdle: onIdleFn, + PreviousState: state, + Concurrency: 3, + MaxBackoff: time.Millisecond * 600, + PollInterval: time.Millisecond * 200, + }) + require.NoError(t, err) + defer exporter.Close() + + // consume remaining events + for i := 0; i < 53; i++ { + select { + case evt := <-exportCH: + gotEvents = append(gotEvents, evt) + case <-timeout: + require.FailNowf(t, "timeout waiting for event", "iteration=%d", i) + } + } + require.ElementsMatch(t, allEvents, gotEvents) + + // ensure that exporter becomes idle + waitIdle(t) +} + +func makeEventChunk(t *testing.T, ts time.Time, n int) []*auditlogpb.ExportEventUnstructured { + var chunk []*auditlogpb.ExportEventUnstructured + for i := 0; i < n; i++ { + baseEvent := apievents.UserLogin{ + Method: events.LoginMethodSAML, + Status: apievents.Status{Success: true}, + UserMetadata: apievents.UserMetadata{User: "alice@example.com"}, + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Type: events.UserLoginEvent, + Time: ts.Add(time.Duration(i)), + }, + } + + event, err := apievents.ToUnstructured(&baseEvent) + require.NoError(t, err) + chunk = append(chunk, &auditlogpb.ExportEventUnstructured{ + Event: event, + Cursor: strconv.Itoa(i + 1), + }) + } + + return chunk +} + +type fakeClient struct { + mu sync.Mutex + data map[string]map[string][]*auditlogpb.ExportEventUnstructured + randomFlake bool +} + +func newFakeClient() *fakeClient { + return &fakeClient{ + data: make(map[string]map[string][]*auditlogpb.ExportEventUnstructured), + } +} + +func (c *fakeClient) setRandomFlake(flake bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.randomFlake = flake +} + +func (c *fakeClient) addChunk(date string, chunk string, events []*auditlogpb.ExportEventUnstructured) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.data[date]; !ok { + c.data[date] = make(map[string][]*auditlogpb.ExportEventUnstructured) + } + c.data[date][chunk] = events +} + +func (c *fakeClient) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + c.mu.Lock() + defer c.mu.Unlock() + chunks, ok := c.data[req.Date.AsTime().Format(time.DateOnly)] + if !ok { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotFound("date not found")) + } + + chunk, ok := chunks[req.Chunk] + if !ok { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotFound("chunk not found")) + } + + var cursor int + if req.Cursor != "" { + var err error + cursor, err = strconv.Atoi(req.Cursor) + if err != nil { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.BadParameter("invalid cursor %q", req.Cursor)) + } + } + + chunk = chunk[cursor:] + + // randomly truncate the chunk and append an error to simulate flake. we target a 33% failure rate + // since event export is more frequent than chunk listing. + var fail bool + if c.randomFlake && rand.Int()%3 == 0 { + chunk = chunk[:rand.Intn(len(chunk))] + fail = true + } + + return stream.MapErr(stream.Slice(chunk), func(err error) error { + if fail { + return trace.NotFound("export failed as random test condition") + } + return err + }) +} + +func (c *fakeClient) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + c.mu.Lock() + defer c.mu.Unlock() + chunks, ok := c.data[req.Date.AsTime().Format(time.DateOnly)] + if !ok { + return stream.Empty[*auditlogpb.EventExportChunk]() + } + + var eec []*auditlogpb.EventExportChunk + for name := range chunks { + eec = append(eec, &auditlogpb.EventExportChunk{ + Chunk: name, + }) + } + + // randomly truncate the chunk list and append an error to simulate flake. we target a 50% failure rate + // since chunk listing is less frequent than event export. + var fail bool + if c.randomFlake && rand.Int()%2 == 0 { + eec = eec[:rand.Intn(len(eec))] + fail = true + } + + return stream.MapErr(stream.Slice(eec), func(err error) error { + if fail { + return trace.NotFound("chunks failed as random test condition") + } + return err + }) +} diff --git a/lib/events/test/suite.go b/lib/events/test/suite.go index f753170b9453b..11b4cb2c1d26b 100644 --- a/lib/events/test/suite.go +++ b/lib/events/test/suite.go @@ -24,6 +24,7 @@ import ( "io" "os" "slices" + "sync/atomic" "testing" "time" @@ -37,6 +38,7 @@ import ( "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" + "github.com/gravitational/teleport/lib/events/export" "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/session" ) @@ -208,6 +210,37 @@ func (s *EventsSuite) EventExport(t *testing.T) { require.False(t, chunks.Next()) require.NoError(t, chunks.Done()) + + // as a sanity check, try pulling events using the exporter helper (should be + // equivalent to the above behavior) + var exportedEvents atomic.Uint64 + var exporter *export.DateExporter + var err error + exporter, err = export.NewDateExporter(export.DateExporterConfig{ + Client: s.Log, + Date: baseTime, + Export: func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error { + exportedEvents.Add(1) + return nil + }, + OnIdle: func(ctx context.Context) { + // only exporting extant events, so we can close as soon as we're caught up. + exporter.Close() + }, + Concurrency: 3, + MaxBackoff: time.Millisecond * 600, + PollInterval: time.Millisecond * 200, + }) + require.NoError(t, err) + defer exporter.Close() + + select { + case <-exporter.Done(): + case <-time.After(30 * time.Second): + require.FailNow(t, "timeout waiting for exporter to finish") + } + + require.Equal(t, uint64(8), exportedEvents.Load()) } // EventPagination covers event search pagination.