Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions agent/agentserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/torrent/scheduler"
"github.com/uber/kraken/tracker/announceclient"
"github.com/uber/kraken/utils/closers"
"github.com/uber/kraken/utils/handler"
"github.com/uber/kraken/utils/httputil"

Expand Down Expand Up @@ -65,8 +66,8 @@ func New(
sched scheduler.ReloadableScheduler,
tags tagclient.Client,
ac announceclient.Client,
containerRuntime containerruntime.Factory) *Server {

containerRuntime containerruntime.Factory,
) *Server {
stats = stats.Tagged(map[string]string{
"module": "agentserver",
})
Expand Down Expand Up @@ -118,13 +119,15 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
if err != nil {
return err
}

d, err := s.tags.Get(tag)
if err == tagclient.ErrTagNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
if err != nil {
if err == tagclient.ErrTagNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("get tag: %s", err)
}

io.WriteString(w, d.String())
return nil
}
Expand All @@ -139,25 +142,39 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err
if err != nil {
return err
}

f, err := s.cads.Cache().GetFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) || s.cads.InDownloadError(err) {
if err := s.sched.Download(namespace, d); err != nil {
if err == scheduler.ErrTorrentNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("download torrent: %s", err)
}
f, err = s.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return handler.Errorf("store: %s", err)
}
} else {
return handler.Errorf("store: %s", err)

if err == nil {
defer closers.Close(f)
if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("copy file: %w", err)
}
return nil
}

if !os.IsNotExist(err) && !s.cads.InDownloadError(err) {
return handler.Errorf("store: %s", err)
}

if err := s.sched.Download(namespace, d); err != nil {
if err == scheduler.ErrTorrentNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("download torrent: %s", err)
}

// Get file reader after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
f, err = s.cads.Any().GetFileReader(d.Hex())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. What happens when a file is in the download dir, but is partially downloaded? Does it get returned?
  2. If we can serve blobs directly from the download dir, what is the purpose of keeping separate download and cache dirs? Aren't we violating atomicity invariants by serving data from the download dir?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. s.sched.Download() blocks until the download is complete, and concurrent requests are deduplicated, so a partially downloaded file will not be returned.
  2. I believe the purpose of the split is:
  • Download dir: incomplete files being assembled piece by piece
  • Cache dir: complete, verified files ready for serving

Any() only handles the microsecond window in which the move from the download dir to the cache dir is still in flight.

if err != nil {
return handler.Errorf("store: %s", err)
}
defer closers.Close(f)

if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("copy file: %s", err)
return fmt.Errorf("copy file: %w", err)
}
return nil
}
Expand Down
11 changes: 8 additions & 3 deletions lib/dockerregistry/storage_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ func (e InvalidRequestError) Error() string {
}

func toDriverError(err error, path string) error {
// Check for "not found" errors -> return 404
if errors.Is(err, os.ErrNotExist) ||
errors.Is(err, transfer.ErrBlobNotFound) ||
errors.Is(err, transfer.ErrTagNotFound) {
transfer.IsBlobNotFound(err) ||
transfer.IsTagNotFound(err) {
return driver.PathNotFoundError{
DriverName: Name,
Path: path,
}
}
return err
log.Errorf("Storage driver error for path %s: %v", path, err)
return driver.Error{
DriverName: Name,
Enclosed: err,
}
}

type krakenStorageDriverFactory struct {
Expand Down
41 changes: 38 additions & 3 deletions lib/dockerregistry/transfer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,45 @@
// limitations under the License.
package transfer

import "errors"
import (
"errors"
"fmt"
)

// ErrBlobNotFound is returned when a blob is not found by transferer.
var ErrBlobNotFound = errors.New("blob not found")
type ErrBlobNotFound struct {
	// Digest identifies the missing blob; it is interpolated verbatim into
	// the error message.
	Digest string
	// Reason optionally explains why the blob is missing. When empty, it is
	// omitted from the error message.
	Reason string
}

// Error implements the error interface. The message has the form
// "blob <Digest> not found", followed by ": <Reason>" when Reason is set.
func (e ErrBlobNotFound) Error() string {
	if e.Reason == "" {
		return fmt.Sprintf("blob %s not found", e.Digest)
	}
	return fmt.Sprintf("blob %s not found: %s", e.Digest, e.Reason)
}

// ErrTagNotFound is returned when a tag is not found by transferer.
var ErrTagNotFound = errors.New("tag not found")
type ErrTagNotFound struct {
	// Tag is the tag that could not be resolved; it is interpolated verbatim
	// into the error message.
	Tag string
	// Reason optionally explains why the tag is missing. When empty, it is
	// omitted from the error message.
	Reason string
}

// Error implements the error interface. The message has the form
// "tag <Tag> not found", followed by ": <Reason>" when Reason is set.
func (e ErrTagNotFound) Error() string {
	if e.Reason == "" {
		return fmt.Sprintf("tag %s not found", e.Tag)
	}
	return fmt.Sprintf("tag %s not found: %s", e.Tag, e.Reason)
}

// IsBlobNotFound reports whether err, or any error it wraps, is an
// ErrBlobNotFound value. It returns false for a nil err.
func IsBlobNotFound(err error) bool {
	// errors.As needs a non-nil pointer to the type being searched for; the
	// matched value itself is not needed here, only whether one was found.
	target := new(ErrBlobNotFound)
	return errors.As(err, target)
}

// IsTagNotFound reports whether err, or any error it wraps, is an
// ErrTagNotFound value. It returns false for a nil err.
func IsTagNotFound(err error) bool {
	// errors.As needs a non-nil pointer to the type being searched for; the
	// matched value itself is not needed here, only whether one was found.
	target := new(ErrTagNotFound)
	return errors.As(err, target)
}
104 changes: 76 additions & 28 deletions lib/dockerregistry/transfer/ro_transferer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,47 +40,92 @@ func NewReadOnlyTransferer(
stats tally.Scope,
cads *store.CADownloadStore,
tags tagclient.Client,
sched scheduler.Scheduler) *ReadOnlyTransferer {

sched scheduler.Scheduler,
) *ReadOnlyTransferer {
stats = stats.Tagged(map[string]string{
"module": "rotransferer",
})

return &ReadOnlyTransferer{stats, cads, tags, sched}
}

// mapSchedulerError translates an error returned by the scheduler for digest
// d into the error this package reports to its callers.
//
// scheduler.ErrTorrentNotFound becomes an ErrBlobNotFound carrying the digest
// and a fixed reason, so callers can detect it with IsBlobNotFound. Every
// other error is wrapped with %w and prefixed with the digest, keeping the
// original error reachable through errors.Is / errors.As.
func mapSchedulerError(err error, d core.Digest) error {
	hex := d.Hex()

	// Anything other than "torrent not found" is passed through with context.
	if err != scheduler.ErrTorrentNotFound {
		return fmt.Errorf("download blob %s: %w", hex, err)
	}

	return ErrBlobNotFound{
		Digest: hex,
		Reason: "torrent not found in tracker",
	}
}

// Stat returns blob info from local cache, and triggers download if the blob is
// not available locally.
func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
fi, err := t.cads.Cache().GetFileStat(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
fi, err = t.cads.Cache().GetFileStat(d.Hex())
if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)

if err == nil {
return core.NewBlobInfo(fi.Size()), nil
}

if !os.IsNotExist(err) && !t.cads.InDownloadError(err) {
return nil, fmt.Errorf("stat cache: %w", err)
}

if err := t.sched.Download(namespace, d); err != nil {
return nil, mapSchedulerError(err, d)
}

// Stat file after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
fi, err = t.cads.Any().GetFileStat(d.Hex())
if err == nil {
return core.NewBlobInfo(fi.Size()), nil
}
if os.IsNotExist(err) {
return nil, ErrBlobNotFound{
Digest: d.Hex(),
Reason: "file not found after download",
}
} else if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)
}
return core.NewBlobInfo(fi.Size()), nil
return nil, fmt.Errorf("stat cache after download: %w", err)
}

// Download downloads blobs as torrent.
func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) {
f, err := t.cads.Cache().GetFileReader(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
f, err = t.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return nil, fmt.Errorf("cache: %s", err)

if err == nil {
return f, nil
}

if !os.IsNotExist(err) && !t.cads.InDownloadError(err) {
return nil, fmt.Errorf("get cache file: %w", err)
}

if err := t.sched.Download(namespace, d); err != nil {
return nil, mapSchedulerError(err, d)
}

// Get file reader after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
f, err = t.cads.Any().GetFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) {
return nil, ErrBlobNotFound{
Digest: d.Hex(),
Reason: "file not found on disk after download",
}
}
} else if err != nil {
return nil, fmt.Errorf("cache: %s", err)
return nil, fmt.Errorf("get file reader after download: %w", err)
}

return f, nil
}

Expand All @@ -92,15 +137,18 @@ func (t *ReadOnlyTransferer) Upload(namespace string, d core.Digest, blob store.
// GetTag gets manifest digest for tag.
func (t *ReadOnlyTransferer) GetTag(tag string) (core.Digest, error) {
d, err := t.tags.Get(tag)
if err != nil {
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound
if err == nil {
return d, nil
}
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound{
Tag: tag,
Reason: "not found in build-index",
}
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %s", err)
}
return d, nil
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %w", err)
}

// PutTag is not supported.
Expand Down
2 changes: 1 addition & 1 deletion lib/dockerregistry/transfer/ro_transferer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestReadOnlyTransfererGetTagNotFound(t *testing.T) {

_, err := transferer.GetTag(tag)
require.Error(err)
require.Equal(ErrTagNotFound, err)
require.True(IsTagNotFound(err))
}

// TODO(codyg): This is a particularly ugly test that is a symptom of the lack
Expand Down
Loading
Loading