Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: polling based execution config watcher #1671

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
358 changes: 265 additions & 93 deletions router-tests/config_hot_reload_test.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions router/cmd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ func NewRouter(ctx context.Context, params Params, additionalOptions ...core.Opt

if executionConfigPath != "" {
options = append(options, core.WithExecutionConfig(&core.ExecutionConfig{
Watch: cfg.ExecutionConfig.File.Watch,
Path: executionConfigPath,
Watch: cfg.ExecutionConfig.File.Watch,
WatchInterval: cfg.ExecutionConfig.File.WatchInterval,
Path: executionConfigPath,
}))
} else {
options = append(options, core.WithConfigPollerConfig(&core.RouterConfigPollerConfig{
Expand Down
3 changes: 3 additions & 0 deletions router/core/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) {
}
}

// Swap the graph server
s.mu.Lock()
s.graphServer = svr
s.mu.Unlock()
}

// listenAndServe starts the server and blocks until the server is shutdown.
Expand Down
64 changes: 28 additions & 36 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ type (
}

ExecutionConfig struct {
Watch bool
Path string
Watch bool
WatchInterval time.Duration
Path string
}

AccessLogsConfig struct {
Expand Down Expand Up @@ -1116,45 +1117,36 @@ func (r *Router) Start(ctx context.Context) error {
}()

if r.executionConfig != nil && r.executionConfig.Watch {
go watcher.LogSimpleWatch(ctx, watcher.SimpleWatcherOptions{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old watcher is unused now, correct? Any reason to keep unused code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posterity I guess, but its broken in other ways anyway. I'll remove it

Logger: r.logger.With(zap.String("watcher", "execution_config")),
Path: r.executionConfig.Path,
Interval: r.executionConfig.WatchInterval,
Callback: func() {
if r.shutdown.Load() {
r.logger.Warn("Router is in shutdown state. Skipping config update")
return
}

w, err := watcher.NewWatcher(r.logger.With(zap.String("watcher", "execution_config")))
if err != nil {
return fmt.Errorf("failed to start watcher for execution config file: %w", err)
}

// Watch the execution config file for changes. Returning an error will stop the watcher.
// We intentionally ignore the error here because the user can retry. The watcher is closed when context is done.
err = w.Watch(ctx, r.executionConfig.Path, func(events []watcher.Event) error {
if r.shutdown.Load() {
r.logger.Warn("Router is in shutdown state. Skipping config update")
return nil
}

data, err := os.ReadFile(r.executionConfig.Path)
if err != nil {
r.logger.Error("Failed to read config file", zap.Error(err))
return nil
}

r.logger.Info("Config file changed. Updating server with new config", zap.String("path", r.executionConfig.Path))
data, err := os.ReadFile(r.executionConfig.Path)
if err != nil {
r.logger.Error("Failed to read config file", zap.Error(err))
return
}

cfg, err := execution_config.UnmarshalConfig(data)
if err != nil {
r.logger.Error("Failed to serialize config file", zap.Error(err))
return nil
}
r.logger.Info("Config file changed. Updating server with new config", zap.String("path", r.executionConfig.Path))

if err := r.newServer(ctx, cfg); err != nil {
r.logger.Error("Failed to update server with new config", zap.Error(err))
return nil
}
cfg, err := execution_config.UnmarshalConfig(data)
if err != nil {
r.logger.Error("Failed to unmarshal config file", zap.Error(err))
return
}

return nil
if err := r.newServer(ctx, cfg); err != nil {
r.logger.Error("Failed to update server with new config", zap.Error(err))
return
}
},
})
if err != nil {
r.logger.Error("Failed to watch execution config file. Restart the router to apply changes", zap.Error(err))
return fmt.Errorf("failed to watch execution config file: %w", err)
}

r.logger.Info("Watching config file for changes. Router will hot-reload automatically without downtime",
zap.String("path", r.executionConfig.Path),
Expand Down
1 change: 1 addition & 0 deletions router/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/tonglil/opentelemetry-go-datadog-propagator v0.1.3
github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083
go.uber.org/goleak v1.3.0
go.uber.org/ratelimit v0.3.1
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
golang.org/x/text v0.21.0
Expand Down
5 changes: 3 additions & 2 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,9 @@ type FallbackExecutionConfigStorage struct {
}

type ExecutionConfigFile struct {
Path string `yaml:"path,omitempty" env:"EXECUTION_CONFIG_FILE_PATH"`
Watch bool `yaml:"watch,omitempty" envDefault:"false" env:"EXECUTION_CONFIG_FILE_WATCH"`
Path string `yaml:"path,omitempty" env:"EXECUTION_CONFIG_FILE_PATH"`
Watch bool `yaml:"watch,omitempty" envDefault:"false" env:"EXECUTION_CONFIG_FILE_WATCH"`
WatchInterval time.Duration `yaml:"watch_interval,omitempty" envDefault:"1s" env:"EXECUTION_CONFIG_FILE_WATCH_INTERVAL"`
}

type ExecutionConfig struct {
Expand Down
17 changes: 17 additions & 0 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@
"required": [
"path"
],
"dependentSchemas": {
"watch_interval": {
"properties": {
"watch": {
"const": true
}
}
}
},
"properties": {
"path": {
"type": "string",
Expand All @@ -264,6 +273,14 @@
"type": "boolean",
"default": false,
"description": "Enable the watch mode. The watch mode is used to watch the execution config file for changes. If the file changes, the router will reload the execution config without downtime."
},
"watch_interval": {
"type": "string",
"description": "The interval at which the file is checked for changes. The period is specified as a string with a number and a unit, e.g. 10ms, 1s, 1m, 1h. The supported units are 'ms', 's', 'm', 'h'.",
"default": "1s",
"duration": {
"minimum": "1s"
}
}
}
}
Expand Down
87 changes: 74 additions & 13 deletions router/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ persisted_operations:
func TestValidExecutionConfig(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
t.Run("s3 storage config", func(t *testing.T) {

f := createTempFileFromFixture(t, `
version: "1"

storage_providers:
Expand All @@ -410,11 +412,14 @@ execution_config:
provider_id: s3
object_path: "5ef73d80-cae4-4d0e-98a7-1e9fa922c1a4/92c25b45-a75b-4954-b8f6-6592a9b203eb/routerconfigs/latest.json"
`)
_, err := LoadConfig(f, "")
var js *jsonschema.ValidationError
require.NoError(t, err, &js)
_, err := LoadConfig(f, "")
var js *jsonschema.ValidationError
require.NoError(t, err, &js)
})

f = createTempFileFromFixture(t, `
t.Run("cdn storage config", func(t *testing.T) {

f := createTempFileFromFixture(t, `
version: "1"

storage_providers:
Expand All @@ -427,15 +432,35 @@ execution_config:
provider_id: cdn
object_path: "5ef73d80-cae4-4d0e-98a7-1e9fa922c1a4/92c25b45-a75b-4954-b8f6-6592a9b203eb/routerconfigs/latest.json"
`)
_, err = LoadConfig(f, "")
js = &jsonschema.ValidationError{}
require.NoError(t, err, &js)
_, err := LoadConfig(f, "")
js := &jsonschema.ValidationError{}
require.NoError(t, err, &js)

})

t.Run("file config", func(t *testing.T) {

f := createTempFileFromFixture(t, `
version: "1"

execution_config:
file:
path: "latest.json"
watch: true
watch_interval: "1s"
`)
_, err := LoadConfig(f, "")
js := &jsonschema.ValidationError{}
require.NoError(t, err, &js)
})
}

func TestInvalidExecutionConfig(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
t.Run("no object_path", func(t *testing.T) {

f := createTempFileFromFixture(t, `
version: "1"

storage_providers:
Expand All @@ -452,10 +477,46 @@ execution_config:
provider_id: s3
# Missing object_path
`)
_, err := LoadConfig(f, "")
var js *jsonschema.ValidationError
require.ErrorAs(t, err, &js)
require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config': additional properties 'storage' not allowed\n- at '/execution_config/storage': missing property 'object_path'\n- at '/execution_config': additional properties 'storage' not allowed", js.Causes[0].Error())
_, err := LoadConfig(f, "")
var js *jsonschema.ValidationError
require.ErrorAs(t, err, &js)
require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config': additional properties 'storage' not allowed\n- at '/execution_config/storage': missing property 'object_path'\n- at '/execution_config': additional properties 'storage' not allowed", js.Causes[0].Error())
})

t.Run("too low watch interval", func(t *testing.T) {

f := createTempFileFromFixture(t, `
version: "1"

execution_config:
file:
path: "latest.json"
watch: true
watch_interval: "500ms"
`)

_, err := LoadConfig(f, "")
var js *jsonschema.ValidationError
require.ErrorAs(t, err, &js)
require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch_interval': duration must be greater or equal than 1s\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error())
})

t.Run("watch interval with watch disabled", func(t *testing.T) {

f := createTempFileFromFixture(t, `
version: "1"

execution_config:
file:
path: "latest.json"
watch: false
watch_interval: "1s"
`)
_, err := LoadConfig(f, "")
var js *jsonschema.ValidationError
require.ErrorAs(t, err, &js)
require.Equal(t, "at '/execution_config': oneOf failed, none matched\n- at '/execution_config/file/watch': value must be true\n- at '/execution_config': additional properties 'file' not allowed\n- at '/execution_config': additional properties 'file' not allowed", js.Causes[0].Error())
})
}

func TestValidLocalExecutionConfig(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion router/pkg/config/testdata/config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@
"ExecutionConfig": {
"File": {
"Path": "",
"Watch": false
"Watch": false,
"WatchInterval": 1000000000
},
"Storage": {
"ProviderID": "",
Expand Down
3 changes: 2 additions & 1 deletion router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,8 @@
"ExecutionConfig": {
"File": {
"Path": "",
"Watch": false
"Watch": false,
"WatchInterval": 1000000000
},
"Storage": {
"ProviderID": "s3",
Expand Down
64 changes: 64 additions & 0 deletions router/pkg/watcher/simple_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package watcher

import (
"context"
"os"
"time"

"go.uber.org/zap"
)

type SimpleWatcherOptions struct {
Interval time.Duration
Logger *zap.Logger
Path string
Callback func()
}

func LogSimpleWatch(ctx context.Context, options SimpleWatcherOptions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we basically require all options to be set, would it make sense to make them private and have a function NewSimpleWatcherOptions() with a bit of validation that returns an error instead of a panicking when calling Logger or Callback?

Copy link
Contributor Author

@endigma endigma Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I could also split it into like NewSimpleWatcher(options) and func (*SimpleWatcher) Start(ctx) or something.

if err := SimpleWatch(ctx, options); err != nil {
options.Logger.Error("Error watching file", zap.Error(err))
}
}

func SimpleWatch(ctx context.Context, options SimpleWatcherOptions) error {
ticker := time.NewTicker(options.Interval)
defer ticker.Stop()

ll := options.Logger.With(zap.String("path", options.Path))

var prevModTime time.Time

stat, err := os.Stat(options.Path)
if err != nil {
ll.Debug("Target file cannot be statted", zap.Error(err))
} else {
prevModTime = stat.ModTime()
}

ll.Debug("Watching file for changes", zap.Time("initialModTime", prevModTime))

for {
select {
case <-ticker.C:
stat, err := os.Stat(options.Path)
if err != nil {
ll.Debug("Target file cannot be statted", zap.Error(err))

// Reset the mod time so we catch any new file at the target path
prevModTime = time.Time{}

continue
}

ll.Debug("Checking file for changes", zap.Time("prev", prevModTime), zap.Time("mod", stat.ModTime()))

if stat.ModTime().After(prevModTime) {
prevModTime = stat.ModTime()
options.Callback()
}
case <-ctx.Done():
return ctx.Err()
}
}
}
Loading