diff --git a/origin/blobserver/config.go b/origin/blobserver/config.go
index d6719311f..76c5539f2 100644
--- a/origin/blobserver/config.go
+++ b/origin/blobserver/config.go
@@ -4,7 +4,7 @@
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//    http://www.apache.org/licenses/LICENSE-2.0
+//    http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,11 +23,57 @@ import (
 type Config struct {
     Listener                  listener.Config `yaml:"listener"`
     DuplicateWriteBackStagger time.Duration   `yaml:"duplicate_write_back_stagger"`
+
+    // Timeout configurations
+    DownloadTimeout    time.Duration `yaml:"download_timeout"`
+    UploadTimeout      time.Duration `yaml:"upload_timeout"`
+    ReplicationTimeout time.Duration `yaml:"replication_timeout"`
+    BackendTimeout     time.Duration `yaml:"backend_timeout"`
+    ReadinessTimeout   time.Duration `yaml:"readiness_timeout"`
+
+    // Limit configurations
+    MaxConcurrentDownloads int `yaml:"max_concurrent_downloads"`
+    MaxConcurrentUploads   int `yaml:"max_concurrent_uploads"`
+
+    // Retry configurations
+    MaxRetries    int           `yaml:"max_retries"`
+    RetryDelay    time.Duration `yaml:"retry_delay"`
+    RetryMaxDelay time.Duration `yaml:"retry_max_delay"`
 }
 
 func (c Config) applyDefaults() Config {
     if c.DuplicateWriteBackStagger == 0 {
         c.DuplicateWriteBackStagger = 30 * time.Minute
     }
+    if c.DownloadTimeout == 0 {
+        c.DownloadTimeout = 5 * time.Minute
+    }
+    if c.UploadTimeout == 0 {
+        c.UploadTimeout = 10 * time.Minute
+    }
+    if c.ReplicationTimeout == 0 {
+        c.ReplicationTimeout = 5 * time.Minute
+    }
+    if c.BackendTimeout == 0 {
+        c.BackendTimeout = 2 * time.Minute
+    }
+    if c.ReadinessTimeout == 0 {
+        c.ReadinessTimeout = 30 * time.Second
+    }
+    if c.MaxConcurrentDownloads == 0 {
+        c.MaxConcurrentDownloads = 20
+    }
+    if c.MaxConcurrentUploads == 0 {
+        c.MaxConcurrentUploads = 10
+    }
+    if c.MaxRetries == 0 {
+        c.MaxRetries = 3
+    }
+    if c.RetryDelay == 0 {
+        c.RetryDelay = 100 * time.Millisecond
+    }
+    if c.RetryMaxDelay == 0 {
+        c.RetryMaxDelay = 5 * time.Second
+    }
     return c
 }
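Reviewer note: `MaxRetries`, `RetryDelay`, and `RetryMaxDelay` are defined and defaulted above, but nothing else in this diff consumes them yet. A capped exponential backoff consistent with these knobs might look like the sketch below; `withRetries` and `operation` are hypothetical names, not part of the change.

```go
package blobserver

import (
	"context"
	"time"
)

// withRetries sketches the retry loop the new Config fields imply: run the
// operation up to MaxRetries extra times, doubling the delay between attempts
// and capping it at RetryMaxDelay, and give up early if ctx is cancelled.
func withRetries(ctx context.Context, c Config, operation func() error) error {
	delay := c.RetryDelay
	var err error
	for attempt := 0; attempt <= c.MaxRetries; attempt++ {
		if err = operation(); err == nil {
			return nil
		}
		select {
		case <-time.After(delay): // back off before the next attempt
		case <-ctx.Done():
			return ctx.Err()
		}
		if delay *= 2; delay > c.RetryMaxDelay {
			delay = c.RetryMaxDelay
		}
	}
	return err
}
```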
diff --git a/origin/blobserver/server.go b/origin/blobserver/server.go
index 3abd83d74..e87571ccb 100644
--- a/origin/blobserver/server.go
+++ b/origin/blobserver/server.go
@@ -14,9 +14,11 @@ package blobserver
 
 import (
+    "context"
     "encoding/json"
     "fmt"
     "io"
+    "math/rand"
     "net/http"
     _ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
     "os"
@@ -42,7 +44,6 @@ import (
     "github.com/uber/kraken/utils/httputil"
     "github.com/uber/kraken/utils/listener"
     "github.com/uber/kraken/utils/log"
-    "github.com/uber/kraken/utils/memsize"
    "github.com/uber/kraken/utils/stringset"
 
     "github.com/andres-erbsen/clock"
@@ -50,8 +51,6 @@ import (
     "github.com/uber-go/tally"
 )
 
-const _uploadChunkSize = 16 * memsize.MB
-
 // Server defines a server that serves blob data for agent.
 type Server struct {
     config Config
@@ -73,6 +72,21 @@ type Server struct {
     // a given torrent, however this requires blob server to understand the
     // context of the p2p client running alongside it.
     pctx core.PeerContext
+
+    // Resource management
+    downloadSemaphore chan struct{}
+    uploadSemaphore   chan struct{}
+
+    // Metrics
+    downloadTimer       tally.Timer
+    uploadTimer         tally.Timer
+    replicationTimer    tally.Timer
+    downloadCounter     tally.Counter
+    uploadCounter       tally.Counter
+    replicationCounter  tally.Counter
+    errorCounter        tally.Counter
+    timeoutCounter      tally.Counter
+    resourceLeakCounter tally.Counter
 }
 
 // New initializes a new Server.
@@ -98,20 +112,31 @@ func New(
     })
 
     return &Server{
-        config:            config,
-        stats:             stats,
-        clk:               clk,
-        addr:              addr,
-        hashRing:          hashRing,
-        cas:               cas,
-        clientProvider:    clientProvider,
-        clusterProvider:   clusterProvider,
-        backends:          backends,
-        blobRefresher:     blobRefresher,
-        metaInfoGenerator: metaInfoGenerator,
-        uploader:          newUploader(cas),
-        writeBackManager:  writeBackManager,
-        pctx:              pctx,
+        config:              config,
+        stats:               stats,
+        clk:                 clk,
+        addr:                addr,
+        hashRing:            hashRing,
+        cas:                 cas,
+        clientProvider:      clientProvider,
+        clusterProvider:     clusterProvider,
+        backends:            backends,
+        blobRefresher:       blobRefresher,
+        metaInfoGenerator:   metaInfoGenerator,
+        uploader:            newUploader(cas),
+        writeBackManager:    writeBackManager,
+        pctx:                pctx,
+        downloadSemaphore:   make(chan struct{}, config.MaxConcurrentDownloads),
+        uploadSemaphore:     make(chan struct{}, config.MaxConcurrentUploads),
+        downloadTimer:       stats.Timer("download_duration"),
+        uploadTimer:         stats.Timer("upload_duration"),
+        replicationTimer:    stats.Timer("replication_duration"),
+        downloadCounter:     stats.Counter("downloads"),
+        uploadCounter:       stats.Counter("uploads"),
+        replicationCounter:  stats.Counter("replications"),
+        errorCounter:        stats.Counter("errors"),
+        timeoutCounter:      stats.Counter("timeouts"),
+        resourceLeakCounter: stats.Counter("resource_leaks"),
     }, nil
 }
 
@@ -126,6 +151,7 @@ func (s *Server) Handler() http.Handler {
 
     r.Use(middleware.StatusCounter(s.stats))
     r.Use(middleware.LatencyTimer(s.stats))
+    r.Use(s.requestTracingMiddleware)
 
     // Public endpoints:
 
@@ -169,6 +195,40 @@ func (s *Server) Handler() http.Handler {
     return r
 }
 
+// requestIDKey is an unexported context key type; a private struct key avoids
+// collisions with other packages' context values (idiomatic vs. a bare string).
+type requestIDKey struct{}
+
+// requestTracingMiddleware adds structured logging with request tracing.
+func (s *Server) requestTracingMiddleware(next http.Handler) http.Handler {
+    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        requestID := fmt.Sprintf("%d", rand.Int63())
+        start := time.Now()
+
+        // Add request ID to context for downstream handlers.
+        ctx := context.WithValue(r.Context(), requestIDKey{}, requestID)
+        r = r.WithContext(ctx)
+
+        // Add request ID to response headers for debugging.
+        w.Header().Set("X-Request-ID", requestID)
+
+        log.With(
+            "request_id", requestID,
+            "method", r.Method,
+            "path", r.URL.Path,
+            "remote_addr", r.RemoteAddr,
+        ).Info("Request started")
+
+        defer func() {
+            duration := time.Since(start)
+            log.With(
+                "request_id", requestID,
+                "method", r.Method,
+                "path", r.URL.Path,
+                "duration_ms", duration.Milliseconds(),
+            ).Info("Request completed")
+        }()
+
+        next.ServeHTTP(w, r)
+    })
+}
+
 // ListenAndServe is a blocking call which runs s.
 func (s *Server) ListenAndServe(h http.Handler) error {
     log.Infof("Starting blob server on %s", s.config.Listener)
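The `downloadSemaphore`/`uploadSemaphore` fields above are counting semaphores built from buffered channels: the capacity bounds concurrency, a send acquires a slot, a receive releases it. Reduced to a standalone sketch (names illustrative, not from this diff):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	sem := make(chan struct{}, 2) // at most 2 concurrent holders

	acquire := func(ctx context.Context) error {
		select {
		case sem <- struct{}{}: // a slot was free
			return nil
		case <-ctx.Done(): // bounded wait instead of queueing forever
			return ctx.Err()
		}
	}
	release := func() { <-sem }

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := acquire(ctx); err != nil {
		fmt.Println("would return 503:", err)
		return
	}
	defer release()
	fmt.Println("slot acquired; do the download")
}
```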
@@ -181,56 +241,113 @@ func (s *Server) healthCheckHandler(w http.ResponseWriter, r *http.Request) erro
 }
 
 func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
-    err := s.backends.CheckReadiness()
-    if err != nil {
-        return handler.Errorf("not ready to serve traffic: %s", err).Status(http.StatusServiceUnavailable)
+    ctx, cancel := context.WithTimeout(r.Context(), s.config.ReadinessTimeout)
+    defer cancel()
+
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "readiness_check")
+
+    done := make(chan error, 1)
+    go func() {
+        done <- s.backends.CheckReadiness()
+    }()
+
+    select {
+    case err := <-done:
+        if err != nil {
+            logger.Errorf("Readiness check failed: %s", err)
+            return handler.Errorf("not ready to serve traffic: %s", err).Status(http.StatusServiceUnavailable)
+        }
+        logger.Info("Readiness check passed")
+        fmt.Fprintln(w, "OK")
+        return nil
+    case <-ctx.Done():
+        s.timeoutCounter.Inc(1)
+        logger.Error("Readiness check timed out")
+        return handler.Errorf("readiness check timed out").Status(http.StatusServiceUnavailable)
     }
-    fmt.Fprintln(w, "OK")
-    return nil
 }
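The readiness handler above is the first instance of a pattern this diff repeats (`stat`, `replicateToRemote`, `replicateBlobLocally`, `downloadBlob`): run the blocking call in a goroutine and select on its result against the context. The buffered channel is load-bearing — it lets the goroutine finish its send and be collected even after the caller stops waiting. The generic shape, as a sketch (`callWithTimeout` is a hypothetical helper, not in this diff):

```go
package blobserver

import "context"

// callWithTimeout distills the pattern used in readinessCheckHandler: the
// channel has capacity 1 so the goroutine's send never blocks, avoiding a
// goroutine leak when the caller times out first.
func callWithTimeout(ctx context.Context, fn func() error) error {
	done := make(chan error, 1)
	go func() { done <- fn() }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
```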
 
 // statHandler returns blob info if it exists.
 func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error {
+    ctx, cancel := context.WithTimeout(r.Context(), s.config.BackendTimeout)
+    defer cancel()
+
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "stat")
+
     checkLocal, err := strconv.ParseBool(httputil.GetQueryArg(r, "local", "false"))
     if err != nil {
+        logger.Errorf("Failed to parse local parameter: %s", err)
         return handler.Errorf("parse arg `local` as bool: %s", err)
     }
+
     namespace, err := httputil.ParseParam(r, "namespace")
     if err != nil {
+        logger.Errorf("Failed to parse namespace parameter: %s", err)
         return err
     }
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
 
-    bi, err := s.stat(namespace, d, checkLocal)
+    logger = logger.With("namespace", namespace, "digest", d.Hex(), "local", checkLocal)
+    logger.Info("Starting blob stat")
+
+    bi, err := s.stat(ctx, namespace, d, checkLocal)
     if os.IsNotExist(err) {
+        logger.Info("Blob not found")
         return handler.ErrorStatus(http.StatusNotFound)
     } else if err != nil {
+        logger.Errorf("Blob stat failed: %s", err)
         return fmt.Errorf("stat: %s", err)
     }
+
     w.Header().Set("Content-Length", strconv.FormatInt(bi.Size, 10))
-    log.Debugf("successfully check blob %s exists", d.Hex())
+    logger.With("size", bi.Size).Info("Blob stat completed successfully")
     return nil
 }
 
-func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) {
+func (s *Server) stat(ctx context.Context, namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) {
     fi, err := s.cas.GetCacheFileStat(d.Hex())
     if err == nil {
         return core.NewBlobInfo(fi.Size()), nil
-    } else if os.IsNotExist(err) {
+    }
+    if os.IsNotExist(err) {
         if !checkLocal {
             client, err := s.backends.GetClient(namespace)
             if err != nil {
                 return nil, fmt.Errorf("get backend client: %s", err)
             }
-            if bi, err := client.Stat(namespace, d.Hex()); err == nil {
-                return bi, nil
-            } else if err == backenderrors.ErrBlobNotFound {
-                return nil, os.ErrNotExist
-            } else {
-                return nil, fmt.Errorf("backend stat: %s", err)
+
+            done := make(chan struct {
+                bi  *core.BlobInfo
+                err error
+            }, 1)
+
+            go func() {
+                bi, err := client.Stat(namespace, d.Hex())
+                done <- struct {
+                    bi  *core.BlobInfo
+                    err error
+                }{bi, err}
+            }()
+
+            select {
+            case result := <-done:
+                if result.err == nil {
+                    return result.bi, nil
+                } else if result.err == backenderrors.ErrBlobNotFound {
+                    return nil, os.ErrNotExist
+                } else {
+                    return nil, fmt.Errorf("backend stat: %s", result.err)
+                }
+            case <-ctx.Done():
+                s.timeoutCounter.Inc(1)
+                return nil, fmt.Errorf("backend stat timed out: %s", ctx.Err())
             }
         }
         return nil, err // os.ErrNotExist
@@ -240,39 +357,94 @@ func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.B
 }
 
 func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
+    ctx, cancel := context.WithTimeout(r.Context(), s.config.DownloadTimeout)
+    defer cancel()
+
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "download")
+
     namespace, err := httputil.ParseParam(r, "namespace")
     if err != nil {
+        logger.Errorf("Failed to parse namespace parameter: %s", err)
         return err
     }
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
-    if err := s.downloadBlob(namespace, d, w); err != nil {
-        log.With("namespace", namespace).Errorf("Error downloading blob: %s", err)
+
+    logger = logger.With("namespace", namespace, "digest", d.Hex())
+    logger.Info("Starting blob download")
+
+    // Acquire download semaphore
+    select {
+    case s.downloadSemaphore <- struct{}{}:
+        defer func() { <-s.downloadSemaphore }()
+    case <-ctx.Done():
+        logger.Error("Download semaphore acquisition timed out")
+        return handler.Errorf("download queue full").Status(http.StatusServiceUnavailable)
+    }
+
+    s.downloadCounter.Inc(1)
+    timer := s.downloadTimer.Start()
+    defer timer.Stop()
+
+    if err := s.downloadBlob(ctx, namespace, d, w); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Download failed: %s", err)
         return err
     }
+
     setOctetStreamContentType(w)
+    logger.Info("Download completed successfully")
     return nil
 }
 
 func (s *Server) replicateToRemoteHandler(w http.ResponseWriter, r *http.Request) error {
+    ctx, cancel := context.WithTimeout(r.Context(), s.config.ReplicationTimeout)
+    defer cancel()
+
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "replicate_to_remote")
+
     namespace, err := httputil.ParseParam(r, "namespace")
     if err != nil {
+        logger.Errorf("Failed to parse namespace parameter: %s", err)
         return err
     }
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
+
     remote, err := httputil.ParseParam(r, "remote")
     if err != nil {
+        logger.Errorf("Failed to parse remote parameter: %s", err)
+        return err
+    }
+
+    logger = logger.With("namespace", namespace, "digest", d.Hex(), "remote", remote)
+    logger.Info("Starting remote replication")
+
+    s.replicationCounter.Inc(1)
+    timer := s.replicationTimer.Start()
+    defer timer.Stop()
+
+    if err := s.replicateToRemote(ctx, namespace, d, remote); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Remote replication failed: %s", err)
         return err
     }
-    return s.replicateToRemote(namespace, d, remote)
+
+    logger.Info("Remote replication completed successfully")
+    return nil
 }
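Both handlers above follow the same tally instrumentation recipe: bump the operation counter, time the whole handler with a stopwatch, and count failures separately. In isolation (the helper name and wiring are illustrative, not part of this diff):

```go
package blobserver

import "github.com/uber-go/tally"

// instrument is a hypothetical distillation of the metrics pattern used by
// downloadBlobHandler and replicateToRemoteHandler: one counter per attempt,
// one timer over the whole operation, one shared error counter.
func instrument(scope tally.Scope, op string, fn func() error) error {
	scope.Counter(op + "s").Inc(1) // e.g. "downloads", "replications"
	sw := scope.Timer(op + "_duration").Start()
	defer sw.Stop()

	if err := fn(); err != nil {
		scope.Counter("errors").Inc(1)
		return err
	}
	return nil
}
```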
 
-func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
+func (s *Server) replicateToRemote(ctx context.Context, namespace string, d core.Digest, remoteDNS string) error {
     f, err := s.cas.GetCacheFileReader(d.Hex())
     if err != nil {
         if os.IsNotExist(err) {
@@ -280,76 +452,157 @@ func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS st
         }
         return handler.Errorf("file store: %s", err)
     }
-    defer f.Close()
+    defer func() {
+        if closeErr := f.Close(); closeErr != nil {
+            s.resourceLeakCounter.Inc(1)
+            log.Errorf("Failed to close file reader: %s", closeErr)
+        }
+    }()
 
     remote, err := s.clusterProvider.Provide(remoteDNS)
     if err != nil {
         return handler.Errorf("remote cluster provider: %s", err)
     }
-    return remote.UploadBlob(namespace, d, f)
+
+    done := make(chan error, 1)
+    go func() {
+        done <- remote.UploadBlob(namespace, d, f)
+    }()
+
+    select {
+    case err := <-done:
+        return err
+    case <-ctx.Done():
+        s.timeoutCounter.Inc(1)
+        return handler.Errorf("remote replication timed out: %s", ctx.Err())
+    }
 }
 
 // deleteBlobHandler deletes blob data.
 func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "delete")
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
+
+    logger = logger.With("digest", d.Hex())
+    logger.Info("Starting blob deletion")
+
     if err := s.deleteBlob(d); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Blob deletion failed: %s", err)
         return err
     }
+
     setContentLength(w, 0)
     w.WriteHeader(http.StatusAccepted)
-    log.Debugf("successfully delete blob %s", d.Hex())
+    logger.Info("Blob deletion completed successfully")
     return nil
 }
 
 func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) error {
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "get_locations")
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
+
+    logger = logger.With("digest", d.Hex())
+    logger.Info("Getting blob locations")
+
     locs := s.hashRing.Locations(d)
     w.Header().Set("Origin-Locations", strings.Join(locs, ","))
     w.WriteHeader(http.StatusOK)
+
+    logger.With("locations", locs).Info("Blob locations retrieved successfully")
     return nil
 }
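The close-with-accounting `defer` introduced in `replicateToRemote` reappears near-verbatim in `overwriteMetaInfo`, `replicateBlobLocally`, and `downloadBlob` below. It could be factored once; a hypothetical refactor, not in this diff:

```go
package blobserver

import (
	"io"

	"github.com/uber/kraken/utils/log"
)

// closeTracked closes c, and on failure bumps the leak counter so dashboards
// can alert on file handles that were not released cleanly. Sketch only.
func (s *Server) closeTracked(c io.Closer, where string) {
	if err := c.Close(); err != nil {
		s.resourceLeakCounter.Inc(1)
		log.Errorf("Failed to close file reader in %s: %s", where, err)
	}
}
```

Each repeated defer block would then shrink to `defer s.closeTracked(f, "downloadBlob")`.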
 
 // getPeerContextHandler returns the Server's peer context as JSON.
 func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) error {
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "get_peer_context")
+
+    logger.Info("Getting peer context")
+
     if err := json.NewEncoder(w).Encode(s.pctx); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Failed to encode peer context: %s", err)
         return handler.Errorf("error converting peer context to json: %s", err)
     }
+
+    logger.Info("Peer context retrieved successfully")
     return nil
 }
 
 func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
+    ctx, cancel := context.WithTimeout(r.Context(), s.config.BackendTimeout)
+    defer cancel()
+
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "get_metainfo")
+
     namespace, err := httputil.ParseParam(r, "namespace")
     if err != nil {
+        logger.Errorf("Failed to parse namespace parameter: %s", err)
         return err
     }
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
-    raw, err := s.getMetaInfo(namespace, d)
+
+    logger = logger.With("namespace", namespace, "digest", d.Hex())
+    logger.Info("Getting metainfo")
+
+    raw, err := s.getMetaInfo(ctx, namespace, d)
     if err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Failed to get metainfo: %s", err)
         return err
     }
+
     w.Write(raw)
+    logger.Info("Metainfo retrieved successfully")
     return nil
 }
 
 func (s *Server) overwriteMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "overwrite_metainfo")
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
+
     pieceLength, err := strconv.ParseInt(r.URL.Query().Get("piece_length"), 10, 64)
     if err != nil {
+        logger.Errorf("Failed to parse piece_length parameter: %s", err)
         return handler.Errorf("invalid piece_length argument: %s", err).Status(http.StatusBadRequest)
     }
-    return s.overwriteMetaInfo(d, pieceLength)
+
+    logger = logger.With("digest", d.Hex(), "piece_length", pieceLength)
+    logger.Info("Overwriting metainfo")
+
+    if err := s.overwriteMetaInfo(d, pieceLength); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Failed to overwrite metainfo: %s", err)
+        return err
+    }
+
+    logger.Info("Metainfo overwritten successfully")
+    return nil
 }
 
 // overwriteMetaInfo generates metainfo configured with pieceLength for d and
@@ -360,13 +613,22 @@ func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error {
     if err != nil {
         return handler.Errorf("get cache file: %s", err)
     }
+    defer func() {
+        if closeErr := f.Close(); closeErr != nil {
+            s.resourceLeakCounter.Inc(1)
+            log.Errorf("Failed to close file reader in overwriteMetaInfo: %s", closeErr)
+        }
+    }()
+
     mi, err := core.NewMetaInfo(d, f, pieceLength)
     if err != nil {
         return handler.Errorf("create metainfo: %s", err)
     }
+
     if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil {
         return handler.Errorf("set metainfo: %s", err)
     }
+
     return nil
 }
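For reviewers unfamiliar with the `piece_length` parameter handled above: torrent metainfo splits a blob into ceil(size/pieceLength) pieces, so regenerating metadata with a new piece length changes the piece count (and per-piece hashes). A toy check with illustrative values:

```go
package main

import "fmt"

func main() {
	size := int64(10 << 20)       // 10 MiB blob
	pieceLength := int64(4 << 20) // 4 MiB pieces
	numPieces := (size + pieceLength - 1) / pieceLength // ceiling division
	fmt.Println(numPieces) // 3
}
```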
@@ -374,7 +636,7 @@ func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error {
 // the blob from the storage backend configured for namespace will be initiated.
 // This download is asynchronous and getMetaInfo will immediately return a
 // "202 Accepted" server error.
-func (s *Server) getMetaInfo(namespace string, d core.Digest) ([]byte, error) {
+func (s *Server) getMetaInfo(ctx context.Context, namespace string, d core.Digest) ([]byte, error) {
     var tm metadata.TorrentMeta
     if err := s.cas.GetCacheFileMetadata(d.Hex(), &tm); os.IsNotExist(err) {
         return nil, s.startRemoteBlobDownload(namespace, d, true)
@@ -390,13 +652,14 @@ type localReplicationHook struct {
 
 func (h *localReplicationHook) Run(d core.Digest) {
     timer := h.server.stats.Timer("replicate_blob").Start()
+    defer timer.Stop()
+
     if err := h.server.replicateBlobLocally(d); err != nil {
         // Don't return error here as we only want to cache storage backend errors.
         log.With("blob", d.Hex()).Errorf("Error replicating remote blob: %s", err)
         h.server.stats.Counter("replicate_blob_errors").Inc(1)
         return
     }
-    timer.Stop()
 }
 
 func (s *Server) startRemoteBlobDownload(
@@ -420,21 +683,42 @@ func (s *Server) startRemoteBlobDownload(
 }
 
 func (s *Server) replicateBlobLocally(d core.Digest) error {
-    return s.applyToReplicas(d, func(i int, client blobclient.Client) error {
+    ctx, cancel := context.WithTimeout(context.Background(), s.config.ReplicationTimeout)
+    defer cancel()
+
+    return s.applyToReplicas(ctx, d, func(i int, client blobclient.Client) error {
         f, err := s.cas.GetCacheFileReader(d.Hex())
         if err != nil {
             return fmt.Errorf("get cache reader: %s", err)
         }
-        if err := client.TransferBlob(d, f); err != nil {
-            return fmt.Errorf("transfer blob: %s", err)
+        defer func() {
+            if closeErr := f.Close(); closeErr != nil {
+                s.resourceLeakCounter.Inc(1)
+                log.Errorf("Failed to close file reader in replicateBlobLocally: %s", closeErr)
+            }
+        }()
+
+        done := make(chan error, 1)
+        go func() {
+            done <- client.TransferBlob(d, f)
+        }()
+
+        select {
+        case err := <-done:
+            if err != nil {
+                return fmt.Errorf("transfer blob: %s", err)
+            }
+            return nil
+        case <-ctx.Done():
+            s.timeoutCounter.Inc(1)
+            return fmt.Errorf("transfer blob timed out: %s", ctx.Err())
         }
-        return nil
     })
 }
 
 // applyToReplicas applies f to the replicas of d concurrently in random order,
 // not including the current origin. Passes the index of the iteration to f.
-func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Client) error) error {
+func (s *Server) applyToReplicas(ctx context.Context, d core.Digest, f func(i int, c blobclient.Client) error) error {
     replicas := stringset.FromSlice(s.hashRing.Locations(d))
     replicas.Remove(s.addr)
 
@@ -455,28 +739,57 @@ func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Clien
         }(i, replica)
         i++
     }
-    wg.Wait()
-    return errutil.Join(errs)
+
+    // Wait for all goroutines to complete or context to be cancelled.
+    done := make(chan struct{})
+    go func() {
+        wg.Wait()
+        close(done)
+    }()
+
+    select {
+    case <-done:
+        return errutil.Join(errs)
+    case <-ctx.Done():
+        s.timeoutCounter.Inc(1)
+        return fmt.Errorf("replicas operation timed out: %s", ctx.Err())
+    }
 }
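A note on `applyToReplicas`: on timeout the wrapper above stops *waiting*, but the worker goroutines keep running to completion in the background. If first-error cancellation is wanted instead, `golang.org/x/sync/errgroup` is the usual shape — sketch below; note it still waits for all workers rather than abandoning them, and the replica addresses are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	replicas := []string{"origin2:15002", "origin3:15002"} // illustrative
	for i, addr := range replicas {
		i, addr := i, addr // capture range vars (pre-Go 1.22 semantics)
		g.Go(func() error {
			if err := ctx.Err(); err != nil {
				return err // a sibling already failed; skip the transfer
			}
			fmt.Printf("transfer to replica %d (%s)\n", i, addr)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("fan-out failed:", err)
	}
}
```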
 
 // downloadBlob downloads blob for d into dst. If no blob exists under d, a
 // download of the blob from the storage backend configured for namespace will
 // be initiated. This download is asynchronous and downloadBlob will immediately
 // return a "202 Accepted" handler error.
-func (s *Server) downloadBlob(namespace string, d core.Digest, dst io.Writer) error {
+func (s *Server) downloadBlob(ctx context.Context, namespace string, d core.Digest, dst io.Writer) error {
     f, err := s.cas.GetCacheFileReader(d.Hex())
     if os.IsNotExist(err) {
         return s.startRemoteBlobDownload(namespace, d, true)
     } else if err != nil {
         return handler.Errorf("get cache file: %s", err)
     }
-    defer f.Close()
+    defer func() {
+        if closeErr := f.Close(); closeErr != nil {
+            s.resourceLeakCounter.Inc(1)
+            log.Errorf("Failed to close file reader in downloadBlob: %s", closeErr)
+        }
+    }()
+
+    done := make(chan error, 1)
+    go func() {
+        _, err := io.Copy(dst, f)
+        done <- err
+    }()
 
-    if _, err := io.Copy(dst, f); err != nil {
-        return handler.Errorf("copy blob: %s", err)
+    select {
+    case err := <-done:
+        if err != nil {
+            return handler.Errorf("copy blob: %s", err)
+        }
+        return nil
+    case <-ctx.Done():
+        s.timeoutCounter.Inc(1)
+        return handler.Errorf("download blob timed out: %s", ctx.Err())
     }
-    return nil
 }
 
 func (s *Server) deleteBlob(d core.Digest) error {
@@ -491,39 +804,74 @@ func (s *Server) deleteBlob(d core.Digest) error {
 
 // startTransferHandler initializes an upload for internal blob transfers.
 func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) error {
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "start_transfer")
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
+
+    logger = logger.With("digest", d.Hex())
+    logger.Info("Starting internal transfer")
+
     if ok, err := blobExists(s.cas, d); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Failed to check blob existence: %s", err)
         return handler.Errorf("check blob: %s", err)
     } else if ok {
+        logger.Info("Blob already exists")
         return handler.ErrorStatus(http.StatusConflict)
     }
+
     uid, err := s.uploader.start(d)
     if err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Failed to start upload: %s", err)
         return err
     }
+
     setUploadLocation(w, uid)
     w.WriteHeader(http.StatusOK)
+    logger.With("upload_id", uid).Info("Internal transfer started successfully")
     return nil
 }
 
 // patchTransferHandler uploads a chunk of a blob for internal uploads.
 func (s *Server) patchTransferHandler(w http.ResponseWriter, r *http.Request) error {
+    requestID := s.getRequestID(r)
+    logger := log.With("request_id", requestID, "operation", "patch_transfer")
+
     d, err := httputil.ParseDigest(r, "digest")
     if err != nil {
+        logger.Errorf("Failed to parse digest parameter: %s", err)
         return err
     }
+
     uid, err := httputil.ParseParam(r, "uid")
     if err != nil {
+        logger.Errorf("Failed to parse uid parameter: %s", err)
         return err
     }
+
     start, end, err := parseContentRange(r.Header)
     if err != nil {
+        logger.Errorf("Failed to parse content range: %s", err)
+        return err
+    }
+
+    logger = logger.With("digest", d.Hex(), "upload_id", uid, "start", start, "end", end)
+    logger.Info("Patching internal transfer")
+
+    if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil {
+        s.errorCounter.Inc(1)
+        logger.Errorf("Failed to patch transfer: %s", err)
         return err
     }
-    return s.uploader.patch(d, uid, r.Body, start, end)
+
+    logger.Info("Internal transfer patched successfully")
+    return nil
 }
 
 // commitTransferHandler commits the upload of an internal blob transfer.
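One caveat on `downloadBlob` above: the copy runs in a goroutine that writes to `dst` (the live `http.ResponseWriter`), so on timeout the handler returns while a write may still be in flight, and the deferred `f.Close()` can race the in-progress read. A context-checking reader keeps the copy on the handler goroutine and avoids both races — a hypothetical alternative, not what this diff does:

```go
package blobserver

import (
	"context"
	"io"
)

// ctxReader fails the copy between reads once ctx is done, so io.Copy can
// stay on the calling goroutine and still honor the deadline.
type ctxReader struct {
	ctx context.Context
	r   io.Reader
}

func (cr *ctxReader) Read(p []byte) (int, error) {
	if err := cr.ctx.Err(); err != nil {
		return 0, err
	}
	return cr.r.Read(p)
}

// Usage sketch inside downloadBlob:
//	if _, err := io.Copy(dst, &ctxReader{ctx: ctx, r: f}); err != nil { ... }
```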
@@ -625,7 +973,7 @@ func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Reque
     if err := s.writeBack(namespace, d, 0); err != nil {
         return err
     }
-    err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
+    err = s.applyToReplicas(r.Context(), d, func(i int, client blobclient.Client) error {
         delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)
         f, err := s.cas.GetCacheFileReader(d.Hex())
         if err != nil {
@@ -759,3 +1107,11 @@ func (s *Server) maybeDelete(name string, ttl time.Duration) (deleted bool, err
     }
     return false, nil
 }
+
+// getRequestID extracts the request ID from the request context.
+func (s *Server) getRequestID(r *http.Request) string {
+    if id, ok := r.Context().Value(requestIDKey{}).(string); ok {
+        return id
+    }
+    return "unknown"
+}
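A quick way to pin the middleware's observable contract — every response carries `X-Request-ID`, and downstream handlers see the same ID via `getRequestID` — is a small `httptest` test (hypothetical, not part of this diff):

```go
package blobserver

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestRequestTracingMiddleware is a sketch: the middleware must stamp
// X-Request-ID on the response and expose the same ID to inner handlers.
func TestRequestTracingMiddleware(t *testing.T) {
	s := &Server{} // middleware touches no Server fields
	var seen string
	h := s.requestTracingMiddleware(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			seen = s.getRequestID(r)
		}))

	w := httptest.NewRecorder()
	h.ServeHTTP(w, httptest.NewRequest("GET", "/health", nil))

	got := w.Header().Get("X-Request-ID")
	if got == "" || got != seen {
		t.Errorf("header id %q does not match context id %q", got, seen)
	}
}
```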
log.Infof("Configuring origin with hostname '%s'", hostname) + return hostname +} +func setupPeerIP(flags *Flags) string { if flags.PeerIP == "" { localIP, err := netutil.GetLocalIP() if err != nil { log.Fatalf("Error getting local ip: %s", err) } - flags.PeerIP = localIP + return localIP } + return flags.PeerIP +} +type coreComponents struct { + cas *store.CAStore + pctx core.PeerContext + backendManager *backend.Manager + writeBackManager persistedretry.Manager + metaInfoGen *metainfogen.Generator + blobRefresher *blobrefresh.Refresher + scheduler scheduler.ReloadableScheduler + hashRing hashring.Ring + tls *tls.Config +} + +func setupCoreComponents(config Config, flags *Flags, hostname, peerIP string, stats tally.Scope) *coreComponents { + cas := setupCAStore(config, stats) + pctx := setupPeerContext(config, flags, peerIP) + backendManager := setupBackendManager(config, stats) + + localDB := setupLocalDB(config) + writeBackManager := setupWriteBackManager(config, stats, cas, backendManager, localDB) + metaInfoGen := setupMetaInfoGenerator(config, cas) + blobRefresher := setupBlobRefresher(config, stats, cas, backendManager, metaInfoGen) + + netevents := setupNetworkEvents(config) + schedulerInstance := setupScheduler(config, stats, pctx, cas, netevents, blobRefresher) + + cluster := setupCluster(config) + tlsConfig := setupTLS(config) + hashRing := setupHashRing(config, flags, hostname, cluster, tlsConfig, backendManager) + + return &coreComponents{ + cas: cas, + pctx: pctx, + backendManager: backendManager, + writeBackManager: writeBackManager, + metaInfoGen: metaInfoGen, + blobRefresher: blobRefresher, + scheduler: schedulerInstance, + hashRing: hashRing, + tls: tlsConfig, + } +} + +func setupCAStore(config Config, stats tally.Scope) *store.CAStore { cas, err := store.NewCAStore(config.CAStore, stats) if err != nil { log.Fatalf("Failed to create castore: %s", err) } + return cas +} +func setupPeerContext(config Config, flags *Flags, peerIP string) core.PeerContext { pctx, err := core.NewPeerContext( - config.PeerIDFactory, flags.Zone, flags.KrakenCluster, flags.PeerIP, flags.PeerPort, true) + config.PeerIDFactory, flags.Zone, flags.KrakenCluster, peerIP, flags.PeerPort, true) if err != nil { log.Fatalf("Failed to create peer context: %s", err) } + return pctx +} +func setupBackendManager(config Config, stats tally.Scope) *backend.Manager { backendManager, err := backend.NewManager(config.BackendManager, config.Backends, config.Auth, stats) if err != nil { log.Fatalf("Error creating backend manager: %s", err) } + return backendManager +} +func setupLocalDB(config Config) *sqlx.DB { localDB, err := localdb.New(config.LocalDB) if err != nil { log.Fatalf("Error creating local db: %s", err) } + return localDB +} +func setupWriteBackManager(config Config, stats tally.Scope, cas *store.CAStore, backendManager *backend.Manager, localDB *sqlx.DB) persistedretry.Manager { writeBackManager, err := persistedretry.NewManager( config.WriteBack, stats, @@ -204,35 +299,55 @@ func Run(flags *Flags, opts ...Option) { if err != nil { log.Fatalf("Error creating write-back manager: %s", err) } + return writeBackManager +} +func setupMetaInfoGenerator(config Config, cas *store.CAStore) *metainfogen.Generator { metaInfoGenerator, err := metainfogen.New(config.MetaInfoGen, cas) if err != nil { log.Fatalf("Error creating metainfo generator: %s", err) } + return metaInfoGenerator +} - blobRefresher := blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGenerator) +func 
 
+func setupMetaInfoGenerator(config Config, cas *store.CAStore) *metainfogen.Generator {
     metaInfoGenerator, err := metainfogen.New(config.MetaInfoGen, cas)
     if err != nil {
         log.Fatalf("Error creating metainfo generator: %s", err)
     }
+    return metaInfoGenerator
+}
 
-    blobRefresher := blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGenerator)
+func setupBlobRefresher(config Config, stats tally.Scope, cas *store.CAStore, backendManager *backend.Manager, metaInfoGen *metainfogen.Generator) *blobrefresh.Refresher {
+    return blobrefresh.New(config.BlobRefresh, stats, cas, backendManager, metaInfoGen)
+}
 
+func setupNetworkEvents(config Config) networkevent.Producer {
     netevents, err := networkevent.NewProducer(config.NetworkEvent)
     if err != nil {
         log.Fatalf("Error creating network event producer: %s", err)
     }
+    return netevents
+}
 
+func setupScheduler(config Config, stats tally.Scope, pctx core.PeerContext, cas *store.CAStore, netevents networkevent.Producer, blobRefresher *blobrefresh.Refresher) scheduler.ReloadableScheduler {
     sched, err := scheduler.NewOriginScheduler(
         config.Scheduler, stats, pctx, cas, netevents, blobRefresher)
     if err != nil {
         log.Fatalf("Error creating scheduler: %s", err)
     }
+    return sched
+}
 
+func setupCluster(config Config) hostlist.List {
     cluster, err := hostlist.New(config.Cluster)
     if err != nil {
         log.Fatalf("Error creating cluster host list: %s", err)
     }
+    return cluster
+}
 
+func setupTLS(config Config) *tls.Config {
     tls, err := config.TLS.BuildClient()
     if err != nil {
         log.Fatalf("Error building client tls config: %s", err)
     }
+    return tls
+}
 
+func setupHashRing(config Config, flags *Flags, hostname string, cluster hostlist.List, tls *tls.Config, backendManager *backend.Manager) hashring.Ring {
     healthCheckFilter := healthcheck.NewFilter(config.HealthCheck, healthcheck.Default(tls))
 
     hashRing := hashring.New(
@@ -242,6 +357,7 @@ func Run(flags *Flags, opts ...Option) {
         hashring.WithWatcher(backend.NewBandwidthWatcher(backendManager)))
     go hashRing.Monitor(nil)
 
+    // Validate that this origin is in the hash ring.
     addr := fmt.Sprintf("%s:%d", hostname, flags.BlobServerPort)
     if !hashRing.Contains(addr) {
         // When DNS is used for hash ring membership, the members will be IP
@@ -258,24 +374,34 @@ func Run(flags *Flags, opts ...Option) {
         }
     }
 
+    return hashRing
+}
+
+func setupBlobServer(config Config, flags *Flags, hostname string, components *coreComponents, stats tally.Scope) *blobserver.Server {
+    addr := fmt.Sprintf("%s:%d", hostname, flags.BlobServerPort)
+
     server, err := blobserver.New(
         config.BlobServer,
         stats,
         clock.New(),
         addr,
-        hashRing,
-        cas,
-        blobclient.NewProvider(blobclient.WithTLS(tls)),
-        blobclient.NewClusterProvider(blobclient.WithTLS(tls)),
-        pctx,
-        backendManager,
-        blobRefresher,
-        metaInfoGenerator,
-        writeBackManager)
+        components.hashRing,
+        components.cas,
+        blobclient.NewProvider(blobclient.WithTLS(components.tls)),
+        blobclient.NewClusterProvider(blobclient.WithTLS(components.tls)),
+        components.pctx,
+        components.backendManager,
+        components.blobRefresher,
+        components.metaInfoGen,
+        components.writeBackManager)
     if err != nil {
         log.Fatalf("Error initializing blob server: %s", err)
     }
+    return server
+}
+
+func startServices(config Config, flags *Flags, server *blobserver.Server, sched scheduler.ReloadableScheduler) {
     h := addTorrentDebugEndpoints(server.Handler(), sched)
     go func() { log.Fatal(server.ListenAndServe(h)) }()