diff --git a/options.go b/options.go index 0ec18d426..19bcf0b3e 100644 --- a/options.go +++ b/options.go @@ -133,7 +133,8 @@ type FetchOptions struct { Progress sideband.Progress // Tags describe how the tags will be fetched from the remote repository, // by default is TagFollowing. - Tags TagFetchMode + Tags TagFetchMode + StatusChan plumbing.StatusChan } // Validate validates the fields and sets the default values. @@ -159,7 +160,8 @@ type PushOptions struct { // object. A refspec with empty src can be used to delete a reference. RefSpecs []config.RefSpec // Auth credentials, if required, to use with the remote repository. - Auth transport.AuthMethod + Auth transport.AuthMethod + StatusChan plumbing.StatusChan } // Validate validates the fields and sets the default values. diff --git a/plumbing/format/idxfile/encoder.go b/plumbing/format/idxfile/encoder.go index 40abfb830..da5587b00 100644 --- a/plumbing/format/idxfile/encoder.go +++ b/plumbing/format/idxfile/encoder.go @@ -6,6 +6,7 @@ import ( "io" "sort" + "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/utils/binary" ) @@ -23,10 +24,10 @@ func NewEncoder(w io.Writer) *Encoder { } // Encode encodes an Idxfile to the encoder writer. -func (e *Encoder) Encode(idx *Idxfile) (int, error) { +func (e *Encoder) Encode(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { idx.Entries.Sort() - flow := []func(*Idxfile) (int, error){ + flow := []func(*Idxfile, plumbing.StatusChan) (int, error){ e.encodeHeader, e.encodeFanout, e.encodeHashes, @@ -37,7 +38,7 @@ func (e *Encoder) Encode(idx *Idxfile) (int, error) { sz := 0 for _, f := range flow { - i, err := f(idx) + i, err := f(idx, statusChan) sz += i if err != nil { @@ -48,7 +49,7 @@ func (e *Encoder) Encode(idx *Idxfile) (int, error) { return sz, nil } -func (e *Encoder) encodeHeader(idx *Idxfile) (int, error) { +func (e *Encoder) encodeHeader(idx *Idxfile, _ plumbing.StatusChan) (int, error) { c, err := e.Write(idxHeader) if err != nil { return c, err @@ -57,7 +58,7 @@ func (e *Encoder) encodeHeader(idx *Idxfile) (int, error) { return c + 4, binary.WriteUint32(e, idx.Version) } -func (e *Encoder) encodeFanout(idx *Idxfile) (int, error) { +func (e *Encoder) encodeFanout(idx *Idxfile, _ plumbing.StatusChan) (int, error) { fanout := idx.calculateFanout() for _, c := range fanout { if err := binary.WriteUint32(e, c); err != nil { @@ -68,7 +69,13 @@ func (e *Encoder) encodeFanout(idx *Idxfile) (int, error) { return 1024, nil } -func (e *Encoder) encodeHashes(idx *Idxfile) (int, error) { +func (e *Encoder) encodeHashes(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusIndexHash, + ObjectsTotal: len(idx.Entries), + } + statusChan.SendUpdate(update) + sz := 0 for _, ent := range idx.Entries { i, err := e.Write(ent.Hash[:]) @@ -77,12 +84,20 @@ func (e *Encoder) encodeHashes(idx *Idxfile) (int, error) { if err != nil { return sz, err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return sz, nil } -func (e *Encoder) encodeCRC32(idx *Idxfile) (int, error) { +func (e *Encoder) encodeCRC32(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusIndexCRC, + ObjectsTotal: len(idx.Entries), + } + statusChan.SendUpdate(update) + sz := 0 for _, ent := range idx.Entries { err := binary.Write(e, ent.CRC32) @@ -91,12 +106,20 @@ func (e *Encoder) encodeCRC32(idx *Idxfile) (int, error) { if err != nil { return sz, err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return sz, nil } -func (e *Encoder) encodeOffsets(idx *Idxfile) (int, error) { +func (e *Encoder) encodeOffsets(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusIndexOffset, + ObjectsTotal: len(idx.Entries), + } + statusChan.SendUpdate(update) + sz := 0 var o64bits []uint64 @@ -112,6 +135,8 @@ func (e *Encoder) encodeOffsets(idx *Idxfile) (int, error) { } sz += 4 + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } for _, o := range o64bits { @@ -125,7 +150,7 @@ func (e *Encoder) encodeOffsets(idx *Idxfile) (int, error) { return sz, nil } -func (e *Encoder) encodeChecksums(idx *Idxfile) (int, error) { +func (e *Encoder) encodeChecksums(idx *Idxfile, _ plumbing.StatusChan) (int, error) { if _, err := e.Write(idx.PackfileChecksum[:]); err != nil { return 0, err } diff --git a/plumbing/format/packfile/common.go b/plumbing/format/packfile/common.go index 728cb16a4..424a89926 100644 --- a/plumbing/format/packfile/common.go +++ b/plumbing/format/packfile/common.go @@ -3,6 +3,7 @@ package packfile import ( "io" + "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/plumbing/storer" "gopkg.in/src-d/go-git.v4/utils/ioutil" ) @@ -23,9 +24,12 @@ const ( // UpdateObjectStorage updates the given storer.EncodedObjectStorer with the contents of the // packfile. -func UpdateObjectStorage(s storer.EncodedObjectStorer, packfile io.Reader) error { +func UpdateObjectStorage( + s storer.EncodedObjectStorer, + packfile io.Reader, + statusChan plumbing.StatusChan) error { if sw, ok := s.(storer.PackfileWriter); ok { - return writePackfileToObjectStorage(sw, packfile) + return writePackfileToObjectStorage(sw, packfile, statusChan) } stream := NewScanner(packfile) @@ -34,13 +38,16 @@ func UpdateObjectStorage(s storer.EncodedObjectStorer, packfile io.Reader) error return err } - _, err = d.Decode() + _, err = d.Decode(statusChan) return err } -func writePackfileToObjectStorage(sw storer.PackfileWriter, packfile io.Reader) error { +func writePackfileToObjectStorage( + sw storer.PackfileWriter, + packfile io.Reader, + statusChan plumbing.StatusChan) error { var err error - w, err := sw.PackfileWriter() + w, err := sw.PackfileWriter(statusChan) if err != nil { return err } diff --git a/plumbing/format/packfile/decoder.go b/plumbing/format/packfile/decoder.go index e49de5168..cfc5abca1 100644 --- a/plumbing/format/packfile/decoder.go +++ b/plumbing/format/packfile/decoder.go @@ -116,26 +116,39 @@ func canResolveDeltas(s *Scanner, o storer.EncodedObjectStorer) bool { // Decode reads a packfile and stores it in the value pointed to by s. The // offsets and the CRCs are calculated by this method -func (d *Decoder) Decode() (checksum plumbing.Hash, err error) { +func (d *Decoder) Decode(statusChan plumbing.StatusChan) (checksum plumbing.Hash, err error) { defer func() { d.isDecoded = true }() if d.isDecoded { return plumbing.ZeroHash, ErrAlreadyDecoded } - if err := d.doDecode(); err != nil { + if err := d.doDecode(statusChan); err != nil { return plumbing.ZeroHash, err } return d.s.Checksum() } -func (d *Decoder) doDecode() error { +func (d *Decoder) doDecode(statusChan plumbing.StatusChan) error { + statusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusCount, + }) + _, count, err := d.s.Header() if err != nil { return err } + statusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusCount, + ObjectsTotal: int(count), + }) + statusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: int(count), + }) + if !d.hasBuiltIndex { d.idx = NewIndex(int(count)) } @@ -144,25 +157,35 @@ func (d *Decoder) doDecode() error { _, isTxStorer := d.o.(storer.Transactioner) switch { case d.o == nil: - return d.decodeObjects(int(count)) + return d.decodeObjects(int(count), statusChan) case isTxStorer: - return d.decodeObjectsWithObjectStorerTx(int(count)) + return d.decodeObjectsWithObjectStorerTx(int(count), statusChan) default: - return d.decodeObjectsWithObjectStorer(int(count)) + return d.decodeObjectsWithObjectStorer(int(count), statusChan) } } -func (d *Decoder) decodeObjects(count int) error { +func (d *Decoder) decodeObjects(count int, statusChan plumbing.StatusChan) error { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: count, + } for i := 0; i < count; i++ { if _, err := d.DecodeObject(); err != nil { return err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return nil } -func (d *Decoder) decodeObjectsWithObjectStorer(count int) error { +func (d *Decoder) decodeObjectsWithObjectStorer(count int, statusChan plumbing.StatusChan) error { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: count, + } for i := 0; i < count; i++ { obj, err := d.DecodeObject() if err != nil { @@ -172,12 +195,19 @@ func (d *Decoder) decodeObjectsWithObjectStorer(count int) error { if _, err := d.o.SetEncodedObject(obj); err != nil { return err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return nil } -func (d *Decoder) decodeObjectsWithObjectStorerTx(count int) error { +func (d *Decoder) decodeObjectsWithObjectStorerTx(count int, statusChan plumbing.StatusChan) error { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: count, + } + d.tx = d.o.(storer.Transactioner).Begin() for i := 0; i < count; i++ { @@ -196,6 +226,8 @@ func (d *Decoder) decodeObjectsWithObjectStorerTx(count int) error { return err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return d.tx.Commit() diff --git a/plumbing/format/packfile/delta_selector.go b/plumbing/format/packfile/delta_selector.go index cc0ae0fd5..9803929a0 100644 --- a/plumbing/format/packfile/delta_selector.go +++ b/plumbing/format/packfile/delta_selector.go @@ -32,22 +32,47 @@ func newDeltaSelector(s storer.EncodedObjectStorer) *deltaSelector { // ObjectsToPack creates a list of ObjectToPack from the hashes provided, // creating deltas if it's suitable, using an specific internal logic -func (dw *deltaSelector) ObjectsToPack(hashes []plumbing.Hash) ([]*ObjectToPack, error) { - otp, err := dw.objectsToPack(hashes) +func (dw *deltaSelector) ObjectsToPack( + hashes []plumbing.Hash, + statusChan plumbing.StatusChan, +) ([]*ObjectToPack, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusRead, + ObjectsTotal: len(hashes), + } + statusChan.SendUpdate(update) + + otp, err := dw.objectsToPack(hashes, statusChan, update) if err != nil { return nil, err } + update = plumbing.StatusUpdate{ + Stage: plumbing.StatusSort, + ObjectsTotal: update.ObjectsTotal, + } + statusChan.SendUpdate(update) + dw.sort(otp) - if err := dw.walk(otp); err != nil { + update = plumbing.StatusUpdate{ + Stage: plumbing.StatusDelta, + ObjectsTotal: update.ObjectsTotal, + } + statusChan.SendUpdate(update) + + if err := dw.walk(otp, statusChan, update); err != nil { return nil, err } return otp, nil } -func (dw *deltaSelector) objectsToPack(hashes []plumbing.Hash) ([]*ObjectToPack, error) { +func (dw *deltaSelector) objectsToPack( + hashes []plumbing.Hash, + statusChan plumbing.StatusChan, + update plumbing.StatusUpdate, +) ([]*ObjectToPack, error) { var objectsToPack []*ObjectToPack for _, h := range hashes { o, err := dw.encodedDeltaObject(h) @@ -61,6 +86,9 @@ func (dw *deltaSelector) objectsToPack(hashes []plumbing.Hash) ([]*ObjectToPack, } objectsToPack = append(objectsToPack, otp) + + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } if err := dw.fixAndBreakChains(objectsToPack); err != nil { @@ -171,7 +199,16 @@ func (dw *deltaSelector) sort(objectsToPack []*ObjectToPack) { sort.Sort(byTypeAndSize(objectsToPack)) } -func (dw *deltaSelector) walk(objectsToPack []*ObjectToPack) error { +func (dw *deltaSelector) walk( + objectsToPack []*ObjectToPack, + statusChan plumbing.StatusChan, + update plumbing.StatusUpdate, +) error { + sendUpdate := func() { + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) + } + for i := 0; i < len(objectsToPack); i++ { target := objectsToPack[i] @@ -179,11 +216,13 @@ func (dw *deltaSelector) walk(objectsToPack []*ObjectToPack) error { // object. This happens when a delta is set to be reused from an existing // packfile. if target.IsDelta() { + sendUpdate() continue } // We only want to create deltas from specific types. if !applyDelta[target.Type()] { + sendUpdate() continue } @@ -200,6 +239,7 @@ func (dw *deltaSelector) walk(objectsToPack []*ObjectToPack) error { return err } } + sendUpdate() } return nil diff --git a/plumbing/format/packfile/encoder.go b/plumbing/format/packfile/encoder.go index 142655904..3d183b1f5 100644 --- a/plumbing/format/packfile/encoder.go +++ b/plumbing/format/packfile/encoder.go @@ -14,10 +14,10 @@ import ( // Encoder gets the data from the storage and write it into the writer in PACK // format type Encoder struct { - selector *deltaSelector - w *offsetWriter - zw *zlib.Writer - hasher plumbing.Hasher + selector *deltaSelector + w *offsetWriter + zw *zlib.Writer + hasher plumbing.Hasher // offsets is a map of object hashes to corresponding offsets in the packfile. // It is used to determine offset of the base of a delta when a OFS_DELTA is // used. @@ -47,16 +47,28 @@ func NewEncoder(w io.Writer, s storer.EncodedObjectStorer, useRefDeltas bool) *E // Encode creates a packfile containing all the objects referenced in hashes // and writes it to the writer in the Encoder. -func (e *Encoder) Encode(hashes []plumbing.Hash) (plumbing.Hash, error) { - objects, err := e.selector.ObjectsToPack(hashes) +func (e *Encoder) Encode( + hashes []plumbing.Hash, + statusChan plumbing.StatusChan, +) (plumbing.Hash, error) { + objects, err := e.selector.ObjectsToPack(hashes, statusChan) if err != nil { return plumbing.ZeroHash, err } - return e.encode(objects) + return e.encode(objects, statusChan) } -func (e *Encoder) encode(objects []*ObjectToPack) (plumbing.Hash, error) { +func (e *Encoder) encode( + objects []*ObjectToPack, + statusChan plumbing.StatusChan, +) (plumbing.Hash, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusSend, + ObjectsTotal: len(objects), + } + statusChan.SendUpdate(update) + if err := e.head(len(objects)); err != nil { return plumbing.ZeroHash, err } @@ -65,6 +77,8 @@ func (e *Encoder) encode(objects []*ObjectToPack) (plumbing.Hash, error) { if err := e.entry(o); err != nil { return plumbing.ZeroHash, err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return e.footer() @@ -137,7 +151,7 @@ func (e *Encoder) writeOfsDeltaHeader(deltaOffset int64, base plumbing.Hash) err // for OFS_DELTA, offset of the base is interpreted as negative offset // relative to the type-byte of the header of the ofs-delta entry. - relativeOffset := deltaOffset-baseOffset + relativeOffset := deltaOffset - baseOffset if relativeOffset <= 0 { return fmt.Errorf("bad offset for OFS_DELTA entry: %d", relativeOffset) } diff --git a/plumbing/revlist/revlist.go b/plumbing/revlist/revlist.go index 5b2ff994e..7720bc31a 100644 --- a/plumbing/revlist/revlist.go +++ b/plumbing/revlist/revlist.go @@ -20,34 +20,41 @@ func Objects( s storer.EncodedObjectStorer, objs, ignore []plumbing.Hash, + statusChan plumbing.StatusChan, ) ([]plumbing.Hash, error) { - ignore, err := objects(s, ignore, nil, true) + ignore, err := objects(s, ignore, nil, nil, true) if err != nil { return nil, err } - return objects(s, objs, ignore, false) + return objects(s, objs, ignore, statusChan, false) } func objects( s storer.EncodedObjectStorer, objects, ignore []plumbing.Hash, + statusChan plumbing.StatusChan, allowMissingObjects bool, ) ([]plumbing.Hash, error) { seen := hashListToSet(ignore) result := make(map[plumbing.Hash]bool) + update := plumbing.StatusUpdate{Stage: plumbing.StatusCount} + statusChan.SendUpdate(update) + walkerFunc := func(h plumbing.Hash) { if !seen[h] { result[h] = true seen[h] = true + update.ObjectsTotal++ + statusChan.SendUpdateIfPossible(update) } } for _, h := range objects { - if err := processObject(s, h, seen, ignore, walkerFunc); err != nil { + if err := processObject(s, h, seen, ignore, walkerFunc, statusChan); err != nil { if allowMissingObjects && err == plumbing.ErrObjectNotFound { continue } @@ -56,7 +63,10 @@ func objects( } } - return hashSetToList(result), nil + hashes := hashSetToList(result) + update.ObjectsTotal = len(hashes) + statusChan.SendUpdate(update) + return hashes, nil } // processObject obtains the object using the hash an process it depending of its type @@ -66,6 +76,7 @@ func processObject( seen map[plumbing.Hash]bool, ignore []plumbing.Hash, walkerFunc func(h plumbing.Hash), + statusChan plumbing.StatusChan, ) error { if seen[h] { return nil @@ -88,7 +99,7 @@ func processObject( return iterateCommitTrees(seen, do, walkerFunc) case *object.Tag: walkerFunc(do.Hash) - return processObject(s, do.Target, seen, ignore, walkerFunc) + return processObject(s, do.Target, seen, ignore, walkerFunc, statusChan) case *object.Blob: walkerFunc(do.Hash) default: diff --git a/plumbing/status.go b/plumbing/status.go new file mode 100644 index 000000000..bed12ad99 --- /dev/null +++ b/plumbing/status.go @@ -0,0 +1,53 @@ +package plumbing + +type StatusStage int + +const ( + StatusCount StatusStage = iota + StatusRead + StatusSort + StatusDelta + StatusSend + StatusFetch + StatusIndexHash + StatusIndexCRC + StatusIndexOffset + StatusDone + + StatusUnknown StatusStage = -1 +) + +type StatusUpdate struct { + Stage StatusStage + + ObjectsTotal int + ObjectsDone int + + // TODO: BytesTotal and BytesDone? +} + +type StatusChan chan<- StatusUpdate + +func (sc StatusChan) SendUpdate(update StatusUpdate) { + if sc == nil { + return + } + sc <- update +} + +func (sc StatusChan) SendUpdateIfPossible(update StatusUpdate) { + if sc == nil { + return + } + if update.ObjectsDone == update.ObjectsTotal { + // We should always send the final status update, before the + // next stage change. + sc <- update + return + } + + select { + case sc <- update: + default: + } +} diff --git a/plumbing/storer/object.go b/plumbing/storer/object.go index 3f41468a7..d5aee0283 100644 --- a/plumbing/storer/object.go +++ b/plumbing/storer/object.go @@ -60,7 +60,7 @@ type PackfileWriter interface { // // If the Storer not implements PackfileWriter the objects should be written // using the Set method. - PackfileWriter() (io.WriteCloser, error) + PackfileWriter(plumbing.StatusChan) (io.WriteCloser, error) } // EncodedObjectIter is a generic closable interface for iterating over objects. diff --git a/plumbing/transport/server/server.go b/plumbing/transport/server/server.go index be36de5cf..552ab11d6 100644 --- a/plumbing/transport/server/server.go +++ b/plumbing/transport/server/server.go @@ -165,7 +165,7 @@ func (s *upSession) UploadPack(ctx context.Context, req *packp.UploadPackRequest pr, pw := io.Pipe() e := packfile.NewEncoder(pw, s.storer, false) go func() { - _, err := e.Encode(objs) + _, err := e.Encode(objs, nil) pw.CloseWithError(err) }() @@ -175,12 +175,12 @@ func (s *upSession) UploadPack(ctx context.Context, req *packp.UploadPackRequest } func (s *upSession) objectsToUpload(req *packp.UploadPackRequest) ([]plumbing.Hash, error) { - haves, err := revlist.Objects(s.storer, req.Haves, nil) + haves, err := revlist.Objects(s.storer, req.Haves, nil, nil) if err != nil { return nil, err } - return revlist.Objects(s.storer, req.Wants, haves) + return revlist.Objects(s.storer, req.Wants, haves, nil) } func (*upSession) setSupportedCapabilities(c *capability.List) error { @@ -313,7 +313,7 @@ func (s *rpSession) writePackfile(r io.ReadCloser) error { return nil } - if err := packfile.UpdateObjectStorage(s.storer, r); err != nil { + if err := packfile.UpdateObjectStorage(s.storer, r, nil); err != nil { _ = r.Close() return err } diff --git a/remote.go b/remote.go index c07c5af03..f42fdadca 100644 --- a/remote.go +++ b/remote.go @@ -140,13 +140,13 @@ func (r *Remote) PushContext(ctx context.Context, o *PushOptions) error { var hashesToPush []plumbing.Hash // Avoid the expensive revlist operation if we're only doing deletes. if !allDelete { - hashesToPush, err = revlist.Objects(r.s, objects, haves) + hashesToPush, err = revlist.Objects(r.s, objects, haves, o.StatusChan) if err != nil { return err } } - rs, err := pushHashes(ctx, s, r.s, req, hashesToPush) + rs, err := pushHashes(ctx, s, r.s, req, hashesToPush, o.StatusChan) if err != nil { return err } @@ -155,6 +155,13 @@ func (r *Remote) PushContext(ctx context.Context, o *PushOptions) error { return err } + defer func() { + o.StatusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusDone, + ObjectsTotal: len(hashesToPush), + }) + }() + return r.updateRemoteReferenceStorage(req, rs) } @@ -267,6 +274,12 @@ func (r *Remote) fetch(ctx context.Context, o *FetchOptions) (storer.ReferenceSt return nil, err } + defer func() { + o.StatusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusDone, + }) + }() + if !updated { return remoteRefs, NoErrAlreadyUpToDate } @@ -322,6 +335,7 @@ func (r *Remote) fetchPack(ctx context.Context, o *FetchOptions, s transport.Upl if err = packfile.UpdateObjectStorage(r.s, buildSidebandIfSupported(req.Capabilities, reader, o.Progress), + o.StatusChan, ); err != nil { return err } @@ -498,7 +512,9 @@ func calculateRefs(spec []config.RefSpec, }) } -func getWants(localStorer storage.Storer, refs memory.ReferenceStorage) ([]plumbing.Hash, error) { +func getWants( + localStorer storage.Storer, + refs memory.ReferenceStorage) ([]plumbing.Hash, error) { wants := map[plumbing.Hash]bool{} for _, ref := range refs { hash := ref.Hash() @@ -742,6 +758,7 @@ func pushHashes( sto storer.EncodedObjectStorer, req *packp.ReferenceUpdateRequest, hs []plumbing.Hash, + statusChan plumbing.StatusChan, ) (*packp.ReportStatus, error) { rd, wr := io.Pipe() @@ -749,7 +766,7 @@ func pushHashes( done := make(chan error) go func() { e := packfile.NewEncoder(wr, sto, false) - if _, err := e.Encode(hs); err != nil { + if _, err := e.Encode(hs, statusChan); err != nil { done <- wr.CloseWithError(err) return } diff --git a/storage/filesystem/internal/dotgit/dotgit.go b/storage/filesystem/internal/dotgit/dotgit.go index 2840bc74d..56c392879 100644 --- a/storage/filesystem/internal/dotgit/dotgit.go +++ b/storage/filesystem/internal/dotgit/dotgit.go @@ -137,8 +137,8 @@ func (d *DotGit) Shallow() (billy.File, error) { // NewObjectPack return a writer for a new packfile, it saves the packfile to // disk and also generates and save the index for the given packfile. -func (d *DotGit) NewObjectPack() (*PackWriter, error) { - return newPackWrite(d.fs) +func (d *DotGit) NewObjectPack(statusChan plumbing.StatusChan) (*PackWriter, error) { + return newPackWrite(d.fs, statusChan) } // ObjectPacks returns the list of availables packfiles diff --git a/storage/filesystem/internal/dotgit/writers.go b/storage/filesystem/internal/dotgit/writers.go index 46d361982..2349be042 100644 --- a/storage/filesystem/internal/dotgit/writers.go +++ b/storage/filesystem/internal/dotgit/writers.go @@ -22,15 +22,16 @@ import ( type PackWriter struct { Notify func(plumbing.Hash, *packfile.Index) - fs billy.Filesystem - fr, fw billy.File - synced *syncedReader - checksum plumbing.Hash - index *packfile.Index - result chan error + fs billy.Filesystem + fr, fw billy.File + synced *syncedReader + checksum plumbing.Hash + index *packfile.Index + result chan error + statusChan plumbing.StatusChan } -func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { +func newPackWrite(fs billy.Filesystem, statusChan plumbing.StatusChan) (*PackWriter, error) { fw, err := fs.TempFile(fs.Join(objectsPath, packPath), "tmp_pack_") if err != nil { return nil, err @@ -42,11 +43,12 @@ func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { } writer := &PackWriter{ - fs: fs, - fw: fw, - fr: fr, - synced: newSyncedReader(fw, fr), - result: make(chan error), + fs: fs, + fw: fw, + fr: fr, + synced: newSyncedReader(fw, fr), + result: make(chan error), + statusChan: statusChan, } go writer.buildIndex() @@ -61,7 +63,7 @@ func (w *PackWriter) buildIndex() { return } - checksum, err := d.Decode() + checksum, err := d.Decode(w.statusChan) if err != nil { w.result <- err return @@ -149,7 +151,7 @@ func (w *PackWriter) encodeIdx(writer io.Writer) error { idx.PackfileChecksum = w.checksum idx.Version = idxfile.VersionSupported e := idxfile.NewEncoder(writer) - _, err := e.Encode(idx) + _, err := e.Encode(idx, w.statusChan) return err } diff --git a/storage/filesystem/object.go b/storage/filesystem/object.go index 5073a3827..c6a0da989 100644 --- a/storage/filesystem/object.go +++ b/storage/filesystem/object.go @@ -77,12 +77,12 @@ func (s *ObjectStorage) NewEncodedObject() plumbing.EncodedObject { return &plumbing.MemoryObject{} } -func (s *ObjectStorage) PackfileWriter() (io.WriteCloser, error) { +func (s *ObjectStorage) PackfileWriter(statusChan plumbing.StatusChan) (io.WriteCloser, error) { if err := s.requireIndex(); err != nil { return nil, err } - w, err := s.dir.NewObjectPack() + w, err := s.dir.NewObjectPack(statusChan) if err != nil { return nil, err }