Skip to content


Merge pull request #107 from docker-library/revert-105-parallel-deploy
Browse files Browse the repository at this point in the history
Revert "Implement parallelism in deploy"
  • Loading branch information
yosifkit authored Jan 28, 2025
2 parents e25ff36 + dfd62f2 commit 4b8e582
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 232 deletions.
39 changes: 1 addition & 38 deletions .test/
Original file line number Diff line number Diff line change
Expand Up @@ -168,43 +168,6 @@ if [ -n "$doDeploy" ]; then
data: ("json string" | @json + "\n" | @base64),
# test pushing a full, actual image (tianon/true:oci@sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d), all parts
# config blob
type: "blob",
refs: [$reg+"/true"],
data: "ewoJImFyY2hpdGVjdHVyZSI6ICJhbWQ2NCIsCgkiY29uZmlnIjogewoJCSJDbWQiOiBbCgkJCSIvdHJ1ZSIKCQldCgl9LAoJImNyZWF0ZWQiOiAiMjAyMy0wMi0wMVQwNjo1MToxMVoiLAoJImhpc3RvcnkiOiBbCgkJewoJCQkiY3JlYXRlZCI6ICIyMDIzLTAyLTAxVDA2OjUxOjExWiIsCgkJCSJjcmVhdGVkX2J5IjogImh0dHBzOi8vZ2l0aHViLmNvbS90aWFub24vZG9ja2VyZmlsZXMvdHJlZS9tYXN0ZXIvdHJ1ZSIKCQl9CgldLAoJIm9zIjogImxpbnV4IiwKCSJyb290ZnMiOiB7CgkJImRpZmZfaWRzIjogWwoJCQkic2hhMjU2OjY1YjVhNDU5M2NjNjFkM2VhNmQzNTVmYjk3YzA0MzBkODIwZWUyMWFhODUzNWY1ZGU0NWU3NWMzMTk1NGI3NDMiCgkJXSwKCQkidHlwZSI6ICJsYXllcnMiCgl9Cn0K",
# layer blob
type: "blob",
refs: [$reg+"/true"],
type: "manifest",
refs: [ "oci", "latest", (range(0; 10)) | $reg+"/true:\(.)", $reg+"/foo/true:\(.)" ], # test pushing a whole bunch of tags in multiple repos
lookup: {
# a few explicit lookup entries for better code coverage (dep calculation during parallelization)
"sha256:1c51fc286aa95d9413226599576bafa38490b1e292375c90de095855b64caea6": ($reg+"/true"),
"": ($reg+"/true"),
data: {
schemaVersion: 2,
mediaType: "application/vnd.oci.image.manifest.v1+json",
config: {
mediaType: "application/vnd.oci.image.config.v1+json",
digest: "sha256:25be82253336f0b8c4347bc4ecbbcdc85d0e0f118ccf8dc2e119c0a47a0a486e",
size: 396,
layers: [ {
mediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
digest: "sha256:1c51fc286aa95d9413226599576bafa38490b1e292375c90de095855b64caea6",
size: 117,
} ],
# test blob mounting between repositories
type: "blob",
Expand Down Expand Up @@ -250,7 +213,7 @@ if [ -n "$doDeploy" ]; then
')" # stored in a variable for easier debugging ("bash -x")

time "$coverage/bin/deploy" <<<"$json"
"$coverage/bin/deploy" <<<"$json"

docker rm -vf meta-scripts-test-registry
trap - EXIT
Expand Down
54 changes: 18 additions & 36 deletions cmd/deploy/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (


Expand Down Expand Up @@ -47,14 +46,7 @@ type inputNormalized struct {
Data []byte `json:"data"`
CopyFrom *registry.Reference `json:"copyFrom"`

// if CopyFrom is nil and Type is manifest, this will be set (used by "do")
MediaType string `json:"mediaType,omitempty"`

func (normal inputNormalized) clone() inputNormalized {
// normal.Lookup is the only thing we have concurrency issues with, so it's the only thing we'll explicitly clone 😇
normal.Lookup = maps.Clone(normal.Lookup)
return normal
Do func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) `json:"-"`

func normalizeInputRefs(deployType deployType, rawRefs []string) ([]registry.Reference, ociregistry.Digest, error) {
Expand Down Expand Up @@ -230,7 +222,6 @@ func NormalizeInput(raw inputRaw) (inputNormalized, error) {
normal.Lookup[d] = ref

// front-load some validation / data extraction for "" to work
switch normal.Type {
case typeManifest:
if normal.CopyFrom == nil {
Expand All @@ -249,42 +240,33 @@ func NormalizeInput(raw inputRaw) (inputNormalized, error) {
// and our logic for pushing children needs to know the mediaType (see the GHSAs referenced above)
return normal, fmt.Errorf("%s: pushing manifest but missing 'mediaType'", debugId)
normal.MediaType = mediaTypeHaver.MediaType

case typeBlob:
if normal.CopyFrom != nil && normal.CopyFrom.Digest == "" {
return normal, fmt.Errorf("%s: blobs are always by-digest, and thus need a digest: %s", debugId, normal.CopyFrom)

panic("unknown type: " + string(normal.Type))
// panic instead of error because this should've already been handled/normalized above (so this is a coding error, not a runtime error)

return normal, nil

// WARNING: many of these codepaths will end up writing to "normal.Lookup", which because it's a map is passed by reference, so this method is *not* safe for concurrent invocation on a single "normal" object! see "normal.clone" (above)
func (normal inputNormalized) do(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
switch normal.Type {
case typeManifest:
if normal.CopyFrom == nil {
// TODO panic on bad data, like MediaType being empty?
return registry.EnsureManifest(ctx, dstRef, normal.Data, normal.MediaType, normal.Lookup)
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.EnsureManifest(ctx, dstRef, normal.Data, mediaTypeHaver.MediaType, normal.Lookup)
} else {
return registry.CopyManifest(ctx, *normal.CopyFrom, dstRef, normal.Lookup)
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.CopyManifest(ctx, *normal.CopyFrom, dstRef, normal.Lookup)

case typeBlob:
if normal.CopyFrom == nil {
return registry.EnsureBlob(ctx, dstRef, int64(len(normal.Data)), bytes.NewReader(normal.Data))
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.EnsureBlob(ctx, dstRef, int64(len(normal.Data)), bytes.NewReader(normal.Data))
} else {
return registry.CopyBlob(ctx, *normal.CopyFrom, dstRef)
if normal.CopyFrom.Digest == "" {
return normal, fmt.Errorf("%s: blobs are always by-digest, and thus need a digest: %s", debugId, normal.CopyFrom)
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.CopyBlob(ctx, *normal.CopyFrom, dstRef)

panic("unknown type: " + string(normal.Type))
// panic instead of error because this should've already been handled/normalized above (so this is a coding error, not a runtime error)

return normal, nil
8 changes: 4 additions & 4 deletions cmd/deploy/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestNormalizeInput(t *testing.T) {
"refs": [ "localhost:5000/example:test" ],
"data": {"mediaType": "application/vnd.oci.image.index.v1+json"}
"manifest raw",
Expand All @@ -290,7 +290,7 @@ func TestNormalizeInput(t *testing.T) {
"refs": [ "localhost:5000/example" ],
"data": "eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIn0="

Expand All @@ -301,7 +301,7 @@ func TestNormalizeInput(t *testing.T) {
"lookup": { "sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d": "tianon/true" },
"data": {"mediaType": "application/vnd.oci.image.index.v1+json","manifests":[{"mediaType":"application/vnd.oci.image.manifest.v1+json","digest":"sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d","size":1165}],"schemaVersion":2}
Expand All @@ -311,7 +311,7 @@ func TestNormalizeInput(t *testing.T) {
"lookup": { "": "tianon/true" },
"data": {"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json","config":{"mediaType":"application/vnd.docker.container.image.v1+json","size":1471,"digest":"sha256:690912094c0165c489f874c72cee4ba208c28992c0699fa6e10d8cc59f93fec9"},"layers":[{"mediaType":"application/vnd.docker.image.rootfs.diff.tar.gzip","size":129,"digest":"sha256:4c74d744397d4bcbd3079d9c82a87b80d43da376313772978134d1288f20518c"}]}

Expand Down
142 changes: 15 additions & 127 deletions cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import (


ocispec ""

func main() {
Expand All @@ -37,11 +32,6 @@ func main() {

// a set of RWMutex objects for synchronizing the pushing of "child" objects before their parents later in the list of documents
// for every RWMutex, it will be *write*-locked during push, and *read*-locked during reading (which means we won't limit the parallelization of multiple parents after a given child is pushed, but we will stop parents from being pushed before their children)
childMutexes := sync.Map{}
wg := sync.WaitGroup{}

dec := json.NewDecoder(stdout)
for dec.More() {
var raw inputRaw
Expand All @@ -58,128 +48,26 @@ func main() {
refsDigest := normal.Refs[0].Digest

var logSuffix string = " (" + string(raw.Type) + ") "
if normal.CopyFrom != nil {
// normal copy (one repo/registry to another)
logSuffix = " 🤝" + logSuffix + normal.CopyFrom.String()
// "localhost:32774/test 🤝 (manifest) tianon/test@sha256:4077658bc7e39f02f81d1682fe49f66b3db2c420813e43f5db0c53046167c12f"
if normal.CopyFrom == nil {
fmt.Printf("Pushing %s %s:\n", raw.Type, refsDigest)
} else {
// push (raw/embedded blob or manifest data)
logSuffix = " 🦾" + logSuffix + string(refsDigest)
// "localhost:32774/test 🦾 (blob) sha256:1a51828d59323e0e02522c45652b6a7a44a032b464b06d574f067d2358b0e9f1"
fmt.Printf("Copying %s %s:\n", raw.Type, *normal.CopyFrom)
startedPrefix := "❔ "
successPrefix := "✅ "
failurePrefix := "❌ "

// locks are per-digest, but refs might be 20 tags on the same digest, so we need to get one write lock per repo@digest and release it when the first tag completes, and every other tag needs a read lock
seenRefs := map[string]bool{}

for _, ref := range normal.Refs {
ref := ref //

necessaryReadLockRefs := []registry.Reference{}

// before parallelization, collect the pushing "child" mutex we need to lock for writing right away (but only for the first entry)
var mutex *sync.RWMutex
if ref.Digest != "" {
lockRef := ref
lockRef.Tag = ""
lockRefStr := lockRef.String()
if seenRefs[lockRefStr] {
// if we've already seen this specific ref for this input, we need a read lock, not a write lock (since they're per-repo@digest)
necessaryReadLockRefs = append(necessaryReadLockRefs, lockRef)
} else {
seenRefs[lockRefStr] = true
lock, _ := childMutexes.LoadOrStore(lockRefStr, &sync.RWMutex{})
mutex = lock.(*sync.RWMutex)
// if we have a "child" mutex, lock it immediately so we don't create a race between inputs
mutex.Lock() // (this gets unlocked in the goroutine below)
// this is sane to lock here because interdependent inputs are required to be in-order (children first), so if this hangs it's 100% a bug in the input order
fmt.Printf(" - %s", ref.StringWithKnownDigest(refsDigest))
desc, err := normal.Do(ctx, ref)
if err != nil {
fmt.Fprintf(os.Stderr, " -- ERROR: %v\n", err)

// make a (deep) copy of "normal" so that we can use it in a goroutine ("" is not safe for concurrent invocation)
normal := normal.clone()

go func() {
defer wg.Done()

if mutex != nil {
defer mutex.Unlock()

// before we start this job (parallelized), if it's a raw data job we need to parse the raw data and see if any of the "children" are objects we're still in the process of pushing (from a previously parallel job)
if len(normal.Data) > 2 { // needs to at least be bigger than "{}" for us to care (anything else either doesn't have data or can't have children)
// explicitly ignoring errors because this might not actually be JSON (or even a manifest at all!); this is best-effort
// TODO optimize this by checking whether normal.Data matches "^\s*{.+}\s*$" first so we have some assurance it might work before we go further?
manifestChildren, _ := registry.ParseManifestChildren(normal.Data)
childDescs := []ocispec.Descriptor{}
childDescs = append(childDescs, manifestChildren.Manifests...)
if manifestChildren.Config != nil {
childDescs = append(childDescs, *manifestChildren.Config)
childDescs = append(childDescs, manifestChildren.Layers...)
for _, childDesc := range childDescs {
childRef := ref
childRef.Digest = childDesc.Digest
necessaryReadLockRefs = append(necessaryReadLockRefs, childRef)

// these read locks are cheap, so let's be aggressive with our "lookup" refs too
if lookupRef, ok := normal.Lookup[childDesc.Digest]; ok {
lookupRef.Digest = childDesc.Digest
necessaryReadLockRefs = append(necessaryReadLockRefs, lookupRef)
if fallbackRef, ok := normal.Lookup[""]; ok {
fallbackRef.Digest = childDesc.Digest
necessaryReadLockRefs = append(necessaryReadLockRefs, fallbackRef)
// we don't *know* that all the lookup references are children, but if any of them have an explicit digest, let's treat them as potential children too (which is fair, because they *are* explicit potential references that it's sane to make sure exist)
for digest, lookupRef := range normal.Lookup {
necessaryReadLockRefs = append(necessaryReadLockRefs, lookupRef)
if digest != lookupRef.Digest {
lookupRef.Digest = digest
necessaryReadLockRefs = append(necessaryReadLockRefs, lookupRef)
// if we're going to do a copy, we need to *also* include the artifact we're copying in our list
if normal.CopyFrom != nil {
necessaryReadLockRefs = append(necessaryReadLockRefs, *normal.CopyFrom)
// ok, we've built up a list, let's start grabbing (ro) mutexes
seenChildren := map[string]bool{}
for _, lockRef := range necessaryReadLockRefs {
lockRef.Tag = ""
if lockRef.Digest == "" {
lockRefStr := lockRef.String()
if seenChildren[lockRefStr] {
seenChildren[lockRefStr] = true
lock, _ := childMutexes.LoadOrStore(lockRefStr, &sync.RWMutex{})
defer lock.(*sync.RWMutex).RUnlock()

logText := ref.StringWithKnownDigest(refsDigest) + logSuffix
fmt.Println(startedPrefix + logText)
desc, err :=, ref)
if err != nil {
fmt.Fprintf(os.Stderr, "%s%s -- ERROR: %v\n", failurePrefix, logText, err)
panic(err) // TODO exit in a more clean way (we can't use "os.Exit" because that causes *more* errors 😭)
if ref.Digest == "" && refsDigest == "" {
logText += "@" + string(desc.Digest)
fmt.Println(successPrefix + logText)
if ref.Digest == "" && refsDigest == "" {
fmt.Printf("@%s", desc.Digest)

25 changes: 0 additions & 25 deletions registry/manifest-children.go

This file was deleted.


0 comments on commit 4b8e582

Please sign in to comment.