diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index 93376021b..e3e0fb89b 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -32,6 +32,7 @@ import ( "github.com/uber/kraken/lib/store" "github.com/uber/kraken/lib/torrent/scheduler" "github.com/uber/kraken/tracker/announceclient" + "github.com/uber/kraken/utils/closers" "github.com/uber/kraken/utils/handler" "github.com/uber/kraken/utils/httputil" @@ -65,8 +66,8 @@ func New( sched scheduler.ReloadableScheduler, tags tagclient.Client, ac announceclient.Client, - containerRuntime containerruntime.Factory) *Server { - + containerRuntime containerruntime.Factory, +) *Server { stats = stats.Tagged(map[string]string{ "module": "agentserver", }) @@ -118,13 +119,15 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error { if err != nil { return err } + d, err := s.tags.Get(tag) + if err == tagclient.ErrTagNotFound { + return handler.ErrorStatus(http.StatusNotFound) + } if err != nil { - if err == tagclient.ErrTagNotFound { - return handler.ErrorStatus(http.StatusNotFound) - } return handler.Errorf("get tag: %s", err) } + io.WriteString(w, d.String()) return nil } @@ -139,25 +142,39 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err if err != nil { return err } + f, err := s.cads.Cache().GetFileReader(d.Hex()) - if err != nil { - if os.IsNotExist(err) || s.cads.InDownloadError(err) { - if err := s.sched.Download(namespace, d); err != nil { - if err == scheduler.ErrTorrentNotFound { - return handler.ErrorStatus(http.StatusNotFound) - } - return handler.Errorf("download torrent: %s", err) - } - f, err = s.cads.Cache().GetFileReader(d.Hex()) - if err != nil { - return handler.Errorf("store: %s", err) - } - } else { - return handler.Errorf("store: %s", err) + + if err == nil { + defer closers.Close(f) + if _, err := io.Copy(w, f); err != nil { + return fmt.Errorf("copy file: %w", err) } + return nil } + + if !os.IsNotExist(err) && !s.cads.InDownloadError(err) { + return handler.Errorf("store: %s", err) + } + + if err := s.sched.Download(namespace, d); err != nil { + if err == scheduler.ErrTorrentNotFound { + return handler.ErrorStatus(http.StatusNotFound) + } + return handler.Errorf("download torrent: %s", err) + } + + // Get file reader after download completes + // Use Any() to check both download and cache directories, as the file + // might still be in the process of being moved from download to cache. + f, err = s.cads.Any().GetFileReader(d.Hex()) + if err != nil { + return handler.Errorf("store: %s", err) + } + defer closers.Close(f) + if _, err := io.Copy(w, f); err != nil { - return fmt.Errorf("copy file: %s", err) + return fmt.Errorf("copy file: %w", err) } return nil } diff --git a/lib/dockerregistry/storage_driver.go b/lib/dockerregistry/storage_driver.go index 9f90eb075..8972144be 100644 --- a/lib/dockerregistry/storage_driver.go +++ b/lib/dockerregistry/storage_driver.go @@ -77,15 +77,20 @@ func (e InvalidRequestError) Error() string { } func toDriverError(err error, path string) error { + // Check for "not found" errors -> return 404 if errors.Is(err, os.ErrNotExist) || - errors.Is(err, transfer.ErrBlobNotFound) || - errors.Is(err, transfer.ErrTagNotFound) { + transfer.IsBlobNotFound(err) || + transfer.IsTagNotFound(err) { return driver.PathNotFoundError{ DriverName: Name, Path: path, } } - return err + log.Errorf("Storage driver error for path %s: %v", path, err) + return driver.Error{ + DriverName: Name, + Enclosed: err, + } } type krakenStorageDriverFactory struct { diff --git a/lib/dockerregistry/transfer/errors.go b/lib/dockerregistry/transfer/errors.go index 9aff892f9..5e219de13 100644 --- a/lib/dockerregistry/transfer/errors.go +++ b/lib/dockerregistry/transfer/errors.go @@ -13,10 +13,45 @@ // limitations under the License. package transfer -import "errors" +import ( + "errors" + "fmt" +) // ErrBlobNotFound is returned when a blob is not found by transferer. -var ErrBlobNotFound = errors.New("blob not found") +type ErrBlobNotFound struct { + Digest string + Reason string +} + +func (e ErrBlobNotFound) Error() string { + if e.Reason != "" { + return fmt.Sprintf("blob %s not found: %s", e.Digest, e.Reason) + } + return fmt.Sprintf("blob %s not found", e.Digest) +} // ErrTagNotFound is returned when a tag is not found by transferer. -var ErrTagNotFound = errors.New("tag not found") +type ErrTagNotFound struct { + Tag string + Reason string +} + +func (e ErrTagNotFound) Error() string { + if e.Reason != "" { + return fmt.Sprintf("tag %s not found: %s", e.Tag, e.Reason) + } + return fmt.Sprintf("tag %s not found", e.Tag) +} + +// IsBlobNotFound checks if an error is ErrBlobNotFound. +func IsBlobNotFound(err error) bool { + var e ErrBlobNotFound + return errors.As(err, &e) +} + +// IsTagNotFound checks if an error is ErrTagNotFound. +func IsTagNotFound(err error) bool { + var e ErrTagNotFound + return errors.As(err, &e) +} diff --git a/lib/dockerregistry/transfer/ro_transferer.go b/lib/dockerregistry/transfer/ro_transferer.go index 5913034de..21503477a 100644 --- a/lib/dockerregistry/transfer/ro_transferer.go +++ b/lib/dockerregistry/transfer/ro_transferer.go @@ -40,8 +40,8 @@ func NewReadOnlyTransferer( stats tally.Scope, cads *store.CADownloadStore, tags tagclient.Client, - sched scheduler.Scheduler) *ReadOnlyTransferer { - + sched scheduler.Scheduler, +) *ReadOnlyTransferer { stats = stats.Tagged(map[string]string{ "module": "rotransferer", }) @@ -49,38 +49,83 @@ func NewReadOnlyTransferer( return &ReadOnlyTransferer{stats, cads, tags, sched} } +// mapSchedulerError converts scheduler errors to appropriate transferer errors. +func mapSchedulerError(err error, d core.Digest) error { + // torrent not found → 404 + if err == scheduler.ErrTorrentNotFound { + return ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "torrent not found in tracker", + } + } + + // All other scheduler errors → 500 with context + return fmt.Errorf("download blob %s: %w", d.Hex(), err) +} + // Stat returns blob info from local cache, and triggers download if the blob is // not available locally. func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) { fi, err := t.cads.Cache().GetFileStat(d.Hex()) - if os.IsNotExist(err) || t.cads.InDownloadError(err) { - if err := t.sched.Download(namespace, d); err != nil { - return nil, fmt.Errorf("scheduler: %s", err) - } - fi, err = t.cads.Cache().GetFileStat(d.Hex()) - if err != nil { - return nil, fmt.Errorf("stat cache: %s", err) + + if err == nil { + return core.NewBlobInfo(fi.Size()), nil + } + + if !os.IsNotExist(err) && !t.cads.InDownloadError(err) { + return nil, fmt.Errorf("stat cache: %w", err) + } + + if err := t.sched.Download(namespace, d); err != nil { + return nil, mapSchedulerError(err, d) + } + + // Stat file after download completes + // Use Any() to check both download and cache directories, as the file + // might still be in the process of being moved from download to cache. + fi, err = t.cads.Any().GetFileStat(d.Hex()) + if err == nil { + return core.NewBlobInfo(fi.Size()), nil + } + if os.IsNotExist(err) { + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "file not found after download", } - } else if err != nil { - return nil, fmt.Errorf("stat cache: %s", err) } - return core.NewBlobInfo(fi.Size()), nil + return nil, fmt.Errorf("stat cache after download: %w", err) } // Download downloads blobs as torrent. func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) { f, err := t.cads.Cache().GetFileReader(d.Hex()) - if os.IsNotExist(err) || t.cads.InDownloadError(err) { - if err := t.sched.Download(namespace, d); err != nil { - return nil, fmt.Errorf("scheduler: %s", err) - } - f, err = t.cads.Cache().GetFileReader(d.Hex()) - if err != nil { - return nil, fmt.Errorf("cache: %s", err) + + if err == nil { + return f, nil + } + + if !os.IsNotExist(err) && !t.cads.InDownloadError(err) { + return nil, fmt.Errorf("get cache file: %w", err) + } + + if err := t.sched.Download(namespace, d); err != nil { + return nil, mapSchedulerError(err, d) + } + + // Get file reader after download completes + // Use Any() to check both download and cache directories, as the file + // might still be in the process of being moved from download to cache. + f, err = t.cads.Any().GetFileReader(d.Hex()) + if err != nil { + if os.IsNotExist(err) { + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "file not found on disk after download", + } } - } else if err != nil { - return nil, fmt.Errorf("cache: %s", err) + return nil, fmt.Errorf("get file reader after download: %w", err) } + return f, nil } @@ -92,15 +137,18 @@ func (t *ReadOnlyTransferer) Upload(namespace string, d core.Digest, blob store. // GetTag gets manifest digest for tag. func (t *ReadOnlyTransferer) GetTag(tag string) (core.Digest, error) { d, err := t.tags.Get(tag) - if err != nil { - if err == tagclient.ErrTagNotFound { - t.stats.Counter("tag_not_found").Inc(1) - return core.Digest{}, ErrTagNotFound + if err == nil { + return d, nil + } + if err == tagclient.ErrTagNotFound { + t.stats.Counter("tag_not_found").Inc(1) + return core.Digest{}, ErrTagNotFound{ + Tag: tag, + Reason: "not found in build-index", } - t.stats.Counter("get_tag_error").Inc(1) - return core.Digest{}, fmt.Errorf("client get tag: %s", err) } - return d, nil + t.stats.Counter("get_tag_error").Inc(1) + return core.Digest{}, fmt.Errorf("client get tag: %w", err) } // PutTag is not supported. diff --git a/lib/dockerregistry/transfer/ro_transferer_test.go b/lib/dockerregistry/transfer/ro_transferer_test.go index 5ffb61ede..08a06430f 100644 --- a/lib/dockerregistry/transfer/ro_transferer_test.go +++ b/lib/dockerregistry/transfer/ro_transferer_test.go @@ -143,7 +143,7 @@ func TestReadOnlyTransfererGetTagNotFound(t *testing.T) { _, err := transferer.GetTag(tag) require.Error(err) - require.Equal(ErrTagNotFound, err) + require.True(IsTagNotFound(err)) } // TODO(codyg): This is a particularly ugly test that is a symptom of the lack diff --git a/lib/dockerregistry/transfer/rw_transferer.go b/lib/dockerregistry/transfer/rw_transferer.go index e79fafd90..ed393f61f 100644 --- a/lib/dockerregistry/transfer/rw_transferer.go +++ b/lib/dockerregistry/transfer/rw_transferer.go @@ -42,7 +42,6 @@ func NewReadWriteTransferer( tags tagclient.Client, originCluster blobclient.ClusterClient, cas *store.CAStore) *ReadWriteTransferer { - stats = stats.Tagged(map[string]string{ "module": "rwtransferer", }) @@ -53,66 +52,72 @@ func NewReadWriteTransferer( // Stat returns blob info from origin cluster or local cache. func (t *ReadWriteTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) { fi, err := t.cas.GetCacheFileStat(d.Hex()) - if err != nil { - if os.IsNotExist(err) { - return t.originStat(namespace, d) - } - return nil, fmt.Errorf("stat cache file: %s", err) + if err == nil { + return core.NewBlobInfo(fi.Size()), nil + } + if os.IsNotExist(err) { + return t.originStat(namespace, d) } - return core.NewBlobInfo(fi.Size()), nil + return nil, fmt.Errorf("stat cache file: %w", err) } func (t *ReadWriteTransferer) originStat(namespace string, d core.Digest) (*core.BlobInfo, error) { bi, err := t.originCluster.Stat(namespace, d) - if err != nil { - // `docker push` stats blobs before uploading them. If the blob is not - // found, it will upload it. However if remote blob storage is unavailable, - // this will be a 5XX error, and will short-circuit push. We must consider - // this class of error to be a 404 to allow pushes to succeed while remote - // storage is down (write-back will eventually persist the blobs). - if err != blobclient.ErrBlobNotFound { - log.With("digest", d).Info("Error stat-ing origin blob: %s", err) - } - return nil, ErrBlobNotFound + if err == nil { + return bi, nil + } + // `docker push` stats blobs before uploading them. If the blob is not + // found, it will upload it. However if remote blob storage is unavailable, + // this will be a 5XX error, and will short-circuit push. We must consider + // this class of error to be a 404 to allow pushes to succeed while remote + // storage is down (write-back will eventually persist the blobs). + if err != blobclient.ErrBlobNotFound { + log.With("digest", d).Info("Error stat-ing origin blob: %s", err) + } + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "not found in origin cluster", } - return bi, nil } // Download downloads the blob of name into the file store and returns a reader // to the newly downloaded file. func (t *ReadWriteTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) { blob, err := t.cas.GetCacheFileReader(d.Hex()) - if err != nil { - if os.IsNotExist(err) { - return t.downloadFromOrigin(namespace, d) - } - return nil, fmt.Errorf("get cache file: %s", err) + if err == nil { + return blob, nil } - return blob, nil + if os.IsNotExist(err) { + return t.downloadFromOrigin(namespace, d) + } + return nil, fmt.Errorf("get cache file: %w", err) } func (t *ReadWriteTransferer) downloadFromOrigin(namespace string, d core.Digest) (store.FileReader, error) { tmp := fmt.Sprintf("%s.%s", d.Hex(), uuid.Generate().String()) if err := t.cas.CreateUploadFile(tmp, 0); err != nil { - return nil, fmt.Errorf("create upload file: %s", err) + return nil, fmt.Errorf("create upload file: %w", err) } w, err := t.cas.GetUploadFileReadWriter(tmp) if err != nil { - return nil, fmt.Errorf("get upload writer: %s", err) + return nil, fmt.Errorf("get upload writer: %w", err) } defer w.Close() if err := t.originCluster.DownloadBlob(namespace, d, w); err != nil { if err == blobclient.ErrBlobNotFound { - return nil, ErrBlobNotFound + return nil, ErrBlobNotFound{ + Digest: d.Hex(), + Reason: "not found in origin cluster", + } } - return nil, fmt.Errorf("origin: %s", err) + return nil, fmt.Errorf("origin download: %w", err) } if err := t.cas.MoveUploadFileToCache(tmp, d.Hex()); err != nil && !os.IsExist(err) { - return nil, fmt.Errorf("move upload file to cache: %s", err) + return nil, fmt.Errorf("move upload file to cache: %w", err) } blob, err := t.cas.GetCacheFileReader(d.Hex()) if err != nil { - return nil, fmt.Errorf("get cache file: %s", err) + return nil, fmt.Errorf("get cache file: %w", err) } return blob, nil } @@ -120,27 +125,29 @@ func (t *ReadWriteTransferer) downloadFromOrigin(namespace string, d core.Digest // Upload uploads blob to the origin cluster. func (t *ReadWriteTransferer) Upload( namespace string, d core.Digest, blob store.FileReader) error { - return t.originCluster.UploadBlob(namespace, d, blob) } // GetTag returns the manifest digest for tag. func (t *ReadWriteTransferer) GetTag(tag string) (core.Digest, error) { d, err := t.tags.Get(tag) - if err != nil { - if err == tagclient.ErrTagNotFound { - return core.Digest{}, ErrTagNotFound + if err == nil { + return d, nil + } + if err == tagclient.ErrTagNotFound { + return core.Digest{}, ErrTagNotFound{ + Tag: tag, + Reason: "not found in build-index", } - return core.Digest{}, fmt.Errorf("client get tag: %s", err) } - return d, nil + return core.Digest{}, fmt.Errorf("client get tag: %w", err) } // PutTag uploads d as the manifest digest for tag. func (t *ReadWriteTransferer) PutTag(tag string, d core.Digest) error { if err := t.tags.PutAndReplicate(tag, d); err != nil { t.stats.Counter("put_tag_error").Inc(1) - return fmt.Errorf("put and replicate tag: %s", err) + return fmt.Errorf("put and replicate tag: %w", err) } return nil } diff --git a/lib/dockerregistry/transfer/rw_transferer_test.go b/lib/dockerregistry/transfer/rw_transferer_test.go index 681949fe0..e7c5169cb 100644 --- a/lib/dockerregistry/transfer/rw_transferer_test.go +++ b/lib/dockerregistry/transfer/rw_transferer_test.go @@ -115,7 +115,7 @@ func TestReadWriteTransfererGetTagNotFound(t *testing.T) { _, err := transferer.GetTag(tag) require.Error(err) - require.Equal(ErrTagNotFound, err) + require.True(IsTagNotFound(err)) } func TestReadWriteTransfererPutTag(t *testing.T) { @@ -191,5 +191,5 @@ func TestReadWriteTransfererStatNotFoundOnAnyOriginError(t *testing.T) { mocks.originCluster.EXPECT().Stat(namespace, blob.Digest).Return(nil, errors.New("any error")) _, err := transferer.Stat(namespace, blob.Digest) - require.Equal(ErrBlobNotFound, err) + require.True(IsBlobNotFound(err)) } diff --git a/lib/dockerregistry/transfer/testing.go b/lib/dockerregistry/transfer/testing.go index 362b5be52..b6ae68bd0 100644 --- a/lib/dockerregistry/transfer/testing.go +++ b/lib/dockerregistry/transfer/testing.go @@ -67,7 +67,10 @@ func (t *testTransferer) GetTag(tag string) (core.Digest, error) { } d, ok := t.tags[p] if !ok { - return core.Digest{}, ErrTagNotFound + return core.Digest{}, ErrTagNotFound{ + Tag: tag, + Reason: "not found in test transferer", + } } return d, nil } diff --git a/lib/store/base/errors.go b/lib/store/base/errors.go index e5785ac16..b28868d23 100644 --- a/lib/store/base/errors.go +++ b/lib/store/base/errors.go @@ -13,7 +13,10 @@ // limitations under the License. package base -import "fmt" +import ( + "fmt" + "path/filepath" +) // FileStateError represents errors related to file state. // It's used when a file is not in the state it was supposed to be in. @@ -25,8 +28,8 @@ type FileStateError struct { } func (e *FileStateError) Error() string { - return fmt.Sprintf("failed to perform \"%s\" on %s/%s: %s", - e.Op, e.State.GetDirectory(), e.Name, e.Msg) + return fmt.Sprintf("failed to perform \"%s\" on %s: %s", + e.Op, filepath.Join(e.State.GetDirectory(), e.Name), e.Msg) } // IsFileStateError returns true if the param is of FileStateError type. diff --git a/lib/store/base/file_entry.go b/lib/store/base/file_entry.go index db0431999..15bc55ce6 100644 --- a/lib/store/base/file_entry.go +++ b/lib/store/base/file_entry.go @@ -304,7 +304,7 @@ func (entry *localFileEntry) Create(targetState FileState, size int64) error { return err } - return f.Close() + return nil } // Reload tries to reload a file that doesn't exist in memory from disk.