// countingWriter is an io.Writer that discards the data it is given and only
// adds the number of bytes to the int64 that n points to.
type countingWriter struct {
	n *int64
}

// splitRef splits an image reference of the form "host/repo[:tag]" into its
// registry host, repository path and tag.
//
// Everything before the first "/" is the host (which may carry a port, e.g.
// "localhost:5000"); a reference without any "/" is returned entirely as host
// with an empty repo. The tag is whatever follows a ":" in the last path
// component of the remainder; when there is no such ":" or nothing follows
// it, the tag defaults to "latest". Digest references ("name@sha256:...") are
// not recognized as such.
//
// The result is also written to the log.
func splitRef(full string) (host, repo, tag string) {
	host, repo, _ = strings.Cut(full, "/")
	tag = "latest"

	// Only a ":" after the last "/" can introduce a tag; a colon in an earlier
	// path component belongs to the repository path.
	if i := strings.LastIndex(repo, ":"); i > strings.LastIndex(repo, "/") {
		// Keep the default for "repo:" so callers never get an empty tag.
		if t := repo[i+1:]; t != "" {
			tag = t
		}
		repo = repo[:i]
	}

	log.Printf("%s -> %s %s %s", full, host, repo, tag)
	return host, repo, tag
}

// hexOf returns the lowercase hex encoding of the SHA-256 digest of b.
func hexOf(b []byte) string {
	h := sha256.Sum256(b)
	return fmt.Sprintf("%x", h[:])
}
+func uploadLayerToRegistry(regBase, repoPath string, layer io.Reader) (string, int64, string, error) { + startURL := fmt.Sprintf("%s/v2/%s/blobs/uploads/", regBase, repoPath) + reqStart, _ := http.NewRequest(http.MethodPost, startURL, nil) + loc, err := followLocation(reqStart) + if err != nil { + return "", 0, "", fmt.Errorf("start upload: %s", err) + } + + // pipe: gzip(layer) -> (count+hash) -> PATCH + pr, pw := io.Pipe() + var wg sync.WaitGroup + var compBytes int64 + compHash := sha256.New() + diffHash := sha256.New() + + wg.Add(1) + go func() { + defer wg.Done() + defer pw.Close() + gzw, _ := gzip.NewWriterLevel(io.MultiWriter(pw, countingWriter{&compBytes}, compHash), gzip.BestSpeed) + _, copyErr := io.Copy(gzw, io.TeeReader(layer, diffHash)) + if cerr := gzw.Close(); copyErr == nil { copyErr = cerr } + if copyErr != nil { _ = pw.CloseWithError(copyErr) } + }() + + reqPatch, _ := http.NewRequest("PATCH", loc, pr) + reqPatch.Header.Set("Content-Type", "application/octet-stream") + loc, err = followLocation(reqPatch) + if err != nil { + return "", 0, "", fmt.Errorf("patch upload: %w", err) + } + + wg.Wait() + compDigest := "sha256:" + fmt.Sprintf("%x", compHash.Sum(nil)) + diffID := "sha256:" + fmt.Sprintf("%x", diffHash.Sum(nil)) + + finalURL := loc + if strings.Contains(finalURL, "?") { + finalURL = finalURL + "&digest=" + compDigest + } else { + finalURL = finalURL + "?digest=" + compDigest + } + + reqPut, _ := http.NewRequest(http.MethodPut, finalURL, nil) + if _, err := regDo(reqPut); err != nil { + return "", 0, "", fmt.Errorf("finalize upload: %w", err) + } + + return compDigest, compBytes, diffID, nil +} + +func uploadBlobOnce(regBase, repoPath string, r io.Reader, size int64, digest, contentType string) error { + startURL := fmt.Sprintf("%s/v2/%s/blobs/uploads/", regBase, repoPath) + reqStart, _ := http.NewRequest("POST", startURL, nil) + loc, err := followLocation(reqStart) + if err != nil { + return fmt.Errorf("start upload: %s", err) + } + + 
reqPatch, _ := http.NewRequest("PATCH", loc, r) + if contentType == "" { + contentType = "application/octet-stream" + } + + reqPatch.Header.Set("Content-Type", contentType) + loc, err = followLocation(reqPatch) + if err != nil { + return fmt.Errorf("patch upload: %s", err) + } + + finalURL := loc + if strings.Contains(finalURL, "?") { + finalURL = finalURL + "&digest=" + digest + } else { + finalURL = finalURL + "?digest=" + digest + } + + reqPut, _ := http.NewRequest("PUT", finalURL, nil) + _, err = regDo(reqPut) + return err +} + +func followLocation(req *http.Request) (string, error) { + resp, err := regDo(req) + if err != nil { + return "", err + } + + loc := resp.Header.Get("Location") + resp.Body.Close() + if loc == "" { + return "", fmt.Errorf("missing Location in response to %s %s", req.Method, req.URL.String()) + } + + u, _ := url.Parse(loc) + if !u.IsAbs() { + base := &url.URL{Scheme: req.URL.Scheme, Host: req.URL.Host, Path: loc} + return base.String(), nil + } + + return loc, nil +} + +func regDo(req *http.Request) (*http.Response, error) { + // TODO: inject Authorization for registry if required + c := &http.Client{ Timeout: 0 } + resp, err := c.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (w countingWriter) Write(p []byte) (int, error) { + *w.n += int64(len(p)) + return len(p), nil +} + func DownloadDockerImageBackend(topClient *http.Client, imageName string) error { // Check environment SHUFFLE_AUTO_IMAGE_DOWNLOAD if os.Getenv("SHUFFLE_AUTO_IMAGE_DOWNLOAD") == "false" { @@ -4067,7 +4219,6 @@ func DownloadDockerImageBackend(topClient *http.Client, imageName string) error } else { //log.Printf("[DEBUG] Downloading image as POST request WITHOUT redirects due to not being cloud") } - streamImage := false // Set request timeout to 5 min (max) topClient.Timeout = time.Minute * 10 @@ -4128,6 +4279,160 @@ func DownloadDockerImageBackend(topClient *http.Client, imageName string) error } } + if len(os.Getenv("IS_KUBERNETES")) 
> 0 && os.Getenv("IS_KUBERNETES") == "true" { + log.Printf("[INFO] In kubernetes pushing it to private registry") + localRegistry := os.Getenv("SHUFFLE_STREAM_PRIVATE_REGISTRY") + if localRegistry == "" { + log.Printf("[ERROR] No private registry defined") + return err + } + + imgPath := strings.TrimPrefix(strings.ReplaceAll(imageName, " ", "-"), "/") + refStr := fmt.Sprintf("%s/%s", strings.TrimSuffix(localRegistry, "/"), imgPath) + + insecure := os.Getenv("SHUFFLE_STREAM_PRIVATE_REGISTRY_INSECURE") == "true" + scheme := "https" + if insecure { + scheme = "http" + } + + regHost, repoPath, tag := splitRef(refStr) + regBase := scheme + "://" + regHost + + resp, err := topClient.Do(req.Clone(req.Context())) + if err != nil { + log.Printf("[ERROR] Bucket request failed") + return fmt.Errorf("bucket request failed: %w", err) + } + + if resp.StatusCode != 200 { + defer resp.Body.Close() + log.Printf("[ERROR] Bucket request failed bad staus code") + return fmt.Errorf("bucket bad status %s", resp.Status) + } + + br := bufio.NewReader(resp.Body) + var tarR *tar.Reader + if peek, _ := br.Peek(2); len(peek) == 2 && peek[0] == 0x1f && peek[1] == 0x8b { + gr, err := gzip.NewReader(br) + if err != nil { + resp.Body.Close(); + log.Printf("[ERROR] Gzip init failed") + return fmt.Errorf("gzip init failed: %w", err) + } + + defer gr.Close() + defer resp.Body.Close() + tarR = tar.NewReader(gr) + } else { + defer resp.Body.Close() + tarR = tar.NewReader(br) + } + + type desc struct { + Digest string + Size int64 + } + layerByDiff := map[string]desc{} + var diffOrder []string + var configJSON []byte + + // walk tar once: upload layers & capture config + for { + hdr, err := tarR.Next() + if err == io.EOF { break } + if err != nil { return fmt.Errorf("tar read: %w", err) } + name := hdr.Name + + switch { + case strings.HasSuffix(name, "/layer.tar"): + // stream this layer: compute diffID (uncompressed), gzip+upload to registry, record digest & size + compDig, compSize, diffID, err := 
uploadLayerToRegistry(regBase, repoPath, tarR) + if err != nil { + log.Printf("[ERROR] Failed to upload layer %s", err) + return fmt.Errorf("upload layer %s: %w", name, err) + } + + layerByDiff[diffID] = desc{Digest: compDig, Size: compSize} + + case strings.HasSuffix(name, ".json") && name != "manifest.json": + if len(configJSON) == 0 { + var buf bytes.Buffer + if _, err := io.Copy(&buf, tarR); err != nil { + log.Printf("[ERROR] Failed to read config.json") + return fmt.Errorf("read config.json: %w", err) + } + + var probe struct{ + RootFS struct{ DiffIDs []string `json:"diff_ids"` } `json:"rootfs"` + } + + if json.Unmarshal(buf.Bytes(), &probe) == nil && len(probe.RootFS.DiffIDs) > 0 { + configJSON = buf.Bytes() + for _, d := range probe.RootFS.DiffIDs { + if !strings.HasPrefix(d, "sha256:") { + d = "sha256:" + d + } + diffOrder = append(diffOrder, d) + } + } + } + + default: + } + } + + if len(configJSON) == 0 || len(diffOrder) == 0 { + log.Printf("[ERROR] Failed to docker save tar config") + return fmt.Errorf("docker save tar missing config") + } + + layers := make([]map[string]any, 0, len(diffOrder)) + for _, d := range diffOrder { + desc, ok := layerByDiff[d] + if !ok { + log.Printf("[ERROR] Failed to upload layer (2)") + return fmt.Errorf("missing uploaded layer for diffID %s", d) + } + layers = append(layers, map[string]any{ + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": desc.Size, + "digest": desc.Digest, + }) + } + + cfgDigest := "sha256:" + hexOf(configJSON) + if err := uploadBlobOnce(regBase, repoPath, bytes.NewReader(configJSON), int64(len(configJSON)), cfgDigest, "application/octet-stream"); err != nil { + log.Printf("[ERROR] Failed to upload config (2)") + return fmt.Errorf("upload config: %w", err) + } + + manifest := map[string]any{ + "schemaVersion": 2, + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "config": map[string]any{ + "mediaType": "application/vnd.docker.container.image.v1+json", + 
"size": len(configJSON), + "digest": cfgDigest, + }, + "layers": layers, + } + + manBytes, _ := json.Marshal(manifest) + putURL := fmt.Sprintf("%s/v2/%s/manifests/%s", regBase, repoPath, tag) + reqM, _ := http.NewRequest(http.MethodPut, putURL, bytes.NewReader(manBytes)) + reqM.Header.Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json") + reqM.Header.Set("Accept", "application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json") + if _, err := regDo(reqM); err != nil { + log.Printf("[ERROR] Failed to put mainfest") + return fmt.Errorf("put manifest: %w", err) + } + + log.Printf("[INFO] Pushed image to private registry as %s", refStr) + return nil + } + + newresp, err := topClient.Do(req) if err != nil { log.Printf("[ERROR] Failed download request for %s: %s", imageName, err) @@ -4150,59 +4455,7 @@ func DownloadDockerImageBackend(topClient *http.Client, imageName string) error tar, err := os.Create(newFileName) if err != nil { log.Printf("[WARNING] Failed creating file: %s", err) - streamImage = true - } - - // @yashsinghcodes: This is for kubernetes where we cannot write into filesystem - // we can make this default at somepoint but for now it is not - // well tested. 
// PushTarStreamToRegistry is a placeholder: it neither reads from r nor uses
// refStr, and it always returns nil.
func PushTarStreamToRegistry(r io.Reader, refStr string) error {
	// Nothing is pushed yet.
	return nil
}