diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go
index 93376021b..b31e07004 100644
--- a/agent/agentserver/server.go
+++ b/agent/agentserver/server.go
@@ -21,10 +21,13 @@ import (
 	"net/http"
 	_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
 	"os"
+	"sort"
 	"strings"
 	"sync"
 	"time"
 
+	"go.uber.org/zap"
+
 	"github.com/uber/kraken/build-index/tagclient"
 	"github.com/uber/kraken/core"
 	"github.com/uber/kraken/lib/containerruntime"
@@ -34,6 +37,7 @@ import (
 	"github.com/uber/kraken/tracker/announceclient"
 	"github.com/uber/kraken/utils/handler"
 	"github.com/uber/kraken/utils/httputil"
+	"github.com/uber/kraken/utils/log"
 
 	"github.com/go-chi/chi"
 	"github.com/uber-go/tally"
@@ -42,7 +46,32 @@ import (
 // Config defines Server configuration.
 type Config struct {
 	// How long a successful readiness check is valid for. If 0, disable caching successful readiness.
-	readinessCacheTTL time.Duration `yaml:"readiness_cache_ttl"`
+	ReadinessCacheTTL time.Duration `yaml:"readiness_cache_ttl"`
+
+	// Timeout for download operations.
+	DownloadTimeout time.Duration `yaml:"download_timeout"`
+
+	// Timeout for container runtime operations.
+	ContainerRuntimeTimeout time.Duration `yaml:"container_runtime_timeout"`
+
+	// Timeout for readiness checks.
+	ReadinessTimeout time.Duration `yaml:"readiness_timeout"`
+
+	// Enable detailed request logging.
+	EnableRequestLogging bool `yaml:"enable_request_logging"`
+}
+
+// applyDefaults sets default values for configuration.
+func (c *Config) applyDefaults() {
+	if c.DownloadTimeout == 0 {
+		c.DownloadTimeout = 15 * time.Minute
+	}
+	if c.ContainerRuntimeTimeout == 0 {
+		c.ContainerRuntimeTimeout = 10 * time.Minute
+	}
+	if c.ReadinessTimeout == 0 {
+		c.ReadinessTimeout = 30 * time.Second
+	}
 }
 
 // Server defines the agent HTTP server.
@@ -55,6 +84,7 @@ type Server struct {
 	ac               announceclient.Client
 	containerRuntime containerruntime.Factory
 	lastReady        time.Time
+	lastReadyMu      sync.RWMutex
 }
 
 // New creates a new Server.
@@ -65,7 +95,9 @@ func New(
 	sched scheduler.ReloadableScheduler,
 	tags tagclient.Client,
 	ac announceclient.Client,
-	containerRuntime containerruntime.Factory) *Server {
+	containerRuntime containerruntime.Factory,
+) *Server {
+	config.applyDefaults()
 
 	stats = stats.Tagged(map[string]string{
 		"module": "agentserver",
@@ -89,6 +121,10 @@ func (s *Server) Handler() http.Handler {
 	r.Use(middleware.StatusCounter(s.stats))
 	r.Use(middleware.LatencyTimer(s.stats))
 
+	if s.config.EnableRequestLogging {
+		r.Use(s.requestLoggingMiddleware)
+	}
+
 	r.Get("/health", handler.Wrap(s.healthHandler))
 	r.Get("/readiness", handler.Wrap(s.readinessCheckHandler))
 
@@ -112,145 +148,351 @@ func (s *Server) Handler() http.Handler {
 	return r
 }
 
+// loggerKey is the typed context key under which the request-scoped logger is
+// stored; a dedicated key type avoids collisions with other context values.
+type loggerKey struct{}
+
+// requestLoggingMiddleware logs request details for debugging.
+func (s *Server) requestLoggingMiddleware(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Create a logger with request context.
+		logger := log.With(
+			"method", r.Method,
+			"path", r.URL.Path,
+			"remote_addr", r.RemoteAddr,
+			"user_agent", r.UserAgent(),
+		)
+
+		// Add logger to context.
+		ctx := context.WithValue(r.Context(), loggerKey{}, logger)
+		r = r.WithContext(ctx)
+
+		next.ServeHTTP(w, r)
+	})
+}
+
+// getLogger extracts the logger from the context, falling back to a default.
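+// The logger is attached by requestLoggingMiddleware under loggerKey; when
+// request logging is disabled, a component-tagged default logger is returned.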
+func (s *Server) getLogger(ctx context.Context) *zap.SugaredLogger {
+	if logger, ok := ctx.Value(loggerKey{}).(*zap.SugaredLogger); ok {
+		return logger
+	}
+	return log.With("component", "agentserver")
+}
+
 // getTagHandler proxies get tag requests to the build-index.
 func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
 	tag, err := httputil.ParseParam(r, "tag")
 	if err != nil {
-		return err
+		return handler.Errorf("parse tag param: %s", err).Status(http.StatusBadRequest)
+	}
+
+	// Validate tag format.
+	if strings.TrimSpace(tag) == "" {
+		return handler.ErrorStatus(http.StatusBadRequest)
 	}
+
+	logger.Debugw("getting tag", "tag", tag)
+
 	d, err := s.tags.Get(tag)
 	if err != nil {
 		if err == tagclient.ErrTagNotFound {
+			logger.Debugw("tag not found", "tag", tag)
 			return handler.ErrorStatus(http.StatusNotFound)
 		}
+		logger.Errorw("failed to get tag", "tag", tag, "error", err)
 		return handler.Errorf("get tag: %s", err)
 	}
+
+	logger.Debugw("tag found", "tag", tag, "digest", d.String())
+
 	io.WriteString(w, d.String())
 	return nil
 }
 
 // downloadBlobHandler downloads a blob through p2p.
 func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
+	// Create timeout context for download operation.
+	downloadCtx, cancel := context.WithTimeout(ctx, s.config.DownloadTimeout)
+	defer cancel()
+
 	namespace, err := httputil.ParseParam(r, "namespace")
 	if err != nil {
-		return err
+		return handler.Errorf("parse namespace param: %s", err).Status(http.StatusBadRequest)
 	}
+
 	d, err := parseDigest(r)
 	if err != nil {
+		return err // parseDigest already formats the error properly.
+	}
+
+	// Validate inputs.
+	if strings.TrimSpace(namespace) == "" {
+		return handler.ErrorStatus(http.StatusBadRequest)
+	}
+
+	logger = logger.With("namespace", namespace, "digest", d.String())
+	logger.Debugw("downloading blob")
+
+	// Try to get file from cache first.
+	reader, err := s.getCachedBlob(d)
+	if err == nil {
+		defer func() {
+			if closeErr := reader.Close(); closeErr != nil {
+				logger.Errorw("failed to close cached reader", "error", closeErr)
+			}
+		}()
+
+		logger.Debugw("serving blob from cache")
+
+		if _, err := io.Copy(w, reader); err != nil {
+			return handler.Errorf("copy cached file: %s", err)
+		}
+		return nil
+	}
+
+	// Cache miss or error - need to download.
+	if err := s.downloadBlob(downloadCtx, logger, namespace, d); err != nil {
 		return err
 	}
-	f, err := s.cads.Cache().GetFileReader(d.Hex())
+
+	// Get the downloaded file.
+	reader, err = s.getCachedBlob(d)
+	if err != nil {
+		logger.Errorw("failed to get downloaded blob", "error", err)
+		return handler.Errorf("get downloaded blob: %s", err)
+	}
+	defer func() {
+		if closeErr := reader.Close(); closeErr != nil {
+			logger.Errorw("failed to close downloaded reader", "error", closeErr)
+		}
+	}()
+
+	logger.Debugw("serving downloaded blob")
+
+	if _, err := io.Copy(w, reader); err != nil {
+		return handler.Errorf("copy downloaded file: %s", err)
+	}
+
+	return nil
+}
+
+// getCachedBlob attempts to get a blob from the cache.
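+// On a hit it returns an open reader the caller must close. On a miss, or
+// while a download is still in flight, it returns a wrapped error so the
+// caller can fall back to a scheduler download.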
+func (s *Server) getCachedBlob(d core.Digest) (io.ReadCloser, error) {
+	reader, err := s.cads.Cache().GetFileReader(d.Hex())
 	if err != nil {
 		if os.IsNotExist(err) || s.cads.InDownloadError(err) {
-			if err := s.sched.Download(namespace, d); err != nil {
-				if err == scheduler.ErrTorrentNotFound {
-					return handler.ErrorStatus(http.StatusNotFound)
-				}
-				return handler.Errorf("download torrent: %s", err)
-			}
-			f, err = s.cads.Cache().GetFileReader(d.Hex())
-			if err != nil {
-				return handler.Errorf("store: %s", err)
-			}
-		} else {
-			return handler.Errorf("store: %s", err)
+			return nil, fmt.Errorf("blob not in cache: %w", err)
 		}
+		return nil, fmt.Errorf("cache error: %w", err)
 	}
-	if _, err := io.Copy(w, f); err != nil {
-		return fmt.Errorf("copy file: %s", err)
+	return reader, nil
+}
+
+// downloadBlob downloads a blob via the scheduler.
+func (s *Server) downloadBlob(ctx context.Context, logger *zap.SugaredLogger, namespace string, d core.Digest) error {
+	logger.Debugw("downloading blob via scheduler")
+
+	// Monitor download with context.
+	done := make(chan error, 1)
+	go func() {
+		done <- s.sched.Download(namespace, d)
+	}()
+
+	select {
+	case err := <-done:
+		if err != nil {
+			if err == scheduler.ErrTorrentNotFound {
+				logger.Debugw("torrent not found", "error", err)
+				return handler.ErrorStatus(http.StatusNotFound)
+			}
+			logger.Errorw("download failed", "error", err)
+			return handler.Errorf("download torrent: %s", err)
+		}
+		logger.Debugw("download completed successfully")
+		return nil
+
+	case <-ctx.Done():
+		logger.Warnw("download timeout", "timeout", s.config.DownloadTimeout)
+		return handler.Errorf("download timeout after %v", s.config.DownloadTimeout).Status(http.StatusRequestTimeout)
 	}
-	return nil
 }
 
 func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
 	d, err := parseDigest(r)
 	if err != nil {
 		return err
 	}
+
+	logger = logger.With("digest", d.String())
+	logger.Debugw("deleting blob")
+
 	if err := s.sched.RemoveTorrent(d); err != nil {
+		logger.Errorw("failed to remove torrent", "error", err)
 		return handler.Errorf("remove torrent: %s", err)
 	}
+	logger.Debugw("blob deleted successfully")
 	return nil
 }
 
 // preloadTagHandler triggers docker daemon to download specified docker image.
 func (s *Server) preloadTagHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
+	// Create timeout context for container runtime operations.
+	runtimeCtx, cancel := context.WithTimeout(ctx, s.config.ContainerRuntimeTimeout)
+	defer cancel()
+
 	tag, err := httputil.ParseParam(r, "tag")
 	if err != nil {
-		return err
+		return handler.Errorf("parse tag param: %s", err).Status(http.StatusBadRequest)
 	}
+
 	parts := strings.Split(tag, ":")
 	if len(parts) != 2 {
-		return handler.Errorf("failed to parse docker image tag")
+		return handler.Errorf("invalid docker image tag format: %s", tag).Status(http.StatusBadRequest)
+	}
+	repo, tagName := parts[0], parts[1]
+
+	// Validate inputs.
+	if strings.TrimSpace(repo) == "" || strings.TrimSpace(tagName) == "" {
+		return handler.ErrorStatus(http.StatusBadRequest)
 	}
-	repo, tag := parts[0], parts[1]
 
 	rt := httputil.GetQueryArg(r, "runtime", "docker")
 	ns := httputil.GetQueryArg(r, "namespace", "")
+
+	logger = logger.With("repo", repo, "tag", tagName, "runtime", rt, "namespace", ns)
+	logger.Debugw("preloading image")
+
+	var preloadErr error
 	switch rt {
 	case "docker":
-		if err := s.containerRuntime.DockerClient().
-			PullImage(context.Background(), repo, tag); err != nil {
-			return handler.Errorf("docker pull: %s", err)
-		}
+		preloadErr = s.containerRuntime.DockerClient().PullImage(runtimeCtx, repo, tagName)
 	case "containerd":
-		if err := s.containerRuntime.ContainerdClient().
-			PullImage(context.Background(), ns, repo, tag); err != nil {
-			return handler.Errorf("containerd pull: %s", err)
-		}
+		preloadErr = s.containerRuntime.ContainerdClient().PullImage(runtimeCtx, ns, repo, tagName)
 	default:
-		return handler.Errorf("unsupported container runtime")
+		return handler.Errorf("unsupported container runtime: %s", rt)
 	}
+
+	if preloadErr != nil {
+		// Check if it's a context timeout.
+		if runtimeCtx.Err() == context.DeadlineExceeded {
+			logger.Warnw("preload timeout", "timeout", s.config.ContainerRuntimeTimeout)
+			return handler.Errorf("preload timeout after %v", s.config.ContainerRuntimeTimeout).Status(http.StatusRequestTimeout)
+		}
+
+		logger.Errorw("preload failed", "error", preloadErr)
+		return handler.Errorf("%s pull: %s", rt, preloadErr)
+	}
+
+	logger.Debugw("preload completed successfully")
 	return nil
 }
 
 func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
+	logger.Debugw("health check")
+
 	if err := s.sched.Probe(); err != nil {
+		logger.Errorw("health check failed", "error", err)
 		return handler.Errorf("probe torrent client: %s", err)
 	}
+
 	io.WriteString(w, "OK")
 	return nil
 }
 
 func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
-	if s.config.readinessCacheTTL != 0 {
-		rCacheValid := s.lastReady.Add(s.config.readinessCacheTTL).After(time.Now())
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
+	// Check cache first.
+	s.lastReadyMu.RLock()
+	if s.config.ReadinessCacheTTL != 0 {
+		rCacheValid := s.lastReady.Add(s.config.ReadinessCacheTTL).After(time.Now())
 		if rCacheValid {
+			s.lastReadyMu.RUnlock()
+			logger.Debugw("readiness check cached")
 			io.WriteString(w, "OK")
 			return nil
 		}
 	}
+	s.lastReadyMu.RUnlock()
+
+	logger.Debugw("performing readiness check")
+
+	// Create timeout context for readiness checks.
+	checkCtx, cancel := context.WithTimeout(ctx, s.config.ReadinessTimeout)
+	defer cancel()
+
+	type checkResult struct {
+		name string
+		err  error
+	}
 
-	var schedErr, buildIndexErr, trackerErr error
-	var wg sync.WaitGroup
+	results := make(chan checkResult, 3)
 
-	wg.Add(3)
+	// Run checks concurrently with timeout.
 	go func() {
-		schedErr = s.sched.Probe()
-		wg.Done()
+		err := s.sched.Probe()
+		select {
+		case results <- checkResult{"scheduler", err}:
+		case <-checkCtx.Done():
+		}
 	}()
+
 	go func() {
-		buildIndexErr = s.tags.CheckReadiness()
-		wg.Done()
+		err := s.tags.CheckReadiness()
+		select {
+		case results <- checkResult{"build-index", err}:
+		case <-checkCtx.Done():
+		}
 	}()
+
 	go func() {
-		trackerErr = s.ac.CheckReadiness()
-		wg.Done()
+		err := s.ac.CheckReadiness()
+		select {
+		case results <- checkResult{"tracker", err}:
+		case <-checkCtx.Done():
+		}
 	}()
-	wg.Wait()
 
-	// TODO(akalpakchiev): Replace with errors.Join once upgraded to Go 1.20+.
-	errMsgs := []string{}
-	for _, err := range []error{schedErr, buildIndexErr, trackerErr} {
-		if err != nil {
-			errMsgs = append(errMsgs, err.Error())
+	// Collect results.
+	var errMsgs []string
+	for i := 0; i < 3; i++ {
+		select {
+		case result := <-results:
+			if result.err != nil {
+				errMsgs = append(errMsgs, fmt.Sprintf("%s: %v", result.name, result.err))
+			}
+		case <-checkCtx.Done():
+			logger.Warnw("readiness check timeout", "timeout", s.config.ReadinessTimeout)
+			return handler.Errorf("readiness check timeout after %v", s.config.ReadinessTimeout).Status(http.StatusServiceUnavailable)
 		}
 	}
+
 	if len(errMsgs) != 0 {
-		return handler.Errorf("agent not ready: %v", strings.Join(errMsgs, "\n")).Status(http.StatusServiceUnavailable)
+		// Sort error messages for deterministic output.
+		sort.Strings(errMsgs)
+		errMsg := strings.Join(errMsgs, "\n")
+		logger.Warnw("readiness check failed", "errors", errMsg)
+		return handler.Errorf("agent not ready: %v", errMsg).Status(http.StatusServiceUnavailable)
 	}
 
+	// Update cache.
+	s.lastReadyMu.Lock()
 	s.lastReady = time.Now()
+	s.lastReadyMu.Unlock()
+
+	logger.Debugw("readiness check passed")
 	io.WriteString(w, "OK")
 	return nil
 }
@@ -258,31 +500,55 @@ func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) e
 // patchSchedulerConfigHandler restarts the agent torrent scheduler with
 // the config in request body.
 func (s *Server) patchSchedulerConfigHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
 	defer r.Body.Close()
+
+	logger.Debugw("patching scheduler config")
+
 	var config scheduler.Config
 	if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
+		logger.Errorw("failed to decode scheduler config", "error", err)
 		return handler.Errorf("json decode: %s", err).Status(http.StatusBadRequest)
 	}
+
 	s.sched.Reload(config)
+	logger.Infow("scheduler config reloaded")
 	return nil
 }
 
 func (s *Server) getBlacklistHandler(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	logger := s.getLogger(ctx)
+
+	logger.Debugw("getting blacklist")
+
 	blacklist, err := s.sched.BlacklistSnapshot()
 	if err != nil {
+		logger.Errorw("failed to get blacklist", "error", err)
 		return handler.Errorf("blacklist snapshot: %s", err)
 	}
+
 	if err := json.NewEncoder(w).Encode(&blacklist); err != nil {
+		logger.Errorw("failed to encode blacklist", "error", err)
 		return handler.Errorf("json encode: %s", err)
 	}
+
+	logger.Debugw("blacklist retrieved", "count", len(blacklist))
 	return nil
 }
 
 func parseDigest(r *http.Request) (core.Digest, error) {
 	raw, err := httputil.ParseParam(r, "digest")
 	if err != nil {
-		return core.Digest{}, err
+		return core.Digest{}, handler.Errorf("parse digest param: %s", err).Status(http.StatusBadRequest)
 	}
+
+	// Validate digest format.
+	if strings.TrimSpace(raw) == "" {
+		return core.Digest{}, handler.ErrorStatus(http.StatusBadRequest)
+	}
 	// TODO(codyg): Accept only a fully formed digest.
 	d, err := core.NewSHA256DigestFromHex(raw)
 	if err != nil {
diff --git a/agent/agentserver/server_test.go b/agent/agentserver/server_test.go
index 7bea45eeb..58098d7b3 100644
--- a/agent/agentserver/server_test.go
+++ b/agent/agentserver/server_test.go
@@ -15,7 +15,6 @@ package agentserver
 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -232,28 +231,28 @@ func TestReadinessCheckHandler(t *testing.T) {
 			probeErr:      errors.New("test scheduler error"),
 			buildIndexErr: nil,
 			trackerErr:    nil,
-			wantErr:       "GET http://{address}/readiness 503: agent not ready: test scheduler error",
+			wantErr:       "GET http://{address}/readiness 503: agent not ready: scheduler: test scheduler error",
 		},
 		{
 			desc:          "failure (build index not ready)",
 			probeErr:      nil,
 			buildIndexErr: errors.New("build index not ready"),
 			trackerErr:    nil,
-			wantErr:       "GET http://{address}/readiness 503: agent not ready: build index not ready",
+			wantErr:       "GET http://{address}/readiness 503: agent not ready: build-index: build index not ready",
 		},
 		{
 			desc:          "failure (tracker not ready)",
 			probeErr:      nil,
 			buildIndexErr: nil,
 			trackerErr:    errors.New("tracker not ready"),
-			wantErr:       "GET http://{address}/readiness 503: agent not ready: tracker not ready",
+			wantErr:       "GET http://{address}/readiness 503: agent not ready: tracker: tracker not ready",
 		},
 		{
 			desc:          "failure (all conditions fail)",
 			probeErr:      errors.New("test scheduler error"),
 			buildIndexErr: errors.New("build index not ready"),
 			trackerErr:    errors.New("tracker not ready"),
-			wantErr:       "GET http://{address}/readiness 503: agent not ready: test scheduler error\nbuild index not ready\ntracker not ready",
+			wantErr:       "GET http://{address}/readiness 503: agent not ready: build-index: build index not ready\nscheduler: test scheduler error\ntracker: tracker not ready",
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
@@ -356,7 +355,7 @@ func TestReadinessCheckHandlerCache(t *testing.T) {
 
 			tc.setupMocks(mocks)
 
-			s, addr := mocks.startServer(Config{readinessCacheTTL: tc.readinessCacheTTL})
+			s, addr := mocks.startServer(Config{ReadinessCacheTTL: tc.readinessCacheTTL})
 			_, err := httputil.Get(fmt.Sprintf("http://%s/readiness", addr))
 			if tc.wantFirstCallSuccess {
 				require.NoError(err)
@@ -453,7 +452,7 @@ func TestPreloadHandler(t *testing.T) {
 			url:  fmt.Sprintf("/preload/tags/%s", tag),
 			setup: func(mocks *serverMocks) {
 				mocks.dockerCli.EXPECT().
-					PullImage(context.Background(), "repo1", "tag1").Return(nil)
+					PullImage(gomock.Any(), "repo1", "tag1").Return(nil)
 				mocks.containerRuntime.EXPECT().
 					DockerClient().Return(mocks.dockerCli)
 			},
@@ -463,7 +462,7 @@ func TestPreloadHandler(t *testing.T) {
 			url:  fmt.Sprintf("/preload/tags/%s?runtime=containerd&namespace=name.space1", tag),
 			setup: func(mocks *serverMocks) {
 				mocks.containerdCli.EXPECT().
-					PullImage(context.Background(), "name.space1", "repo1", "tag1").Return(nil)
+					PullImage(gomock.Any(), "name.space1", "repo1", "tag1").Return(nil)
 				mocks.containerRuntime.EXPECT().
 					ContainerdClient().Return(mocks.containerdCli)
 			},
@@ -472,7 +471,7 @@ func TestPreloadHandler(t *testing.T) {
 			name:          "unsupported runtime",
 			url:           fmt.Sprintf("/preload/tags/%s?runtime=crio", tag),
 			setup:         func(_ *serverMocks) {},
-			expectedError: "/preload/tags/repo1:tag1?runtime=crio 500: unsupported container runtime",
+			expectedError: "/preload/tags/repo1:tag1?runtime=crio 500: unsupported container runtime: crio",
 		},
 	}
diff --git a/agent/cmd/cmd.go b/agent/cmd/cmd.go
index 496a20232..2aec0a22b 100644
--- a/agent/cmd/cmd.go
+++ b/agent/cmd/cmd.go
@@ -14,6 +14,7 @@ package cmd
 import (
+	"context"
 	"flag"
 	"fmt"
 	"net/http"
@@ -35,6 +36,7 @@ import (
 	"github.com/uber/kraken/utils/log"
 	"github.com/uber/kraken/utils/netutil"
 
+	"github.com/docker/distribution/registry"
 	"github.com/uber-go/tally"
 	"go.uber.org/zap"
 )
@@ -99,154 +101,347 @@ func WithLogger(l *zap.Logger) Option {
 	return func(o *options) { o.logger = l }
 }
 
-// Run runs the agent.
-func Run(flags *Flags, opts ...Option) {
-	if flags.PeerPort == 0 {
-		panic("must specify non-zero peer port")
+// App represents the agent application with all its components.
+type App struct {
+	config Config
+	flags  *Flags
+	stats  tally.Scope
+	logger *zap.Logger
+
+	// Components
+	peerContext    core.PeerContext
+	cads           *store.CADownloadStore
+	scheduler      scheduler.ReloadableScheduler
+	tagClient      tagclient.Client
+	announceClient announceclient.Client
+	registry       *registry.Registry
+	agentServer    *agentserver.Server
+
+	// Cleanup functions
+	cleanup []func()
+}
+
+// NewApp creates a new agent application.
+func NewApp(flags *Flags, opts ...Option) (*App, error) {
+	app := &App{
+		flags:   flags,
+		cleanup: make([]func(), 0),
 	}
-	if flags.AgentServerPort == 0 {
-		panic("must specify non-zero agent server port")
+
+	if err := app.parseOptions(opts...); err != nil {
+		return nil, fmt.Errorf("parse options: %w", err)
+	}
+
+	if err := app.validateFlags(); err != nil {
+		return nil, fmt.Errorf("validate flags: %w", err)
 	}
-	if flags.AgentRegistryPort == 0 {
-		panic("must specify non-zero agent registry port")
+
+	if err := app.loadConfig(); err != nil {
+		return nil, fmt.Errorf("load config: %w", err)
+	}
+
+	if err := app.setupLogging(); err != nil {
+		return nil, fmt.Errorf("setup logging: %w", err)
+	}
+
+	if err := app.setupMetrics(); err != nil {
+		return nil, fmt.Errorf("setup metrics: %w", err)
 	}
 
+	return app, nil
+}
+
+func (a *App) parseOptions(opts ...Option) error {
 	var overrides options
 	for _, o := range opts {
 		o(&overrides)
 	}
 
-	var config Config
 	if overrides.config != nil {
-		config = *overrides.config
-	} else {
-		if err := configutil.Load(flags.ConfigFile, &config); err != nil {
-			panic(err)
+		a.config = *overrides.config
+	}
+	if overrides.metrics != nil {
+		a.stats = overrides.metrics
+	}
+	if overrides.logger != nil {
+		a.logger = overrides.logger
+	}
+
+	return nil
+}
+
+func (a *App) validateFlags() error {
+	if a.flags.PeerPort == 0 {
+		return fmt.Errorf("must specify non-zero peer port")
+	}
+	if a.flags.AgentServerPort == 0 {
+		return fmt.Errorf("must specify non-zero agent server port")
+	}
+	if a.flags.AgentRegistryPort == 0 {
+		return fmt.Errorf("must specify non-zero agent registry port")
+	}
+	return nil
+}
+
+func (a *App) loadConfig() error {
+	if a.config.PeerIDFactory == "" && a.flags.ConfigFile != "" {
+		if err := configutil.Load(a.flags.ConfigFile, &a.config); err != nil {
+			return fmt.Errorf("load config file: %w", err)
 		}
-		if flags.SecretsFile != "" {
+		if a.flags.SecretsFile != "" {
+			if err := configutil.Load(a.flags.SecretsFile, &a.config); err != nil {
+				return fmt.Errorf("load secrets file: %w", err)
 			}
 		}
 	}
+	return nil
+}
 
-	if overrides.logger != nil {
-		log.SetGlobalLogger(overrides.logger.Sugar())
+func (a *App) setupLogging() error {
+	if a.logger != nil {
+		log.SetGlobalLogger(a.logger.Sugar())
 	} else {
-		zlog := log.ConfigureLogger(config.ZapLogging)
-		defer zlog.Sync()
+		zlog := log.ConfigureLogger(a.config.ZapLogging)
+		a.logger = zlog.Desugar()
+		a.cleanup = append(a.cleanup, func() { zlog.Sync() })
 	}
+	return nil
+}
 
-	stats := overrides.metrics
-	if stats == nil {
-		s, closer, err := metrics.New(config.Metrics, flags.KrakenCluster)
+func (a *App) setupMetrics() error {
+	if a.stats == nil {
+		s, closer, err := metrics.New(a.config.Metrics, a.flags.KrakenCluster)
 		if err != nil {
-			log.Fatalf("Failed to init metrics: %s", err)
+			return fmt.Errorf("init metrics: %w", err)
 		}
-		stats = s
-		defer closer.Close()
+		a.stats = s
+		a.cleanup = append(a.cleanup, func() { closer.Close() })
 	}
-	go metrics.EmitVersion(stats)
+	go metrics.EmitVersion(a.stats)
+	return nil
+}
 
-	if flags.PeerIP == "" {
+func (a *App) setupPeerContext() error {
+	peerIP := a.flags.PeerIP
+	if peerIP == "" {
 		localIP, err := netutil.GetLocalIP()
 		if err != nil {
-			log.Fatalf("Error getting local ip: %s", err)
+			return fmt.Errorf("get local IP: %w", err)
 		}
-		flags.PeerIP = localIP
+		peerIP = localIP
 	}
 
 	pctx, err := core.NewPeerContext(
-		config.PeerIDFactory, flags.Zone, flags.KrakenCluster, flags.PeerIP, flags.PeerPort, false)
+		a.config.PeerIDFactory, a.flags.Zone, a.flags.KrakenCluster, peerIP, a.flags.PeerPort, false)
 	if err != nil {
-		log.Fatalf("Failed to create peer context: %s", err)
+		return fmt.Errorf("create peer context: %w", err)
 	}
+	a.peerContext = pctx
+	return nil
+}
 
-	cads, err := store.NewCADownloadStore(config.CADownloadStore, stats)
+func (a *App) setupStorage() error {
+	cads, err := store.NewCADownloadStore(a.config.CADownloadStore, a.stats)
 	if err != nil {
-		log.Fatalf("Failed to create local store: %s", err)
+		return fmt.Errorf("create CA download store: %w", err)
 	}
+	a.cads = cads
+	return nil
+}
 
-	netevents, err := networkevent.NewProducer(config.NetworkEvent)
+func (a *App) setupScheduler() error {
+	netevents, err := networkevent.NewProducer(a.config.NetworkEvent)
 	if err != nil {
-		log.Fatalf("Failed to create network event producer: %s", err)
+		return fmt.Errorf("create network event producer: %w", err)
 	}
 
-	trackers, err := config.Tracker.Build()
+	trackers, err := a.config.Tracker.Build()
 	if err != nil {
-		log.Fatalf("Error building tracker upstream: %s", err)
+		return fmt.Errorf("build tracker upstream: %w", err)
 	}
 	go trackers.Monitor(nil)
 
-	tls, err := config.TLS.BuildClient()
+	tls, err := a.config.TLS.BuildClient()
 	if err != nil {
-		log.Fatalf("Error building client tls config: %s", err)
+		return fmt.Errorf("build client TLS config: %w", err)
 	}
 
-	announceClient := announceclient.New(pctx, trackers, tls)
+	a.announceClient = announceclient.New(a.peerContext, trackers, tls)
 
 	sched, err := scheduler.NewAgentScheduler(
-		config.Scheduler, stats, pctx, cads, netevents, trackers, announceClient, tls)
+		a.config.Scheduler, a.stats, a.peerContext, a.cads, netevents, trackers, a.announceClient, tls)
+	if err != nil {
+		return fmt.Errorf("create scheduler: %w", err)
+	}
+
+	a.scheduler = sched
+	return nil
+}
+
+func (a *App) setupTagClient() error {
+	buildIndexes, err := a.config.BuildIndex.Build()
 	if err != nil {
-		log.Fatalf("Error creating scheduler: %s", err)
+		return fmt.Errorf("build build-index upstream: %w", err)
fmt.Errorf("build build-index upstream: %w", err) } - buildIndexes, err := config.BuildIndex.Build() + tls, err := a.config.TLS.BuildClient() if err != nil { - log.Fatalf("Error building build-index upstream: %s", err) + return fmt.Errorf("build client TLS config: %w", err) } - tagClient := tagclient.NewClusterClient(buildIndexes, tls) + a.tagClient = tagclient.NewClusterClient(buildIndexes, tls) + return nil +} - transferer := transfer.NewReadOnlyTransferer(stats, cads, tagClient, sched) +func (a *App) setupRegistry() error { + transferer := transfer.NewReadOnlyTransferer(a.stats, a.cads, a.tagClient, a.scheduler) - registry, err := config.Registry.Build(config.Registry.ReadOnlyParameters(transferer, cads, stats)) + registry, err := a.config.Registry.Build(a.config.Registry.ReadOnlyParameters(transferer, a.cads, a.stats)) if err != nil { - log.Fatalf("Failed to init registry: %s", err) + return fmt.Errorf("init registry: %w", err) } - registryAddr := fmt.Sprintf("127.0.0.1:%d", flags.AgentRegistryPort) - containerRuntimeCfg := config.ContainerRuntime + a.registry = registry + return nil +} + +func (a *App) setupAgentServer() error { + registryAddr := fmt.Sprintf("127.0.0.1:%d", a.flags.AgentRegistryPort) + + containerRuntimeCfg := a.config.ContainerRuntime dockerdaemonCfg := dockerdaemon.Config{} - if config.DockerDaemon != dockerdaemonCfg { + if a.config.DockerDaemon != dockerdaemonCfg { log.Warn("please move docker config under \"container_runtime\"") - containerRuntimeCfg.Docker = config.DockerDaemon + containerRuntimeCfg.Docker = a.config.DockerDaemon } + containerRuntimeFactory, err := containerruntime.NewFactory(containerRuntimeCfg, registryAddr) if err != nil { - log.Fatalf("Failed to create container runtime factory: %s", err) + return fmt.Errorf("create container runtime factory: %w", err) + } + + tls, err := a.config.TLS.BuildClient() + if err != nil { + return fmt.Errorf("build client TLS config: %w", err) + } + + announceClient := announceclient.New(a.peerContext, nil, tls) + a.agentServer = agentserver.New( + a.config.AgentServer, a.stats, a.cads, a.scheduler, a.tagClient, announceClient, containerRuntimeFactory) + + return nil +} + +// Initialize sets up all the agent components. +func (a *App) Initialize() error { + setupSteps := []struct { + name string + fn func() error + }{ + {"peer context", a.setupPeerContext}, + {"storage", a.setupStorage}, + {"scheduler", a.setupScheduler}, + {"tag client", a.setupTagClient}, + {"registry", a.setupRegistry}, + {"agent server", a.setupAgentServer}, + } + + for _, step := range setupSteps { + if err := step.fn(); err != nil { + return fmt.Errorf("setup %s: %w", step.name, err) + } + } + + return nil +} + +// Run starts the agent application. 
+func (a *App) Run(ctx context.Context) error {
+	agentAddr := fmt.Sprintf(":%d", a.flags.AgentServerPort)
+	log.Infof("Starting agent server on %s", agentAddr)
+
+	agentSrv := &http.Server{
+		Addr:    agentAddr,
+		Handler: a.agentServer.Handler(),
 	}
 
-	agentServer := agentserver.New(
-		config.AgentServer, stats, cads, sched, tagClient, announceClient, containerRuntimeFactory)
-	addr := fmt.Sprintf(":%d", flags.AgentServerPort)
-	log.Infof("Starting agent server on %s", addr)
 	go func() {
-		log.Fatal(http.ListenAndServe(addr, agentServer.Handler()))
+		if err := agentSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			log.Errorf("Agent server error: %v", err)
+		}
 	}()
 
 	log.Info("Starting registry...")
 	go func() {
-		log.Fatal(registry.ListenAndServe())
+		if err := a.registry.ListenAndServe(); err != nil {
+			log.Errorf("Registry error: %v", err)
+		}
+	}()
+
+	// Start heartbeat.
+	go a.heartbeat()
+
+	// Start nginx.
+	nginxDone := make(chan error, 1)
+	go func() {
+		err := nginx.Run(a.config.Nginx, map[string]interface{}{
+			"allowed_cidrs": a.config.AllowedCidrs,
+			"port":          a.flags.AgentRegistryPort,
+			"registry_server": nginx.GetServer(
+				a.config.Registry.Docker.HTTP.Net, a.config.Registry.Docker.HTTP.Addr),
+			"agent_server":    fmt.Sprintf("127.0.0.1:%d", a.flags.AgentServerPort),
+			"registry_backup": a.config.RegistryBackup},
+			nginx.WithTLS(a.config.TLS))
+		nginxDone <- err
 	}()
 
-	go heartbeat(stats)
+	// Wait for context cancellation or nginx error.
+	select {
+	case <-ctx.Done():
+		log.Info("Shutting down agent...")
+		return a.shutdown(agentSrv)
+	case err := <-nginxDone:
+		return fmt.Errorf("nginx error: %w", err)
+	}
+}
+
+func (a *App) shutdown(agentSrv *http.Server) error {
+	// Shutdown agent server gracefully.
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	if err := agentSrv.Shutdown(shutdownCtx); err != nil {
+		log.Errorf("Agent server shutdown error: %v", err)
+	}
+
+	// Run cleanup functions in reverse order.
+	for i := len(a.cleanup) - 1; i >= 0; i-- {
+		a.cleanup[i]()
+	}
 
-	log.Fatal(nginx.Run(config.Nginx, map[string]interface{}{
-		"allowed_cidrs": config.AllowedCidrs,
-		"port":          flags.AgentRegistryPort,
-		"registry_server": nginx.GetServer(
-			config.Registry.Docker.HTTP.Net, config.Registry.Docker.HTTP.Addr),
-		"agent_server":    fmt.Sprintf("127.0.0.1:%d", flags.AgentServerPort),
-		"registry_backup": config.RegistryBackup},
-		nginx.WithTLS(config.TLS)))
+	return nil
 }
 
-// heartbeat periodically emits a counter metric which allows us to monitor the
-// number of active agents.
-func heartbeat(stats tally.Scope) {
+func (a *App) heartbeat() {
 	for {
-		stats.Counter("heartbeat").Inc(1)
+		a.stats.Counter("heartbeat").Inc(1)
 		time.Sleep(10 * time.Second)
 	}
 }
+
+// Run runs the agent (legacy function for backward compatibility).
+func Run(flags *Flags, opts ...Option) {
+	app, err := NewApp(flags, opts...)
+	if err != nil {
+		log.Fatalf("Failed to create app: %v", err)
+	}
+
+	if err := app.Initialize(); err != nil {
+		log.Fatalf("Failed to initialize app: %v", err)
+	}
+
+	ctx := context.Background()
+	if err := app.Run(ctx); err != nil {
+		log.Fatalf("App run error: %v", err)
+	}
+}
diff --git a/docker/agent/Dockerfile b/docker/agent/Dockerfile
index e17122cbb..b75b39165 100644
--- a/docker/agent/Dockerfile
+++ b/docker/agent/Dockerfile
@@ -1,6 +1,30 @@
-FROM debian:10
+FROM debian:12
 
-RUN apt-get update && apt-get install -y curl nginx
+# Fix repository configuration for Debian 12 (bookworm) with multiple mirrors and better retry logic
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "10";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/99no-check-valid-until
+
+RUN apt-get clean \
+    && rm -rf /var/lib/apt/lists/* \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=10 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=60 -o Acquire::ForceIPv4=true \
+    ) \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
+        curl \
+        nginx \
+    ) \
+    && rm -rf /var/lib/apt/lists/*
 
 RUN mkdir -p -m 777 /var/log/kraken/kraken-agent
 RUN mkdir -p -m 777 /var/cache/kraken/kraken-agent
diff --git a/docker/build-index/Dockerfile b/docker/build-index/Dockerfile
index 0d734a3a5..b26629a50 100644
--- a/docker/build-index/Dockerfile
+++ b/docker/build-index/Dockerfile
@@ -1,6 +1,30 @@
-FROM debian:10
+FROM debian:12
 
-RUN apt-get update && apt-get install -y curl nginx
+# Fix repository configuration for Debian 12 (bookworm) with multiple mirrors and better retry logic
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "10";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/99no-check-valid-until
+
+RUN apt-get clean \
+    && rm -rf /var/lib/apt/lists/* \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=10 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=60 -o Acquire::ForceIPv4=true \
+    ) \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
+        curl \
+        nginx \
+    ) \
+    && rm -rf /var/lib/apt/lists/*
 
 RUN mkdir -p -m 777 /var/log/kraken/kraken-build-index
 RUN mkdir -p -m 777 /var/cache/kraken/kraken-build-index
diff --git a/docker/herd/Dockerfile b/docker/herd/Dockerfile
index 3ef96044f..54f9c7ab2 100644
--- a/docker/herd/Dockerfile
+++ b/docker/herd/Dockerfile
@@ -1,8 +1,36 @@
 # This image combines all central components into one container, for easier
 # deployment and management.
-FROM debian:10
+FROM debian:12
 
-RUN apt-get update && apt-get install -y build-essential curl sqlite3 nginx sudo procps
+# Fix repository configuration for Debian 12 (bookworm) with multiple mirrors and better retry logic
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "10";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/99no-check-valid-until
+
+RUN apt-get clean \
+    && rm -rf /var/lib/apt/lists/* \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=10 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=60 -o Acquire::ForceIPv4=true \
+    ) \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
+        build-essential \
+        curl \
+        sqlite3 \
+        nginx \
+        sudo \
+        procps \
+    ) \
+    && rm -rf /var/lib/apt/lists/*
 
 # Install redis.
 ADD http://download.redis.io/redis-stable.tar.gz /tmp/redis-stable.tar.gz
diff --git a/docker/origin/Dockerfile b/docker/origin/Dockerfile
index db6af3186..b3be561bf 100644
--- a/docker/origin/Dockerfile
+++ b/docker/origin/Dockerfile
@@ -1,16 +1,26 @@
-FROM debian:10
+FROM debian:12
+
+# Fix repository configuration for Debian 12 (bookworm) with multiple mirrors and better retry logic
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "10";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/99no-check-valid-until
 
 RUN apt-get clean \
     && rm -rf /var/lib/apt/lists/* \
     && \
     ( \
         unset http_proxy https_proxy no_proxy && \
-        apt-get update -o Acquire::Retries=3 -o Acquire::http::No-Cache=true \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=10 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=60 -o Acquire::ForceIPv4=true \
     ) \
     && \
     ( \
         unset http_proxy https_proxy no_proxy && \
-        apt-get install -y --no-install-recommends \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
         curl \
         sqlite3 \
         nginx \
diff --git a/docker/proxy/Dockerfile b/docker/proxy/Dockerfile
index 4f0f07afd..9be7f75e0 100644
--- a/docker/proxy/Dockerfile
+++ b/docker/proxy/Dockerfile
@@ -1,6 +1,30 @@
-FROM debian:10
+FROM debian:12
 
-RUN apt-get update && apt-get install -y curl nginx
+# Fix repository configuration for Debian 12 (bookworm) with multiple mirrors and better retry logic
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "10";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/99no-check-valid-until
+
+RUN apt-get clean \
+    && rm -rf /var/lib/apt/lists/* \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=10 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=60 -o Acquire::ForceIPv4=true \
+    ) \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
+        curl \
+        nginx \
+    ) \
+    && rm -rf /var/lib/apt/lists/*
 
 RUN mkdir -p -m 777 /var/log/kraken/kraken-proxy
 RUN mkdir -p -m 777 /var/cache/kraken/kraken-proxy
diff --git a/docker/testfs/Dockerfile b/docker/testfs/Dockerfile
index 53918f183..cf324df51 100644
--- a/docker/testfs/Dockerfile
+++ b/docker/testfs/Dockerfile
@@ -1,6 +1,29 @@
-FROM debian:10
+FROM debian:12
 
-RUN apt-get update && apt-get install -y curl
+# Fix repository configuration for Debian 12 (bookworm) with CDN mirror for better stability
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://cdn-fastly.deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://cdn-fastly.deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "15";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "120";' >> /etc/apt/apt.conf.d/99no-check-valid-until
+
+RUN apt-get clean \
+    && rm -rf /var/lib/apt/lists/* \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=15 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=120 -o Acquire::ForceIPv4=true \
+    ) \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
+        curl \
+    ) \
+    && rm -rf /var/lib/apt/lists/*
 
 RUN mkdir -p -m 777 /var/log/kraken/kraken-testfs
 RUN mkdir -p -m 777 /var/cache/kraken/kraken-testfs
diff --git a/docker/tracker/Dockerfile b/docker/tracker/Dockerfile
index a49402700..56faddf79 100644
--- a/docker/tracker/Dockerfile
+++ b/docker/tracker/Dockerfile
@@ -1,6 +1,30 @@
-FROM debian:10
+FROM debian:12
 
-RUN apt-get update && apt-get install -y curl nginx
+# Fix repository configuration for Debian 12 (bookworm) with multiple mirrors and better retry logic
+RUN rm -f /etc/apt/sources.list.d/debian.sources && \
+    echo "deb http://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \
+    echo "deb http://security.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \
+    echo "deb http://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \
+    echo 'Acquire::Check-Valid-Until "false";' > /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::AllowInsecureRepositories "true";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::Retries "10";' >> /etc/apt/apt.conf.d/99no-check-valid-until && \
+    echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/99no-check-valid-until
+
+RUN apt-get clean \
+    && rm -rf /var/lib/apt/lists/* \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get update --allow-releaseinfo-change -o Acquire::Retries=10 -o Acquire::http::No-Cache=true -o Acquire::http::timeout=60 -o Acquire::ForceIPv4=true \
+    ) \
+    && \
+    ( \
+        unset http_proxy https_proxy no_proxy && \
+        apt-get install -y --no-install-recommends --fix-missing --allow-unauthenticated \
+        curl \
+        nginx \
+    ) \
+    && rm -rf /var/lib/apt/lists/*
 
 RUN mkdir -p -m 777 /var/log/kraken/kraken-tracker
 RUN mkdir -p -m 777 /var/cache/kraken/kraken-tracker
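
Note: App.Run only takes the graceful-shutdown path when the context passed to
it is cancelled; the legacy Run wrapper passes context.Background(), which
never cancels, so SIGTERM still kills the process abruptly. Below is a minimal
sketch of signal-aware wiring for the agent binary. It assumes the existing
ParseFlags helper in agent/cmd; this main.go change is illustrative and not
part of the patch:

    package main

    import (
    	"context"
    	"os/signal"
    	"syscall"

    	agentcmd "github.com/uber/kraken/agent/cmd"
    	"github.com/uber/kraken/utils/log"
    )

    func main() {
    	// ParseFlags is assumed to be the existing flag parser in agent/cmd.
    	flags := agentcmd.ParseFlags()

    	app, err := agentcmd.NewApp(flags)
    	if err != nil {
    		log.Fatalf("Failed to create app: %v", err)
    	}
    	if err := app.Initialize(); err != nil {
    		log.Fatalf("Failed to initialize app: %v", err)
    	}

    	// Cancel the root context on SIGINT/SIGTERM so App.Run shuts the agent
    	// server down gracefully and runs its cleanup functions.
    	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    	defer stop()

    	if err := app.Run(ctx); err != nil {
    		log.Fatalf("App run error: %v", err)
    	}
    }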