RSDK-11248 RSDK-11266 RSDK-11900 RSDK-11901 Add restart checking logic #153
```diff
@@ -5,6 +5,7 @@ import (
     "context"
     "encoding/json"
     "errors"
+    "fmt"
     "net/url"
     "os"
     "regexp"
```
```diff
@@ -21,13 +22,18 @@ import (
     "github.com/viamrobotics/agent/subsystems/viamserver"
     "github.com/viamrobotics/agent/utils"
     pb "go.viam.com/api/app/agent/v1"
+    apppb "go.viam.com/api/app/v1"
     "go.viam.com/rdk/logging"
     goutils "go.viam.com/utils"
     "go.viam.com/utils/rpc"
 )

 const (
-    minimalCheckInterval = time.Second * 5
+    // The minimal (and default) interval for checking for config updates via DeviceAgentConfig.
+    minimalDeviceAgentConfigCheckInterval = time.Second * 5
+    // The minimal (and default) interval for checking whether agent needs to be restarted.
+    minimalNeedsRestartCheckInterval = time.Second * 1

     defaultNetworkTimeout = time.Second * 15
     // stopAllTimeout must be lower than systemd subsystems/viamagent/viam-agent.service timeout of 4mins
     // and higher than subsystems/viamserver/viamserver.go timeout of 2mins.
```
```diff
@@ -42,7 +48,6 @@ type Manager struct {
     connMu sync.RWMutex
     conn   rpc.ClientConn
-    client pb.AgentDeviceServiceClient
```
> **Review comment** (on the removed `client` field): IMO this was a pointless field to store on the manager; creating a gRPC client on top of `m.conn` where needed is cheap.
```diff
     cloudConfig *logging.CloudConfig

     logger logging.Logger
```
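For context on the comment above: gRPC stubs in Go are thin wrappers around a shared connection, so nothing is lost by constructing them per call. The sketch below uses plain `google.golang.org/grpc` rather than the agent's `rpc` wrapper, and the target address is hypothetical; a stub such as `pb.NewAgentDeviceServiceClient(conn)` would be built right before each RPC.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// grpc.NewClient is lazy: no traffic is sent until an RPC is made,
	// so holding only the conn on a manager struct is sufficient.
	conn, err := grpc.NewClient("localhost:8080",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Stubs like pb.NewAgentDeviceServiceClient(conn) or
	// apppb.NewRobotServiceClient(conn) are cheap to create here, at the call site.
	fmt.Println("connection state:", conn.GetState())
}
```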
```diff
@@ -209,7 +214,7 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) {
             m.logger.Warn(err)
         }
         if m.viamAgentNeedsRestart {
-            m.Exit()
+            m.Exit(fmt.Sprintf("A new version of %s has been installed", SubsystemName))
             return
         }
     } else {
```
```diff
@@ -221,17 +226,19 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) {
     needRestartConfigChange := m.viamServer.Update(ctx, m.cfg)

     if needRestart || needRestartConfigChange || m.viamServerNeedsRestart || m.viamAgentNeedsRestart {
-        if m.viamServer.(viamserver.RestartCheck).SafeToRestart(ctx) {
+        if m.viamServer.Property(ctx, viamserver.RestartPropertyRestartAllowed) {
```
> **Review comment** (on the `Property` check): I removed the `RestartCheck` interface assertion in favor of the new `Property` method.
```diff
+            m.logger.Infof("%s has allowed a restart; will restart", viamserver.SubsysName)
             if err := m.viamServer.Stop(ctx); err != nil {
                 m.logger.Warn(err)
             } else {
                 m.viamServerNeedsRestart = false
             }
             if m.viamAgentNeedsRestart {
-                m.Exit()
+                m.Exit(fmt.Sprintf("A new version of %s has been installed", SubsystemName))
                 return
             }
         } else {
+            m.logger.Warnf("%s has NOT allowed a restart; will NOT restart", viamserver.SubsysName)
             m.viamServerNeedsRestart = true
         }
     }
```
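To illustrate the trade-off discussed in the comment above, here is a minimal, self-contained sketch. `Subsystem`, `RestartChecker`, and `fakeViamServer` are hypothetical stand-ins, not the agent's real types: a generic property query needs no type assertion, while a dedicated interface must be asserted (and checked) at every call site.

```go
package main

import (
	"context"
	"fmt"
)

// Subsystem exposes a generic property query, loosely modeled on the new approach.
type Subsystem interface {
	Property(ctx context.Context, name string) bool
}

// RestartChecker mimics the old style: a separate interface the caller must type-assert to.
type RestartChecker interface {
	SafeToRestart(ctx context.Context) bool
}

type fakeViamServer struct{}

func (fakeViamServer) Property(_ context.Context, name string) bool {
	return name == "restart_allowed" // pretend restarts are currently allowed
}

func main() {
	var s Subsystem = fakeViamServer{}

	// New style: no assertion needed; unknown properties simply return false.
	fmt.Println("restart allowed:", s.Property(context.Background(), "restart_allowed"))

	// Old style: requires a type assertion, which needs an ok-check
	// (or panics) when the subsystem does not implement the interface.
	if rc, ok := s.(RestartChecker); ok {
		fmt.Println("safe to restart:", rc.SafeToRestart(context.Background()))
	} else {
		fmt.Println("subsystem does not implement RestartChecker")
	}
}
```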
```diff
@@ -280,26 +287,26 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) {
 // CheckUpdates retrieves an updated config from the cloud, and then passes it to SubsystemUpdates().
 func (m *Manager) CheckUpdates(ctx context.Context) time.Duration {
     defer utils.Recover(m.logger, nil)
-    m.logger.Debug("Checking cloud for update")
-    interval, err := m.GetConfig(ctx)
+    m.logger.Debug("Checking cloud for device agent config updates")
+    deviceAgentConfigCheckInterval, err := m.GetConfig(ctx)

-    if interval < minimalCheckInterval {
-        interval = minimalCheckInterval
+    if deviceAgentConfigCheckInterval < minimalDeviceAgentConfigCheckInterval {
+        deviceAgentConfigCheckInterval = minimalDeviceAgentConfigCheckInterval
     }

     // randomly fuzz the interval by +/- 5%
-    interval = utils.FuzzTime(interval, 0.05)
+    deviceAgentConfigCheckInterval = utils.FuzzTime(deviceAgentConfigCheckInterval, 0.05)

     // we already log in all error cases inside GetConfig, so
     // no need to log again.
     if err != nil {
-        return interval
+        return deviceAgentConfigCheckInterval
     }

     // update and (re)start subsystems
     m.SubsystemUpdates(ctx)

-    return interval
+    return deviceAgentConfigCheckInterval
 }

 func (m *Manager) setDebug(debug bool) {
```
```diff
@@ -380,13 +387,51 @@ func (m *Manager) SubsystemHealthChecks(ctx context.Context) {
     }
 }

+// CheckIfNeedsRestart returns the check restart interval and whether the agent (and
+// therefore all its subsystems) has been forcibly restarted by app.
+func (m *Manager) CheckIfNeedsRestart(ctx context.Context) (time.Duration, bool) {
+    m.logger.Debug("Checking cloud for forced restarts")
+    if m.cloudConfig == nil {
+        m.logger.Warn("can't CheckIfNeedsRestart until successful config load")
+        return minimalNeedsRestartCheckInterval, false
+    }
+
+    // Only continue this check if viam-server does not handle restart checking itself
+    // (return early if viamserver _does_ handle restart checking).
+    if !m.viamServer.Property(ctx, viamserver.RestartPropertyDoesNotHandleNeedsRestart) {
+        return minimalNeedsRestartCheckInterval, false
+    }
+
+    m.logger.Debug("Checking cloud for forced restarts")
+    timeoutCtx, cancelFunc := context.WithTimeout(ctx, defaultNetworkTimeout)
+    defer cancelFunc()
+
+    if err := m.dial(timeoutCtx); err != nil {
+        m.logger.Warn(errw.Wrapf(err, "dialing to check if restart needed"))
+        return minimalNeedsRestartCheckInterval, false
+    }
+
+    robotServiceClient := apppb.NewRobotServiceClient(m.conn)
+    req := &apppb.NeedsRestartRequest{Id: m.cloudConfig.ID}
+    res, err := robotServiceClient.NeedsRestart(timeoutCtx, req)
+    if err != nil {
+        m.logger.Warn(errw.Wrapf(err, "checking if restart needed"))
+        return minimalNeedsRestartCheckInterval, false
+    }
+
+    return res.GetRestartCheckInterval().AsDuration(), res.GetMustRestart()
+}
+
 // CloseAll stops all subsystems and closes the cloud connection.
 func (m *Manager) CloseAll() {
     ctx, cancel := context.WithCancel(context.Background())

     // Use a slow goroutine watcher to log and continue if shutdown is taking too long.
     slowWatcher, slowWatcherCancel := goutils.SlowGoroutineWatcher(
-        stopAllTimeout, "Agent is taking a while to shut down,", m.logger)
+        stopAllTimeout,
+        fmt.Sprintf("Viam agent subsystems and/or background workers failed to shut down within %v", stopAllTimeout),
```
> **Review comment** (on the updated slow-watcher message): [drive-by] This log was getting output after agent shutdown timed out, so the message was slightly inaccurate.
```diff
+        m.logger,
+    )

     slowTicker := time.NewTicker(10 * time.Second)
     defer slowTicker.Stop()
```
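A small illustration of the wording issue in the drive-by comment above: a shutdown watchdog only fires after its timeout has already elapsed, so its message should describe a shutdown that failed to finish in time rather than one that is merely "taking a while". This is a hypothetical stand-alone sketch, not `goutils.SlowGoroutineWatcher` itself.

```go
package main

import (
	"fmt"
	"time"
)

// watchShutdown logs only once the timeout has already passed, so the message
// describes a shutdown that did not complete within the allotted time.
func watchShutdown(timeout time.Duration, done <-chan struct{}) {
	select {
	case <-done:
		// Shut down in time; nothing to log.
	case <-time.After(timeout):
		fmt.Printf("shutdown failed to complete within %v\n", timeout)
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(200 * time.Millisecond) // simulate slow shutdown work
		close(done)
	}()
	watchShutdown(100*time.Millisecond, done)
}
```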
```diff
@@ -430,7 +475,6 @@ func (m *Manager) CloseAll() {
             }
         }

-        m.client = nil
         m.conn = nil
     })
```
```diff
@@ -479,7 +523,8 @@ func (m *Manager) CloseAll() {
     }
 }

-// StartBackgroundChecks kicks off a go routine that loops on a timer to check for updates and health checks.
+// StartBackgroundChecks kicks off go routines that loop on a timer to check for updates,
+// health checks, and restarts.
 func (m *Manager) StartBackgroundChecks(ctx context.Context) {
     if ctx.Err() != nil {
         return
```
```diff
@@ -495,18 +540,18 @@ func (m *Manager) StartBackgroundChecks(ctx context.Context) {
     })
     defer m.activeBackgroundWorkers.Done()

-    checkInterval := minimalCheckInterval
+    deviceAgentConfigCheckInterval := minimalDeviceAgentConfigCheckInterval
     m.cfgMu.RLock()
     wait := m.cfg.AdvancedSettings.WaitForUpdateCheck.Get()
     m.cfgMu.RUnlock()
     if wait {
-        checkInterval = m.CheckUpdates(ctx)
+        deviceAgentConfigCheckInterval = m.CheckUpdates(ctx)
     } else {
         // preemptively start things before we go into the regular update/check/restart
         m.SubsystemHealthChecks(ctx)
     }

-    timer := time.NewTimer(checkInterval)
+    timer := time.NewTimer(deviceAgentConfigCheckInterval)
     defer timer.Stop()
     for {
         if ctx.Err() != nil {
```
```diff
@@ -516,9 +561,39 @@ func (m *Manager) StartBackgroundChecks(ctx context.Context) {
         case <-ctx.Done():
             return
         case <-timer.C:
-            checkInterval = m.CheckUpdates(ctx)
+            deviceAgentConfigCheckInterval = m.CheckUpdates(ctx)
             m.SubsystemHealthChecks(ctx)
-            timer.Reset(checkInterval)
+            timer.Reset(deviceAgentConfigCheckInterval)
         }
     }
 }()

+m.activeBackgroundWorkers.Add(1)
+go func() {
+    defer m.activeBackgroundWorkers.Done()
+
+    timer := time.NewTimer(minimalNeedsRestartCheckInterval)
+    defer timer.Stop()
+    for {
+        if ctx.Err() != nil {
+            return
+        }
+        select {
+        case <-ctx.Done():
+            return
+        case <-timer.C:
+            needsRestartCheckInterval, needsRestart := m.CheckIfNeedsRestart(ctx)
+            if needsRestartCheckInterval < minimalNeedsRestartCheckInterval {
+                needsRestartCheckInterval = minimalNeedsRestartCheckInterval
+            }
+            if needsRestart {
+                // Do not mark m.agentNeedsRestart and instead Exit immediately; we do not want
+                // to wait for viam-server to allow a restart as it may be in a bad state.
+                m.Exit(fmt.Sprintf("A restart of %s was requested from app", SubsystemName))
+            }
+            // As with the device agent config check interval, randomly fuzz the interval by
+            // +/- 5%.
+            timer.Reset(utils.FuzzTime(needsRestartCheckInterval, 0.05))
```
> **Review comment** (on the interval fuzzing): We were doing this for the config check interval, too... I'm not sure why; anyone know?
```diff
+        }
+    }
+}()
```
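On the fuzzing question in the comment above: one common reason to fuzz a polling interval is to keep a large fleet of devices from checking in with the cloud at exactly the same moment. Below is a hedged sketch of what `utils.FuzzTime` is assumed to do; `fuzzTime` here is a stand-in, not the agent's actual helper.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fuzzTime scales d by a random factor in [1-pct, 1+pct] (e.g. +/- 5% for pct=0.05),
// so repeated checks drift apart instead of staying synchronized across devices.
func fuzzTime(d time.Duration, pct float64) time.Duration {
	factor := 1 + pct*(2*rand.Float64()-1) // uniform in [1-pct, 1+pct]
	return time.Duration(float64(d) * factor)
}

func main() {
	base := 5 * time.Second
	for i := 0; i < 3; i++ {
		fmt.Println(fuzzTime(base, 0.05)) // prints values roughly between 4.75s and 5.25s
	}
}
```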
```diff
@@ -531,11 +606,11 @@ func (m *Manager) dial(ctx context.Context) error {
         return ctx.Err()
     }
     if m.cloudConfig == nil {
-        return errors.New("cannot dial() until successful LoadConfig")
+        return errors.New("cannot dial() until successful config load")
     }
     m.connMu.Lock()
     defer m.connMu.Unlock()
-    if m.client != nil {
+    if m.conn != nil {
         return nil
     }
```
```diff
@@ -564,7 +639,6 @@ func (m *Manager) dial(ctx context.Context) error {
         return err
     }
     m.conn = conn
-    m.client = pb.NewAgentDeviceServiceClient(m.conn)

     if m.netAppender != nil {
         m.netAppender.SetConn(conn, true)
```
```diff
@@ -577,27 +651,28 @@ func (m *Manager) dial(ctx context.Context) error {
 // GetConfig retrieves the configuration from the cloud.
 func (m *Manager) GetConfig(ctx context.Context) (time.Duration, error) {
     if m.cloudConfig == nil {
-        err := errors.New("can't GetConfig until successful LoadConfig")
+        err := errors.New("can't GetConfig until successful config load")
         m.logger.Warn(err)
-        return minimalCheckInterval, err
+        return minimalDeviceAgentConfigCheckInterval, err
     }
     timeoutCtx, cancelFunc := context.WithTimeout(ctx, defaultNetworkTimeout)
     defer cancelFunc()

     if err := m.dial(timeoutCtx); err != nil {
-        m.logger.Warn(errw.Wrapf(err, "fetching %s config", SubsystemName))
-        return minimalCheckInterval, err
+        m.logger.Warn(errw.Wrapf(err, "dialing to fetch %s config", SubsystemName))
+        return minimalDeviceAgentConfigCheckInterval, err
     }

+    agentDeviceServiceClient := pb.NewAgentDeviceServiceClient(m.conn)
     req := &pb.DeviceAgentConfigRequest{
         Id:          m.cloudConfig.ID,
         HostInfo:    m.getHostInfo(),
         VersionInfo: m.getVersions(),
     }
-    resp, err := m.client.DeviceAgentConfig(timeoutCtx, req)
+    resp, err := agentDeviceServiceClient.DeviceAgentConfig(timeoutCtx, req)
     if err != nil {
         m.logger.Warn(errw.Wrapf(err, "fetching %s config", SubsystemName))
-        return minimalCheckInterval, err
+        return minimalDeviceAgentConfigCheckInterval, err
     }
     fixWindowsPaths(resp)
```
```diff
@@ -699,7 +774,7 @@ func (m *Manager) getVersions() *pb.VersionInfo {
     return vers
 }

-func (m *Manager) Exit() {
-    m.logger.Info("A new viam-agent has been installed. Will now exit to be restarted by service manager.")
+func (m *Manager) Exit(reason string) {
+    m.logger.Infow(fmt.Sprintf("%s will now exit to be restarted by service manager", SubsystemName), "reason", reason)
     m.globalCancel()
 }
```
> **PR author's note:** We now have two background check goroutines: one checks for a new config every 5s (existing), and another checks for a restart every 1s (new). Each check can receive a new, different interval from the app call, so they need to be running at different cadences in different goroutines. You'll also notice that I renamed some `interval` variable names in this file to be more specific as to which "interval" they were associated with.
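As a compact illustration of the two-goroutine layout described above, here is a self-contained sketch; the check functions are hypothetical stand-ins for `CheckUpdates` and `CheckIfNeedsRestart`. Each loop owns its own timer, clamps the interval its check returns to a minimum, and resets the timer with that value, so the two cadences evolve independently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runLoop owns one timer and one check; the interval can change every iteration
// because the cloud call the check wraps may return a new one.
func runLoop(ctx context.Context, wg *sync.WaitGroup, minInterval time.Duration, check func(context.Context) time.Duration) {
	defer wg.Done()
	timer := time.NewTimer(minInterval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			interval := check(ctx)
			if interval < minInterval {
				interval = minInterval // clamp, mirroring the minimal*CheckInterval constants
			}
			timer.Reset(interval)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(2)
	// Hypothetical stand-ins: the real loops call m.CheckUpdates and m.CheckIfNeedsRestart.
	go runLoop(ctx, &wg, 500*time.Millisecond, func(context.Context) time.Duration {
		fmt.Println("config check")
		return time.Second
	})
	go runLoop(ctx, &wg, 100*time.Millisecond, func(context.Context) time.Duration {
		fmt.Println("restart check")
		return 250 * time.Millisecond
	})
	wg.Wait()
}
```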