From 5f74dde8fe7fbf2095480537596f15c40c1bdd0f Mon Sep 17 00:00:00 2001 From: endigma Date: Fri, 7 Mar 2025 15:22:40 +0000 Subject: [PATCH 01/17] feat: add simple file watcher that does not rely on inotify --- router/pkg/watcher/simple_watcher.go | 49 ++++ router/pkg/watcher/simple_watcher_test.go | 267 ++++++++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 router/pkg/watcher/simple_watcher.go create mode 100644 router/pkg/watcher/simple_watcher_test.go diff --git a/router/pkg/watcher/simple_watcher.go b/router/pkg/watcher/simple_watcher.go new file mode 100644 index 0000000000..c1f8398ff1 --- /dev/null +++ b/router/pkg/watcher/simple_watcher.go @@ -0,0 +1,49 @@ +package watcher + +import ( + "context" + "os" + "time" + + "go.uber.org/zap" +) + +func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration, path string, cb func()) error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + var prevModTime time.Time + + stat, err := os.Stat(path) + if err != nil { + logger.Debug("target file cannot be statted", zap.String("path", path), zap.Error(err)) + } else { + prevModTime = stat.ModTime() + } + + logger.Debug("watching", zap.String("filename", path), zap.Time("initialModTime", prevModTime)) + + for { + select { + case <-ticker.C: + stat, err := os.Stat(path) + if err != nil { + logger.Debug("target file cannot be statted", zap.String("path", path), zap.Error(err)) + + // Reset the mod time so we catch any new file at the target path + prevModTime = time.Time{} + + continue + } + + logger.Debug("checking", zap.String("filename", path), zap.Time("prev", prevModTime), zap.Time("mod", stat.ModTime())) + + if stat.ModTime().After(prevModTime) { + prevModTime = stat.ModTime() + cb() + } + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/router/pkg/watcher/simple_watcher_test.go b/router/pkg/watcher/simple_watcher_test.go new file mode 100644 index 0000000000..bad97335e1 --- /dev/null +++ b/router/pkg/watcher/simple_watcher_test.go @@ -0,0 +1,267 @@ +package watcher_test + +import ( + "context" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router/pkg/watcher" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var ( + watchInterval = 10 * time.Millisecond +) + +type CallbackSpy struct { + calls int + mu sync.Mutex +} + +func (c *CallbackSpy) Call() { + c.mu.Lock() + defer c.mu.Unlock() + + c.calls++ +} + +func TestWatch(t *testing.T) { + t.Parallel() + + t.Run("create and move", func(t *testing.T) { + t.Parallel() + + var err error + + dir := t.TempDir() + tempFile := filepath.Join(dir, "config.json") + tempFile2 := filepath.Join(dir, "config2.json") + + err = os.WriteFile(tempFile, []byte("a"), 0o600) + require.NoError(t, err) + + err = os.WriteFile(tempFile2, []byte("b"), 0o600) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + eg, ctx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { + wg.Done() + }) + }) + + // Wait for the first cycle to complete to set baseline + time.Sleep(2 * watchInterval) + + wg.Add(1) + + // Move the file away, wait a cycle and then move it back + err = os.Rename(tempFile2, tempFile) + require.NoError(t, err) + + // Should get an event for the new file + waitTimeout(&wg, waitForEvents) + }) + + t.Run("modify an existing file", func(t *testing.T) { + t.Parallel() + ctx := context.Background() + + dir := t.TempDir() + tempFile := filepath.Join(dir, "config.json") + + err := os.WriteFile(tempFile, []byte("a"), 0o600) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { + wg.Done() + }) + }) + + // Wait for the first cycle to complete to set baseline + time.Sleep(2 * watchInterval) + + wg.Add(1) + + err = os.WriteFile(tempFile, []byte("b"), 0o600) + require.NoError(t, err) + + waitTimeout(&wg, waitForEvents) + }) + + t.Run("delete and replace a file", func(t *testing.T) { + t.Parallel() + ctx := context.Background() + + dir := t.TempDir() + tempFile := filepath.Join(dir, "config.json") + + err := os.WriteFile(tempFile, []byte("a"), 0o600) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { + wg.Done() + }) + }) + + // Wait for the first cycle to complete to set baseline + time.Sleep(2 * watchInterval) + + wg.Add(1) + + // Delete the file, wait a cycle and then recreate it + os.Remove(tempFile) + + time.Sleep(2 * watchInterval) + + err = os.WriteFile(tempFile, []byte("b"), 0o600) + require.NoError(t, err) + + // Should get an event for the new file + waitTimeout(&wg, waitForEvents) + }) + + t.Run("move and replace a file", func(t *testing.T) { + t.Parallel() + ctx := context.Background() + + dir := t.TempDir() + tempFile := filepath.Join(dir, "config.json") + tempFile2 := filepath.Join(dir, "config2.json") + + err := os.WriteFile(tempFile, []byte("a"), 0o600) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { + wg.Done() + }) + }) + + // Wait for the first cycle to complete to set baseline + time.Sleep(2 * watchInterval) + + wg.Add(1) + + // Move the file away, wait a cycle and then move it back + err = os.Rename(tempFile, tempFile2) + require.NoError(t, err) + + time.Sleep(2 * watchInterval) + + err = os.Rename(tempFile2, tempFile) + require.NoError(t, err) + + // Should get an event for the moved file, even if its identical + waitTimeout(&wg, waitForEvents) + }) + + t.Run("kubernetes-like symlinks", func(t *testing.T) { + t.Parallel() + ctx := context.Background() + dir := t.TempDir() + + /* + In this test, we set up a symlink chain like this: + + config.json -> linked_folder/config.json + linked_folder -> actual_folder + actual_folder/config.json is real file + + This mimics what Kubernetes does when you mount a ConfigMap as a file. + We want to ensure that changes to the real file beneath multiple layers + of symlinks are still detected. + */ + + watchedFile := filepath.Join(dir, "config.json") + + linkedFolder := filepath.Join(dir, "linked_folder") + linkedFile := filepath.Join(linkedFolder, "config.json") + + realFolder := filepath.Join(dir, "real_folder") + realFile := filepath.Join(realFolder, "config.json") + + require.NoError(t, os.Mkdir(realFolder, 0o700)) + require.NoError(t, os.WriteFile(realFile, []byte("a"), 0o600)) + + require.NoError(t, os.Symlink(realFolder, linkedFolder)) + require.NoError(t, os.Symlink(linkedFile, watchedFile)) + + wg := sync.WaitGroup{} + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, watchedFile, func() { + wg.Done() + }) + }) + + // Wait for the first cycle to complete to set baseline + time.Sleep(2 * watchInterval) + + wg.Add(1) + + require.NoError(t, os.WriteFile(realFile, []byte("b"), 0o600)) + + waitTimeout(&wg, waitForEvents) + }) +} + +func TestCancel(t *testing.T) { + t.Parallel() + ctx := context.Background() + + dir := t.TempDir() + tempFile := filepath.Join(dir, "config.json") + + err := os.WriteFile(tempFile, []byte("a"), 0o600) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, waitForEvents) + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() {}) + }) + + cancel() + err = eg.Wait() + require.ErrorIs(t, err, context.Canceled) +} + +// !! THIS FUNCTION LEAKS GOROUTINES !! +// In a timeout scenario, the "monitor" goroutine will be leaked. +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + completed := make(chan struct{}) + + // Leaks in a fail case + go func() { + defer close(completed) + wg.Wait() + }() + + select { + case <-completed: + return true // completed + case <-time.After(timeout): + return false // timed out + } +} From 2c859bde9398eb63cd9dcfa935aee0ddda150127 Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 13:21:51 +0000 Subject: [PATCH 02/17] test: clean up unused callback spy and add goleak --- router/go.mod | 1 + router/pkg/watcher/simple_watcher_test.go | 43 +++++++++++++---------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/router/go.mod b/router/go.mod index 643a42e239..faa19c3b95 100644 --- a/router/go.mod +++ b/router/go.mod @@ -72,6 +72,7 @@ require ( github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 github.com/tonglil/opentelemetry-go-datadog-propagator v0.1.3 github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 + go.uber.org/goleak v1.3.0 go.uber.org/ratelimit v0.3.1 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 golang.org/x/text v0.21.0 diff --git a/router/pkg/watcher/simple_watcher_test.go b/router/pkg/watcher/simple_watcher_test.go index bad97335e1..a169e58140 100644 --- a/router/pkg/watcher/simple_watcher_test.go +++ b/router/pkg/watcher/simple_watcher_test.go @@ -10,25 +10,16 @@ import ( "github.com/stretchr/testify/require" "github.com/wundergraph/cosmo/router/pkg/watcher" + "go.uber.org/goleak" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) -var ( - watchInterval = 10 * time.Millisecond -) - -type CallbackSpy struct { - calls int - mu sync.Mutex +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) } -func (c *CallbackSpy) Call() { - c.mu.Lock() - defer c.mu.Unlock() - - c.calls++ -} +var watchInterval = 10 * time.Millisecond func TestWatch(t *testing.T) { t.Parallel() @@ -36,6 +27,9 @@ func TestWatch(t *testing.T) { t.Run("create and move", func(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var err error dir := t.TempDir() @@ -50,7 +44,7 @@ func TestWatch(t *testing.T) { wg := sync.WaitGroup{} - eg, ctx := errgroup.WithContext(context.Background()) + eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { wg.Done() @@ -72,7 +66,9 @@ func TestWatch(t *testing.T) { t.Run("modify an existing file", func(t *testing.T) { t.Parallel() - ctx := context.Background() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() dir := t.TempDir() tempFile := filepath.Join(dir, "config.json") @@ -102,7 +98,9 @@ func TestWatch(t *testing.T) { t.Run("delete and replace a file", func(t *testing.T) { t.Parallel() - ctx := context.Background() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() dir := t.TempDir() tempFile := filepath.Join(dir, "config.json") @@ -138,7 +136,9 @@ func TestWatch(t *testing.T) { t.Run("move and replace a file", func(t *testing.T) { t.Parallel() - ctx := context.Background() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() dir := t.TempDir() tempFile := filepath.Join(dir, "config.json") @@ -176,7 +176,10 @@ func TestWatch(t *testing.T) { t.Run("kubernetes-like symlinks", func(t *testing.T) { t.Parallel() - ctx := context.Background() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dir := t.TempDir() /* @@ -227,7 +230,9 @@ func TestWatch(t *testing.T) { func TestCancel(t *testing.T) { t.Parallel() - ctx := context.Background() + + ctx, testCancel := context.WithCancel(context.Background()) + defer testCancel() dir := t.TempDir() tempFile := filepath.Join(dir, "config.json") From 45a20310a0ced2d00273403b6bf4db4e50cb600f Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 14:50:16 +0000 Subject: [PATCH 03/17] improve simplewatch logging --- router/pkg/watcher/simple_watcher.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/router/pkg/watcher/simple_watcher.go b/router/pkg/watcher/simple_watcher.go index c1f8398ff1..aac7e1aaf9 100644 --- a/router/pkg/watcher/simple_watcher.go +++ b/router/pkg/watcher/simple_watcher.go @@ -12,23 +12,25 @@ func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration ticker := time.NewTicker(interval) defer ticker.Stop() + ll := logger.With(zap.String("path", path)) + var prevModTime time.Time stat, err := os.Stat(path) if err != nil { - logger.Debug("target file cannot be statted", zap.String("path", path), zap.Error(err)) + ll.Debug("Target file cannot be statted", zap.Error(err)) } else { prevModTime = stat.ModTime() } - logger.Debug("watching", zap.String("filename", path), zap.Time("initialModTime", prevModTime)) + ll.Debug("Watching file for changes", zap.Time("initialModTime", prevModTime)) for { select { case <-ticker.C: stat, err := os.Stat(path) if err != nil { - logger.Debug("target file cannot be statted", zap.String("path", path), zap.Error(err)) + ll.Debug("Target file cannot be statted", zap.Error(err)) // Reset the mod time so we catch any new file at the target path prevModTime = time.Time{} @@ -36,7 +38,7 @@ func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration continue } - logger.Debug("checking", zap.String("filename", path), zap.Time("prev", prevModTime), zap.Time("mod", stat.ModTime())) + ll.Debug("Checking file for changes", zap.Time("prev", prevModTime), zap.Time("mod", stat.ModTime())) if stat.ModTime().After(prevModTime) { prevModTime = stat.ModTime() From dbdad0d1c5f58be0b07e4ed8df609b8d7c6af22f Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 17:00:41 +0000 Subject: [PATCH 04/17] remove waitTimeout so we can use goleak --- router/pkg/watcher/simple_watcher_test.go | 33 +++++------------------ 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/router/pkg/watcher/simple_watcher_test.go b/router/pkg/watcher/simple_watcher_test.go index a169e58140..523b95dfb5 100644 --- a/router/pkg/watcher/simple_watcher_test.go +++ b/router/pkg/watcher/simple_watcher_test.go @@ -61,7 +61,7 @@ func TestWatch(t *testing.T) { require.NoError(t, err) // Should get an event for the new file - waitTimeout(&wg, waitForEvents) + wg.Wait() }) t.Run("modify an existing file", func(t *testing.T) { @@ -93,7 +93,7 @@ func TestWatch(t *testing.T) { err = os.WriteFile(tempFile, []byte("b"), 0o600) require.NoError(t, err) - waitTimeout(&wg, waitForEvents) + wg.Wait() }) t.Run("delete and replace a file", func(t *testing.T) { @@ -131,7 +131,7 @@ func TestWatch(t *testing.T) { require.NoError(t, err) // Should get an event for the new file - waitTimeout(&wg, waitForEvents) + wg.Wait() }) t.Run("move and replace a file", func(t *testing.T) { @@ -171,7 +171,7 @@ func TestWatch(t *testing.T) { require.NoError(t, err) // Should get an event for the moved file, even if its identical - waitTimeout(&wg, waitForEvents) + wg.Wait() }) t.Run("kubernetes-like symlinks", func(t *testing.T) { @@ -224,15 +224,13 @@ func TestWatch(t *testing.T) { require.NoError(t, os.WriteFile(realFile, []byte("b"), 0o600)) - waitTimeout(&wg, waitForEvents) + wg.Wait() }) } func TestCancel(t *testing.T) { t.Parallel() - - ctx, testCancel := context.WithCancel(context.Background()) - defer testCancel() + ctx := context.Background() dir := t.TempDir() tempFile := filepath.Join(dir, "config.json") @@ -251,22 +249,3 @@ func TestCancel(t *testing.T) { err = eg.Wait() require.ErrorIs(t, err, context.Canceled) } - -// !! THIS FUNCTION LEAKS GOROUTINES !! -// In a timeout scenario, the "monitor" goroutine will be leaked. -func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { - completed := make(chan struct{}) - - // Leaks in a fail case - go func() { - defer close(completed) - wg.Wait() - }() - - select { - case <-completed: - return true // completed - case <-time.After(timeout): - return false // timed out - } -} From 9e99301de5ee36c3bc8d0901d3fd11efb483e5d5 Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 18:42:18 +0000 Subject: [PATCH 05/17] refactor the create/move test to be less timing sensitive --- router/pkg/watcher/simple_watcher_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/router/pkg/watcher/simple_watcher_test.go b/router/pkg/watcher/simple_watcher_test.go index 523b95dfb5..bc32f4d548 100644 --- a/router/pkg/watcher/simple_watcher_test.go +++ b/router/pkg/watcher/simple_watcher_test.go @@ -34,13 +34,11 @@ func TestWatch(t *testing.T) { dir := t.TempDir() tempFile := filepath.Join(dir, "config.json") - tempFile2 := filepath.Join(dir, "config2.json") err = os.WriteFile(tempFile, []byte("a"), 0o600) require.NoError(t, err) - err = os.WriteFile(tempFile2, []byte("b"), 0o600) - require.NoError(t, err) + t.Log("wrote tempFile") wg := sync.WaitGroup{} @@ -56,7 +54,17 @@ func TestWatch(t *testing.T) { wg.Add(1) - // Move the file away, wait a cycle and then move it back + tempFile2 := filepath.Join(dir, "config2.json") + + // Careful, this is subtly timing dependent. If we create + // the new file too quickly after the first, some filesystems + // will not record a different timestamp between the two files. + // The sleep above should be adequate, but if you're not + // seeing the event, try increasing it. + err = os.WriteFile(tempFile2, []byte("b"), 0o600) + require.NoError(t, err) + + // Move new file ontop of the old file err = os.Rename(tempFile2, tempFile) require.NoError(t, err) From 4e88df75ca0ba06388ea46908479034f84c9c621 Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 19:07:41 +0000 Subject: [PATCH 06/17] replace fsnotify watcher with simple watcher --- router/core/router.go | 24 +++++------------------- router/pkg/watcher/simple_watcher.go | 6 ++++++ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/router/core/router.go b/router/core/router.go index e0c72539e0..a0916dc5b9 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1116,24 +1116,16 @@ func (r *Router) Start(ctx context.Context) error { }() if r.executionConfig != nil && r.executionConfig.Watch { - - w, err := watcher.NewWatcher(r.logger.With(zap.String("watcher", "execution_config"))) - if err != nil { - return fmt.Errorf("failed to start watcher for execution config file: %w", err) - } - - // Watch the execution config file for changes. Returning an error will stop the watcher. - // We intentionally ignore the error here because the user can retry. The watcher is closed when context is done. - err = w.Watch(ctx, r.executionConfig.Path, func(events []watcher.Event) error { + go watcher.MustSimpleWatch(ctx, r.logger.With(zap.String("watcher", "execution_config")), time.Second, r.executionConfig.Path, func() { if r.shutdown.Load() { r.logger.Warn("Router is in shutdown state. Skipping config update") - return nil + return } data, err := os.ReadFile(r.executionConfig.Path) if err != nil { r.logger.Error("Failed to read config file", zap.Error(err)) - return nil + return } r.logger.Info("Config file changed. Updating server with new config", zap.String("path", r.executionConfig.Path)) @@ -1141,20 +1133,14 @@ func (r *Router) Start(ctx context.Context) error { cfg, err := execution_config.UnmarshalConfig(data) if err != nil { r.logger.Error("Failed to serialize config file", zap.Error(err)) - return nil + return } if err := r.newServer(ctx, cfg); err != nil { r.logger.Error("Failed to update server with new config", zap.Error(err)) - return nil + return } - - return nil }) - if err != nil { - r.logger.Error("Failed to watch execution config file. Restart the router to apply changes", zap.Error(err)) - return fmt.Errorf("failed to watch execution config file: %w", err) - } r.logger.Info("Watching config file for changes. Router will hot-reload automatically without downtime", zap.String("path", r.executionConfig.Path), diff --git a/router/pkg/watcher/simple_watcher.go b/router/pkg/watcher/simple_watcher.go index aac7e1aaf9..a0397c8810 100644 --- a/router/pkg/watcher/simple_watcher.go +++ b/router/pkg/watcher/simple_watcher.go @@ -8,6 +8,12 @@ import ( "go.uber.org/zap" ) +func MustSimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration, path string, cb func()) { + if err := SimpleWatch(ctx, logger, interval, path, cb); err != nil { + logger.Fatal("Error watching file", zap.Error(err)) + } +} + func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration, path string, cb func()) error { ticker := time.NewTicker(interval) defer ticker.Stop() From 3fe32ec500e7e4f30f209f41aa8ff6ad3384b5f6 Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 19:14:06 +0000 Subject: [PATCH 07/17] style: use a config struct instead of many arguments --- router/core/router.go | 45 +++++++++++--------- router/pkg/watcher/simple_watcher.go | 25 +++++++---- router/pkg/watcher/simple_watcher_test.go | 51 ++++++++++++++++++----- 3 files changed, 81 insertions(+), 40 deletions(-) diff --git a/router/core/router.go b/router/core/router.go index a0916dc5b9..4028d08ac6 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1116,30 +1116,35 @@ func (r *Router) Start(ctx context.Context) error { }() if r.executionConfig != nil && r.executionConfig.Watch { - go watcher.MustSimpleWatch(ctx, r.logger.With(zap.String("watcher", "execution_config")), time.Second, r.executionConfig.Path, func() { - if r.shutdown.Load() { - r.logger.Warn("Router is in shutdown state. Skipping config update") - return - } + go watcher.MustSimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Logger: r.logger.With(zap.String("watcher", "execution_config")), + Path: r.executionConfig.Path, + Interval: time.Second, + Callback: func() { + if r.shutdown.Load() { + r.logger.Warn("Router is in shutdown state. Skipping config update") + return + } - data, err := os.ReadFile(r.executionConfig.Path) - if err != nil { - r.logger.Error("Failed to read config file", zap.Error(err)) - return - } + data, err := os.ReadFile(r.executionConfig.Path) + if err != nil { + r.logger.Error("Failed to read config file", zap.Error(err)) + return + } - r.logger.Info("Config file changed. Updating server with new config", zap.String("path", r.executionConfig.Path)) + r.logger.Info("Config file changed. Updating server with new config", zap.String("path", r.executionConfig.Path)) - cfg, err := execution_config.UnmarshalConfig(data) - if err != nil { - r.logger.Error("Failed to serialize config file", zap.Error(err)) - return - } + cfg, err := execution_config.UnmarshalConfig(data) + if err != nil { + r.logger.Error("Failed to serialize config file", zap.Error(err)) + return + } - if err := r.newServer(ctx, cfg); err != nil { - r.logger.Error("Failed to update server with new config", zap.Error(err)) - return - } + if err := r.newServer(ctx, cfg); err != nil { + r.logger.Error("Failed to update server with new config", zap.Error(err)) + return + } + }, }) r.logger.Info("Watching config file for changes. Router will hot-reload automatically without downtime", diff --git a/router/pkg/watcher/simple_watcher.go b/router/pkg/watcher/simple_watcher.go index a0397c8810..b9b25d8887 100644 --- a/router/pkg/watcher/simple_watcher.go +++ b/router/pkg/watcher/simple_watcher.go @@ -8,21 +8,28 @@ import ( "go.uber.org/zap" ) -func MustSimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration, path string, cb func()) { - if err := SimpleWatch(ctx, logger, interval, path, cb); err != nil { - logger.Fatal("Error watching file", zap.Error(err)) +type SimpleWatcherOptions struct { + Interval time.Duration + Logger *zap.Logger + Path string + Callback func() +} + +func MustSimpleWatch(ctx context.Context, options SimpleWatcherOptions) { + if err := SimpleWatch(ctx, options); err != nil { + options.Logger.Fatal("Error watching file", zap.Error(err)) } } -func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration, path string, cb func()) error { - ticker := time.NewTicker(interval) +func SimpleWatch(ctx context.Context, options SimpleWatcherOptions) error { + ticker := time.NewTicker(options.Interval) defer ticker.Stop() - ll := logger.With(zap.String("path", path)) + ll := options.Logger.With(zap.String("path", options.Path)) var prevModTime time.Time - stat, err := os.Stat(path) + stat, err := os.Stat(options.Path) if err != nil { ll.Debug("Target file cannot be statted", zap.Error(err)) } else { @@ -34,7 +41,7 @@ func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration for { select { case <-ticker.C: - stat, err := os.Stat(path) + stat, err := os.Stat(options.Path) if err != nil { ll.Debug("Target file cannot be statted", zap.Error(err)) @@ -48,7 +55,7 @@ func SimpleWatch(ctx context.Context, logger *zap.Logger, interval time.Duration if stat.ModTime().After(prevModTime) { prevModTime = stat.ModTime() - cb() + options.Callback() } case <-ctx.Done(): return ctx.Err() diff --git a/router/pkg/watcher/simple_watcher_test.go b/router/pkg/watcher/simple_watcher_test.go index bc32f4d548..f9741faf53 100644 --- a/router/pkg/watcher/simple_watcher_test.go +++ b/router/pkg/watcher/simple_watcher_test.go @@ -44,8 +44,13 @@ func TestWatch(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { - wg.Done() + return watcher.SimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Interval: watchInterval, + Logger: zap.NewNop(), + Path: tempFile, + Callback: func() { + wg.Done() + }, }) }) @@ -88,8 +93,13 @@ func TestWatch(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { - wg.Done() + return watcher.SimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Interval: watchInterval, + Logger: zap.NewNop(), + Path: tempFile, + Callback: func() { + wg.Done() + }, }) }) @@ -120,8 +130,13 @@ func TestWatch(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { - wg.Done() + return watcher.SimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Interval: watchInterval, + Logger: zap.NewNop(), + Path: tempFile, + Callback: func() { + wg.Done() + }, }) }) @@ -159,8 +174,13 @@ func TestWatch(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() { - wg.Done() + return watcher.SimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Interval: watchInterval, + Logger: zap.NewNop(), + Path: tempFile, + Callback: func() { + wg.Done() + }, }) }) @@ -220,8 +240,13 @@ func TestWatch(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, watchedFile, func() { - wg.Done() + return watcher.SimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Interval: watchInterval, + Logger: zap.NewNop(), + Path: watchedFile, + Callback: func() { + wg.Done() + }, }) }) @@ -250,7 +275,11 @@ func TestCancel(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return watcher.SimpleWatch(ctx, zap.NewNop(), watchInterval, tempFile, func() {}) + return watcher.SimpleWatch(ctx, watcher.SimpleWatcherOptions{ + Interval: watchInterval, + Logger: zap.NewNop(), + Path: tempFile, + }) }) cancel() From d211a44e949dc2009f3d566ed0a1f2a96f8d058f Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 10 Mar 2025 19:16:35 +0000 Subject: [PATCH 08/17] remove test log --- router/pkg/watcher/simple_watcher_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/router/pkg/watcher/simple_watcher_test.go b/router/pkg/watcher/simple_watcher_test.go index f9741faf53..a00bcc3f1c 100644 --- a/router/pkg/watcher/simple_watcher_test.go +++ b/router/pkg/watcher/simple_watcher_test.go @@ -38,8 +38,6 @@ func TestWatch(t *testing.T) { err = os.WriteFile(tempFile, []byte("a"), 0o600) require.NoError(t, err) - t.Log("wrote tempFile") - wg := sync.WaitGroup{} eg, ctx := errgroup.WithContext(ctx) From dfe611320648b5377b57fc91a27de75ec34fc1d8 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 11 Mar 2025 14:06:22 +0000 Subject: [PATCH 09/17] add configurable interval, add test --- router-tests/router_config_watch_test.go | 94 +++++++++++++++++++ router/cmd/instance.go | 5 +- router/core/router.go | 9 +- router/pkg/config/config.go | 5 +- router/pkg/config/config.schema.json | 9 ++ .../pkg/config/testdata/config_defaults.json | 3 +- router/pkg/config/testdata/config_full.json | 3 +- router/pkg/watcher/simple_watcher.go | 4 +- 8 files changed, 120 insertions(+), 12 deletions(-) create mode 100644 router-tests/router_config_watch_test.go diff --git a/router-tests/router_config_watch_test.go b/router-tests/router_config_watch_test.go new file mode 100644 index 0000000000..5b346a12fb --- /dev/null +++ b/router-tests/router_config_watch_test.go @@ -0,0 +1,94 @@ +package integration + +import ( + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" +) + +func TestRouterConfigWatch(t *testing.T) { + t.Parallel() + + // Create a temporary file for the router config + configFile := t.TempDir() + "/config.json" + + // Initial config with just the employees subgraph + initialConfig := MakeTestConfig("initial") + + // Write initial config to file + initialBytes, err := json.Marshal(initialConfig) + require.NoError(t, err) + err = os.WriteFile(configFile, initialBytes, 0644) + require.NoError(t, err) + + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithExecutionConfig(&core.ExecutionConfig{ + Path: configFile, + Watch: true, + WatchInterval: 100 * time.Millisecond, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.JSONEq(t, `{"data":{"hello":"initial"}}`, res.Body) + + updatedConfig := MakeTestConfig("updated") + + updatedBytes, err := json.Marshal(updatedConfig) + require.NoError(t, err) + + err = os.WriteFile(configFile, updatedBytes, 0644) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.JSONEq(t, `{"data":{"hello":"updated"}}`, res.Body) + }, 2*time.Second, 100*time.Millisecond) + }) +} + +func MakeTestConfig(msg string) *nodev1.RouterConfig { + return &nodev1.RouterConfig{ + Version: "1a7c0b1a-839c-4b6f-9d05-7cb728168f57", + EngineConfig: &nodev1.EngineConfiguration{ + DefaultFlushInterval: 500, + DatasourceConfigurations: []*nodev1.DataSourceConfiguration{ + { + Kind: nodev1.DataSourceKind_STATIC, + RootNodes: []*nodev1.TypeField{ + { + TypeName: "Query", + FieldNames: []string{"hello"}, + }, + }, + CustomStatic: &nodev1.DataSourceCustom_Static{ + Data: &nodev1.ConfigurationVariable{ + StaticVariableContent: fmt.Sprintf(`{"hello": "%s"}`, msg), + }, + }, + Id: "0", + }, + }, + GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n}", + FieldConfigurations: []*nodev1.FieldConfiguration{ + { + TypeName: "Query", + FieldName: "hello", + }, + }, + }, + } +} diff --git a/router/cmd/instance.go b/router/cmd/instance.go index a633f649b4..c26b2a934c 100644 --- a/router/cmd/instance.go +++ b/router/cmd/instance.go @@ -211,8 +211,9 @@ func NewRouter(ctx context.Context, params Params, additionalOptions ...core.Opt if executionConfigPath != "" { options = append(options, core.WithExecutionConfig(&core.ExecutionConfig{ - Watch: cfg.ExecutionConfig.File.Watch, - Path: executionConfigPath, + Watch: cfg.ExecutionConfig.File.Watch, + WatchInterval: cfg.ExecutionConfig.File.WatchInterval, + Path: executionConfigPath, })) } else { options = append(options, core.WithConfigPollerConfig(&core.RouterConfigPollerConfig{ diff --git a/router/core/router.go b/router/core/router.go index 4028d08ac6..520b210a61 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -136,8 +136,9 @@ type ( } ExecutionConfig struct { - Watch bool - Path string + Watch bool + WatchInterval time.Duration + Path string } AccessLogsConfig struct { @@ -1116,10 +1117,10 @@ func (r *Router) Start(ctx context.Context) error { }() if r.executionConfig != nil && r.executionConfig.Watch { - go watcher.MustSimpleWatch(ctx, watcher.SimpleWatcherOptions{ + go watcher.LogSimpleWatch(ctx, watcher.SimpleWatcherOptions{ Logger: r.logger.With(zap.String("watcher", "execution_config")), Path: r.executionConfig.Path, - Interval: time.Second, + Interval: r.executionConfig.WatchInterval, Callback: func() { if r.shutdown.Load() { r.logger.Warn("Router is in shutdown state. Skipping config update") diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 8f40676bc2..6765a9eee6 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -684,8 +684,9 @@ type FallbackExecutionConfigStorage struct { } type ExecutionConfigFile struct { - Path string `yaml:"path,omitempty" env:"EXECUTION_CONFIG_FILE_PATH"` - Watch bool `yaml:"watch,omitempty" envDefault:"false" env:"EXECUTION_CONFIG_FILE_WATCH"` + Path string `yaml:"path,omitempty" env:"EXECUTION_CONFIG_FILE_PATH"` + Watch bool `yaml:"watch,omitempty" envDefault:"false" env:"EXECUTION_CONFIG_FILE_WATCH"` + WatchInterval time.Duration `yaml:"watch_interval,omitempty" envDefault:"1s" env:"EXECUTION_CONFIG_FILE_WATCH_INTERVAL"` } type ExecutionConfig struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index db09cec9e4..ab7312d460 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -264,6 +264,15 @@ "type": "boolean", "default": false, "description": "Enable the watch mode. The watch mode is used to watch the execution config file for changes. If the file changes, the router will reload the execution config without downtime." + }, + "watch_interval": { + "type": "string", + "description": "The interval at which the file is checked for changes. The period is specified as a string with a number and a unit, e.g. 10ms, 1s, 1m, 1h. The supported units are 'ms', 's', 'm', 'h'.", + "default": "1s", + "duration": { + "minimum": "100ms", + "maximum": "1m" + } } } } diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 9cbc593802..bd71a366c0 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -359,7 +359,8 @@ "ExecutionConfig": { "File": { "Path": "", - "Watch": false + "Watch": false, + "WatchInterval": 1000000000 }, "Storage": { "ProviderID": "", diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 0cdbc10142..8aa6d50815 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -662,7 +662,8 @@ "ExecutionConfig": { "File": { "Path": "", - "Watch": false + "Watch": false, + "WatchInterval": 1000000000 }, "Storage": { "ProviderID": "s3", diff --git a/router/pkg/watcher/simple_watcher.go b/router/pkg/watcher/simple_watcher.go index b9b25d8887..022b88af94 100644 --- a/router/pkg/watcher/simple_watcher.go +++ b/router/pkg/watcher/simple_watcher.go @@ -15,9 +15,9 @@ type SimpleWatcherOptions struct { Callback func() } -func MustSimpleWatch(ctx context.Context, options SimpleWatcherOptions) { +func LogSimpleWatch(ctx context.Context, options SimpleWatcherOptions) { if err := SimpleWatch(ctx, options); err != nil { - options.Logger.Fatal("Error watching file", zap.Error(err)) + options.Logger.Error("Error watching file", zap.Error(err)) } } From 12b63c10c52e04b050804e486a576651407542e4 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 11 Mar 2025 14:45:34 +0000 Subject: [PATCH 10/17] fix: protect graph server swap with mutex --- router/core/http_server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/router/core/http_server.go b/router/core/http_server.go index d6c0306f01..1e4a6ed6c0 100644 --- a/router/core/http_server.go +++ b/router/core/http_server.go @@ -103,7 +103,6 @@ func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) { // and no other config changes can happen in the meantime. s.mu.Lock() s.handler = svr.mux - s.mu.Unlock() // If the graph server is nil, we don't need to shutdown anything // This is the case when the router is starting for the first time @@ -113,7 +112,10 @@ func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) { } } + // Swap the graph server s.graphServer = svr + + s.mu.Unlock() } // listenAndServe starts the server and blocks until the server is shutdown. From 7416557af4c81b3a63c6e607f0a6e3fe54598359 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 11 Mar 2025 14:46:56 +0000 Subject: [PATCH 11/17] fix inaccurate log error message --- router/core/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/core/router.go b/router/core/router.go index 520b210a61..ffb74714cd 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1137,7 +1137,7 @@ func (r *Router) Start(ctx context.Context) error { cfg, err := execution_config.UnmarshalConfig(data) if err != nil { - r.logger.Error("Failed to serialize config file", zap.Error(err)) + r.logger.Error("Failed to unmarshal config file", zap.Error(err)) return } From bb2b4430df905a634852ad380f30cad8e223c5cd Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 11 Mar 2025 15:10:51 +0000 Subject: [PATCH 12/17] move file watcher tests in with the other hot reload tests --- router-tests/config_hot_reload_test.go | 81 ++++++++++++++++++++ router-tests/router_config_watch_test.go | 94 ------------------------ 2 files changed, 81 insertions(+), 94 deletions(-) delete mode 100644 router-tests/router_config_watch_test.go diff --git a/router-tests/config_hot_reload_test.go b/router-tests/config_hot_reload_test.go index 10618aebb4..045091fc9a 100644 --- a/router-tests/config_hot_reload_test.go +++ b/router-tests/config_hot_reload_test.go @@ -3,6 +3,8 @@ package integration import ( "context" "encoding/json" + "fmt" + "os" "sync/atomic" "testing" "time" @@ -358,6 +360,85 @@ func TestConfigHotReload(t *testing.T) { } +func TestConfigHotReloadFile(t *testing.T) { + t.Parallel() + + // Create a temporary file for the router config + configFile := t.TempDir() + "/config.json" + + // Initial config with just the employees subgraph + initialConfig := MakeTestConfig("initial") + + // Write initial config to file + initialBytes, err := json.Marshal(initialConfig) + require.NoError(t, err) + err = os.WriteFile(configFile, initialBytes, 0644) + require.NoError(t, err) + + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithExecutionConfig(&core.ExecutionConfig{ + Path: configFile, + Watch: true, + WatchInterval: 100 * time.Millisecond, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.JSONEq(t, `{"data":{"hello":"initial"}}`, res.Body) + + updatedConfig := MakeTestConfig("updated") + + updatedBytes, err := json.Marshal(updatedConfig) + require.NoError(t, err) + + err = os.WriteFile(configFile, updatedBytes, 0644) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.JSONEq(t, `{"data":{"hello":"updated"}}`, res.Body) + }, 2*time.Second, 100*time.Millisecond) + }) +} + +func MakeTestConfig(msg string) *nodev1.RouterConfig { + return &nodev1.RouterConfig{ + Version: "1a7c0b1a-839c-4b6f-9d05-7cb728168f57", + EngineConfig: &nodev1.EngineConfiguration{ + DefaultFlushInterval: 500, + DatasourceConfigurations: []*nodev1.DataSourceConfiguration{ + { + Kind: nodev1.DataSourceKind_STATIC, + RootNodes: []*nodev1.TypeField{ + { + TypeName: "Query", + FieldNames: []string{"hello"}, + }, + }, + CustomStatic: &nodev1.DataSourceCustom_Static{ + Data: &nodev1.ConfigurationVariable{ + StaticVariableContent: fmt.Sprintf(`{"hello": "%s"}`, msg), + }, + }, + Id: "0", + }, + }, + GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n}", + FieldConfigurations: []*nodev1.FieldConfiguration{ + { + TypeName: "Query", + FieldName: "hello", + }, + }, + }, + } +} + func BenchmarkConfigHotReload(b *testing.B) { pm := ConfigPollerMock{ ready: make(chan struct{}), diff --git a/router-tests/router_config_watch_test.go b/router-tests/router_config_watch_test.go deleted file mode 100644 index 5b346a12fb..0000000000 --- a/router-tests/router_config_watch_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package integration - -import ( - "encoding/json" - "fmt" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/wundergraph/cosmo/router-tests/testenv" - "github.com/wundergraph/cosmo/router/core" - nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" -) - -func TestRouterConfigWatch(t *testing.T) { - t.Parallel() - - // Create a temporary file for the router config - configFile := t.TempDir() + "/config.json" - - // Initial config with just the employees subgraph - initialConfig := MakeTestConfig("initial") - - // Write initial config to file - initialBytes, err := json.Marshal(initialConfig) - require.NoError(t, err) - err = os.WriteFile(configFile, initialBytes, 0644) - require.NoError(t, err) - - testenv.Run(t, &testenv.Config{ - RouterOptions: []core.Option{ - core.WithExecutionConfig(&core.ExecutionConfig{ - Path: configFile, - Watch: true, - WatchInterval: 100 * time.Millisecond, - }), - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: `query { hello }`, - }) - require.JSONEq(t, `{"data":{"hello":"initial"}}`, res.Body) - - updatedConfig := MakeTestConfig("updated") - - updatedBytes, err := json.Marshal(updatedConfig) - require.NoError(t, err) - - err = os.WriteFile(configFile, updatedBytes, 0644) - require.NoError(t, err) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: `query { hello }`, - }) - require.JSONEq(t, `{"data":{"hello":"updated"}}`, res.Body) - }, 2*time.Second, 100*time.Millisecond) - }) -} - -func MakeTestConfig(msg string) *nodev1.RouterConfig { - return &nodev1.RouterConfig{ - Version: "1a7c0b1a-839c-4b6f-9d05-7cb728168f57", - EngineConfig: &nodev1.EngineConfiguration{ - DefaultFlushInterval: 500, - DatasourceConfigurations: []*nodev1.DataSourceConfiguration{ - { - Kind: nodev1.DataSourceKind_STATIC, - RootNodes: []*nodev1.TypeField{ - { - TypeName: "Query", - FieldNames: []string{"hello"}, - }, - }, - CustomStatic: &nodev1.DataSourceCustom_Static{ - Data: &nodev1.ConfigurationVariable{ - StaticVariableContent: fmt.Sprintf(`{"hello": "%s"}`, msg), - }, - }, - Id: "0", - }, - }, - GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n}", - FieldConfigurations: []*nodev1.FieldConfiguration{ - { - TypeName: "Query", - FieldName: "hello", - }, - }, - }, - } -} From 183d5b4c7c39b3c168bec231f1dd829087ea103e Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 12 Mar 2025 12:16:48 +0000 Subject: [PATCH 13/17] add tests for execution config validation --- router/pkg/config/config_test.go | 87 +++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 13 deletions(-) diff --git a/router/pkg/config/config_test.go b/router/pkg/config/config_test.go index f72f042720..6eafb869a7 100644 --- a/router/pkg/config/config_test.go +++ b/router/pkg/config/config_test.go @@ -393,7 +393,9 @@ persisted_operations: func TestValidExecutionConfig(t *testing.T) { t.Parallel() - f := createTempFileFromFixture(t, ` + t.Run("s3 storage config", func(t *testing.T) { + + f := createTempFileFromFixture(t, ` version: "1" storage_providers: @@ -410,11 +412,14 @@ execution_config: provider_id: s3 object_path: "5ef73d80-cae4-4d0e-98a7-1e9fa922c1a4/92c25b45-a75b-4954-b8f6-6592a9b203eb/routerconfigs/latest.json" `) - _, err := LoadConfig(f, "") - var js *jsonschema.ValidationError - require.NoError(t, err, &js) + _, err := LoadConfig(f, "") + var js *jsonschema.ValidationError + require.NoError(t, err, &js) + }) - f = createTempFileFromFixture(t, ` + t.Run("cdn storage config", func(t *testing.T) { + + f := createTempFileFromFixture(t, ` version: "1" storage_providers: @@ -427,15 +432,35 @@ execution_config: provider_id: cdn object_path: "5ef73d80-cae4-4d0e-98a7-1e9fa922c1a4/92c25b45-a75b-4954-b8f6-6592a9b203eb/routerconfigs/latest.json" `) - _, err = LoadConfig(f, "") - js = &jsonschema.ValidationError{} - require.NoError(t, err, &js) + _, err := LoadConfig(f, "") + js := &jsonschema.ValidationError{} + require.NoError(t, err, &js) + + }) + + t.Run("file config", func(t *testing.T) { + + f := createTempFileFromFixture(t, ` +version: "1" + +execution_config: + file: + path: "latest.json" + watch: false + watch_interval: "1s" +`) + _, err := LoadConfig(f, "") + js := &jsonschema.ValidationError{} + require.NoError(t, err, &js) + }) } func TestInvalidExecutionConfig(t *testing.T) { t.Parallel() - f := createTempFileFromFixture(t, ` + t.Run("no object_path", func(t *testing.T) { + + f := createTempFileFromFixture(t, ` version: "1" storage_providers: @@ -452,10 +477,46 @@ execution_config: provider_id: s3 # Missing object_path `) - _, err := LoadConfig(f, "") - var js *jsonschema.ValidationError - require.ErrorAs(t, err, &js) - require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config': additional properties 'storage' not allowed\n- at '/execution_config/storage': missing property 'object_path'\n- at '/execution_config': additional properties 'storage' not allowed", js.Causes[0].Error()) + _, err := LoadConfig(f, "") + var js *jsonschema.ValidationError + require.ErrorAs(t, err, &js) + require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config': additional properties 'storage' not allowed\n- at '/execution_config/storage': missing property 'object_path'\n- at '/execution_config': additional properties 'storage' not allowed", js.Causes[0].Error()) + }) + + t.Run("too low watch interval", func(t *testing.T) { + + f := createTempFileFromFixture(t, ` +version: "1" + +execution_config: + file: + path: "latest.json" + watch: false + watch_interval: "50ms" +`) + + _, err := LoadConfig(f, "") + var js *jsonschema.ValidationError + require.ErrorAs(t, err, &js) + require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch_interval': duration must be greater or equal than 100ms\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error()) + }) + + t.Run("too high watch interval", func(t *testing.T) { + + f := createTempFileFromFixture(t, ` +version: "1" + +execution_config: + file: + path: "latest.json" + watch: false + watch_interval: "5m" +`) + _, err := LoadConfig(f, "") + var js *jsonschema.ValidationError + require.ErrorAs(t, err, &js) + require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch_interval': duration must be less or equal than 1m0s\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error()) + }) } func TestValidLocalExecutionConfig(t *testing.T) { From 5bf593b4b67376d232d571c3436bb3c602e679f3 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 12 Mar 2025 12:29:40 +0000 Subject: [PATCH 14/17] make watch_interval dependent on watch==true in schema --- router/pkg/config/config.schema.json | 12 ++++++++++-- router/pkg/config/config_test.go | 12 ++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index ab7312d460..6da851eca2 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -254,6 +254,15 @@ "required": [ "path" ], + "dependentSchemas": { + "watch_interval": { + "properties": { + "watch": { + "const": true + } + } + } + }, "properties": { "path": { "type": "string", @@ -270,8 +279,7 @@ "description": "The interval at which the file is checked for changes. The period is specified as a string with a number and a unit, e.g. 10ms, 1s, 1m, 1h. The supported units are 'ms', 's', 'm', 'h'.", "default": "1s", "duration": { - "minimum": "100ms", - "maximum": "1m" + "minimum": "1s" } } } diff --git a/router/pkg/config/config_test.go b/router/pkg/config/config_test.go index 6eafb869a7..ef66c8aeaf 100644 --- a/router/pkg/config/config_test.go +++ b/router/pkg/config/config_test.go @@ -491,17 +491,17 @@ version: "1" execution_config: file: path: "latest.json" - watch: false - watch_interval: "50ms" + watch: true + watch_interval: "500ms" `) _, err := LoadConfig(f, "") var js *jsonschema.ValidationError require.ErrorAs(t, err, &js) - require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch_interval': duration must be greater or equal than 100ms\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error()) + require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch_interval': duration must be greater or equal than 1s\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error()) }) - t.Run("too high watch interval", func(t *testing.T) { + t.Run("watch interval with watch disabled", func(t *testing.T) { f := createTempFileFromFixture(t, ` version: "1" @@ -510,12 +510,12 @@ execution_config: file: path: "latest.json" watch: false - watch_interval: "5m" + watch_interval: "1s" `) _, err := LoadConfig(f, "") var js *jsonschema.ValidationError require.ErrorAs(t, err, &js) - require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch_interval': duration must be less or equal than 1m0s\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error()) + require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch': value must be true\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error()) }) } From d42cd7642242354a2717a4a536a2a93c02312046 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 12 Mar 2025 12:32:44 +0000 Subject: [PATCH 15/17] update positive test --- router/pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/pkg/config/config_test.go b/router/pkg/config/config_test.go index ef66c8aeaf..079ba8b181 100644 --- a/router/pkg/config/config_test.go +++ b/router/pkg/config/config_test.go @@ -446,7 +446,7 @@ version: "1" execution_config: file: path: "latest.json" - watch: false + watch: true watch_interval: "1s" `) _, err := LoadConfig(f, "") From dd9a5c04aa1b95f30f4d43239a0da1a18a95fe86 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 12 Mar 2025 13:08:17 +0000 Subject: [PATCH 16/17] split httpserver mutex into two lock/unlock cycles to avoid potential deadlocks --- router/core/http_server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/router/core/http_server.go b/router/core/http_server.go index 1e4a6ed6c0..427a29e6d4 100644 --- a/router/core/http_server.go +++ b/router/core/http_server.go @@ -103,6 +103,7 @@ func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) { // and no other config changes can happen in the meantime. s.mu.Lock() s.handler = svr.mux + s.mu.Unlock() // If the graph server is nil, we don't need to shutdown anything // This is the case when the router is starting for the first time @@ -113,8 +114,8 @@ func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) { } // Swap the graph server + s.mu.Lock() s.graphServer = svr - s.mu.Unlock() } From 239915d316ba71918dcec829dde762b092592cd3 Mon Sep 17 00:00:00 2001 From: endigma Date: Wed, 12 Mar 2025 13:10:09 +0000 Subject: [PATCH 17/17] improve integration test coverage for file watcher --- router-tests/config_hot_reload_test.go | 379 +++++++++++++++---------- 1 file changed, 235 insertions(+), 144 deletions(-) diff --git a/router-tests/config_hot_reload_test.go b/router-tests/config_hot_reload_test.go index 045091fc9a..c6443237bf 100644 --- a/router-tests/config_hot_reload_test.go +++ b/router-tests/config_hot_reload_test.go @@ -3,7 +3,6 @@ package integration import ( "context" "encoding/json" - "fmt" "os" "sync/atomic" "testing" @@ -47,7 +46,7 @@ func (c *ConfigPollerMock) Stop(_ context.Context) error { return nil } -func TestConfigHotReload(t *testing.T) { +func TestConfigHotReloadPoller(t *testing.T) { t.Parallel() t.Run("Swap config and be able to make requests successfully", func(t *testing.T) { @@ -166,113 +165,6 @@ func TestConfigHotReload(t *testing.T) { }) }) - t.Run("Shutdown server waits until all requests has been served", func(t *testing.T) { - t.Parallel() - - pm := ConfigPollerMock{ - ready: make(chan struct{}), - } - - testenv.Run(t, &testenv.Config{ - Subgraphs: testenv.SubgraphsConfig{ - GlobalDelay: time.Millisecond * 1000, - }, - RouterConfig: &testenv.RouterConfig{ - ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller { - pm.initConfig = config - return &pm - }, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - - var requestsStarted atomic.Uint32 - var requestsDone atomic.Uint32 - - for i := 0; i < 10; i++ { - requestsStarted.Add(1) - func() { - defer requestsDone.Add(1) - - // Create a new context for each request to ensure that the request is not cancelled by the shutdown - res, err := xEnv.MakeGraphQLRequestWithContext(context.Background(), testenv.GraphQLRequest{ - Query: `{ employees { id } }`, - }) - require.NoError(t, err) - require.Equal(t, res.Response.StatusCode, 200) - require.JSONEq(t, employeesIDData, res.Body) - }() - } - - // Let's wait until all requests are in flight - require.Eventually(t, func() bool { - return requestsStarted.Load() == 10 - }, time.Second*5, time.Millisecond*100) - - xEnv.Shutdown() - - // Let's wait until all requests are completed - require.Eventually(t, func() bool { - return requestsDone.Load() == 10 - }, time.Second*20, time.Millisecond*100) - }) - }) - - t.Run("Router grace period defines how long the shutdown can take until all client connections are closed immediately", func(t *testing.T) { - t.Parallel() - - pm := ConfigPollerMock{ - ready: make(chan struct{}), - } - - testenv.Run(t, &testenv.Config{ - Subgraphs: testenv.SubgraphsConfig{ - // This is a very high delay to make sure that the shutdown is enforced by the grace period - GlobalDelay: time.Hour * 1, - }, - RouterOptions: []core.Option{ - // This results in a context.DeadlineExceeded error after the grace period - core.WithGracePeriod(time.Millisecond * 100), - }, - RouterConfig: &testenv.RouterConfig{ - ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller { - pm.initConfig = config - return &pm - }, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - - var startedReq atomic.Bool - go func() { - startedReq.Store(true) - res, err := xEnv.MakeGraphQLRequestWithContext(context.Background(), testenv.GraphQLRequest{ - Query: `{ employees { id } }`, - }) - require.NoError(t, err) - assert.Equal(t, res.Response.StatusCode, 200) - assert.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph 'employees'."}],"data":{"employees":null}}`, res.Body) - }() - - // Let's wait a bit to make sure all requests are in flight - // otherwise the shutdown will be too fast and the wait-group will not be done fully - require.Eventually(t, func() bool { - return startedReq.Load() - }, time.Second*10, time.Millisecond*100) - time.Sleep(time.Millisecond * 100) - - var done atomic.Bool - go func() { - defer done.Store(true) - - err := xEnv.Router.Shutdown(context.Background()) - assert.ErrorContains(t, err, context.DeadlineExceeded.Error()) - }() - - require.Eventually(t, func() bool { - return done.Load() - }, time.Second*20, time.Millisecond*100) - }) - }) - t.Run("Swap config closes websockets connections of old graph instance immediately", func(t *testing.T) { t.Parallel() @@ -357,58 +249,251 @@ func TestConfigHotReload(t *testing.T) { require.NoError(t, conn.Close()) }) }) - } func TestConfigHotReloadFile(t *testing.T) { t.Parallel() - // Create a temporary file for the router config - configFile := t.TempDir() + "/config.json" + t.Run("hot-reload config from file", func(t *testing.T) { + t.Parallel() - // Initial config with just the employees subgraph - initialConfig := MakeTestConfig("initial") + // Create a temporary file for the router config + configFile := t.TempDir() + "/config.json" - // Write initial config to file - initialBytes, err := json.Marshal(initialConfig) - require.NoError(t, err) - err = os.WriteFile(configFile, initialBytes, 0644) - require.NoError(t, err) + // Initial config with just the employees subgraph + writeTestConfig(t, "initial", configFile) - testenv.Run(t, &testenv.Config{ - RouterOptions: []core.Option{ - core.WithExecutionConfig(&core.ExecutionConfig{ - Path: configFile, - Watch: true, - WatchInterval: 100 * time.Millisecond, - }), - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: `query { hello }`, + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithConfigVersionHeader(true), + core.WithExecutionConfig(&core.ExecutionConfig{ + Path: configFile, + Watch: true, + WatchInterval: 100 * time.Millisecond, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, res.Response.StatusCode, 200) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + + writeTestConfig(t, "updated", configFile) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version")) + }, 2*time.Second, 100*time.Millisecond) }) - require.JSONEq(t, `{"data":{"hello":"initial"}}`, res.Body) + }) - updatedConfig := MakeTestConfig("updated") + t.Run("does not hot-reload config from file if watch is disabled", func(t *testing.T) { + t.Parallel() - updatedBytes, err := json.Marshal(updatedConfig) - require.NoError(t, err) + // Create a temporary file for the router config + configFile := t.TempDir() + "/config.json" - err = os.WriteFile(configFile, updatedBytes, 0644) - require.NoError(t, err) + // Initial config with just the employees subgraph + writeTestConfig(t, "initial", configFile) - require.EventuallyWithT(t, func(t *assert.CollectT) { - res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithConfigVersionHeader(true), + core.WithExecutionConfig(&core.ExecutionConfig{ + Path: configFile, + Watch: false, + WatchInterval: 100 * time.Millisecond, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, res.Response.StatusCode, 200) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + + writeTestConfig(t, "updated", configFile) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + }, 2*time.Second, 100*time.Millisecond) + }) + }) + + t.Run("does not interrupt existing client traffic", func(t *testing.T) { + t.Parallel() + + // Create a temporary file for the router config + configFile := t.TempDir() + "/config.json" + + // Initial config with just the employees subgraph + writeTestConfig(t, "initial", configFile) + + testenv.Run(t, &testenv.Config{ + Subgraphs: testenv.SubgraphsConfig{ + GlobalDelay: time.Millisecond * 500, + }, + RouterOptions: []core.Option{ + core.WithConfigVersionHeader(true), + core.WithExecutionConfig(&core.ExecutionConfig{ + Path: configFile, + Watch: true, + WatchInterval: 100 * time.Millisecond, + }), + core.WithEngineExecutionConfig(config.EngineExecutionConfiguration{ + EnableSingleFlight: false, + MaxConcurrentResolvers: 32, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ Query: `query { hello }`, }) - require.JSONEq(t, `{"data":{"hello":"updated"}}`, res.Body) - }, 2*time.Second, 100*time.Millisecond) + require.Equal(t, res.Response.StatusCode, 200) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + + var done atomic.Uint32 + + go func() { + defer done.Add(1) + + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, res.Response.StatusCode, 200) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + }() + + go func() { + defer done.Add(1) + + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, res.Response.StatusCode, 200) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + }() + + time.Sleep(time.Millisecond * 100) + + writeTestConfig(t, "updated", configFile) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { hello }`, + }) + require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version")) + }, 2*time.Second, 100*time.Millisecond) + + // Ensure that all requests are served successfully + require.Eventually(t, func() bool { + return done.Load() == 2 + }, time.Second*5, time.Millisecond*100) + }) }) } -func MakeTestConfig(msg string) *nodev1.RouterConfig { - return &nodev1.RouterConfig{ - Version: "1a7c0b1a-839c-4b6f-9d05-7cb728168f57", +func TestSwapConfig(t *testing.T) { + t.Parallel() + + t.Run("shutdown server waits until all requests has been served", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + Subgraphs: testenv.SubgraphsConfig{ + GlobalDelay: time.Millisecond * 1000, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + var requestsStarted atomic.Uint32 + var requestsDone atomic.Uint32 + + for i := 0; i < 10; i++ { + requestsStarted.Add(1) + func() { + defer requestsDone.Add(1) + + // Create a new context for each request to ensure that the request is not cancelled by the shutdown + res, err := xEnv.MakeGraphQLRequestWithContext(context.Background(), testenv.GraphQLRequest{ + Query: `{ employees { id } }`, + }) + require.NoError(t, err) + require.Equal(t, res.Response.StatusCode, 200) + require.JSONEq(t, employeesIDData, res.Body) + }() + } + + // Let's wait until all requests are in flight + require.Eventually(t, func() bool { + return requestsStarted.Load() == 10 + }, time.Second*5, time.Millisecond*100) + + xEnv.Shutdown() + + // Let's wait until all requests are completed + require.Eventually(t, func() bool { + return requestsDone.Load() == 10 + }, time.Second*20, time.Millisecond*100) + }) + }) + + t.Run("Router grace period defines how long the shutdown can take until all client connections are closed immediately", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + Subgraphs: testenv.SubgraphsConfig{ + // This is a very high delay to make sure that the shutdown is enforced by the grace period + GlobalDelay: time.Hour * 1, + }, + RouterOptions: []core.Option{ + // This results in a context.DeadlineExceeded error after the grace period + core.WithGracePeriod(time.Millisecond * 100), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + + var startedReq atomic.Bool + go func() { + startedReq.Store(true) + res, err := xEnv.MakeGraphQLRequestWithContext(context.Background(), testenv.GraphQLRequest{ + Query: `{ employees { id } }`, + }) + require.NoError(t, err) + assert.Equal(t, res.Response.StatusCode, 200) + assert.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph 'employees'."}],"data":{"employees":null}}`, res.Body) + }() + + // Let's wait a bit to make sure all requests are in flight + // otherwise the shutdown will be too fast and the wait-group will not be done fully + require.Eventually(t, func() bool { + return startedReq.Load() + }, time.Second*10, time.Millisecond*100) + time.Sleep(time.Millisecond * 100) + + var done atomic.Bool + go func() { + defer done.Store(true) + + err := xEnv.Router.Shutdown(context.Background()) + assert.ErrorContains(t, err, context.DeadlineExceeded.Error()) + }() + + require.Eventually(t, func() bool { + return done.Load() + }, time.Second*20, time.Millisecond*100) + }) + }) +} + +func writeTestConfig(t *testing.T, version string, path string) { + t.Helper() + + cfg := &nodev1.RouterConfig{ + Version: version, EngineConfig: &nodev1.EngineConfiguration{ DefaultFlushInterval: 500, DatasourceConfigurations: []*nodev1.DataSourceConfiguration{ @@ -422,7 +507,7 @@ func MakeTestConfig(msg string) *nodev1.RouterConfig { }, CustomStatic: &nodev1.DataSourceCustom_Static{ Data: &nodev1.ConfigurationVariable{ - StaticVariableContent: fmt.Sprintf(`{"hello": "%s"}`, msg), + StaticVariableContent: `{"hello": "Hello!"}`, }, }, Id: "0", @@ -437,6 +522,12 @@ func MakeTestConfig(msg string) *nodev1.RouterConfig { }, }, } + + bytes, err := json.Marshal(cfg) + require.NoError(t, err) + + err = os.WriteFile(path, bytes, 0644) + require.NoError(t, err) } func BenchmarkConfigHotReload(b *testing.B) {