diff --git a/go.mod b/go.mod index a296599..77da408 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/filecoin-project/motion go 1.20 require ( - github.com/data-preservation-programs/singularity v0.5.9 + github.com/data-preservation-programs/singularity v0.5.10 github.com/filecoin-project/go-address v1.1.0 github.com/filecoin-project/go-state-types v0.12.0 github.com/gammazero/fsutil v0.0.1 diff --git a/go.sum b/go.sum index 2b48ca3..ab46550 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHH github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= -github.com/data-preservation-programs/singularity v0.5.9 h1:BuZBrNx9N2GHmCveJsoXHL0EAqdyixIdvzvFSt+oQmA= -github.com/data-preservation-programs/singularity v0.5.9/go.mod h1:p3Morz6kp3e12dRJs1rkALFCHlFjbkmM+TwnRk7sWG4= +github.com/data-preservation-programs/singularity v0.5.10 h1:uPM6xk6lWP8ddrEo3eumbvX5p0gcyY1x92taEhqrfe8= +github.com/data-preservation-programs/singularity v0.5.10/go.mod h1:p3Morz6kp3e12dRJs1rkALFCHlFjbkmM+TwnRk7sWG4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/integration/singularity/store.go b/integration/singularity/store.go index 8256015..1c242a0 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -473,6 +473,24 @@ func (s *Store) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error) return NewReader(s.singularityClient, uint64(fileID), getFileRes.Payload.Size), nil } +// Describe returns a blob descriptor describing a blob's replica(s). +// +// Rules about replica composition: +// +// - If twe different ranges are in deals with two separate providers, then +// these are considered to be in separate replicas. This limitation is imposed +// to limit complexity. +// +// - In order to have a complete replica, each file range must be in a deal +// with the same provider. +// +// - If different ranges of one file are in different deals, then all of the +// deals make up one complete replica. +// +// - If every file range has 2 deals, then there are 2 complete replicas. +// +// - If some file ranges have 2 deals and some have 1, then there is one +// complete replica and one one partial replica. func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) { fileID, err := s.idMap.get(id) if err != nil { @@ -511,32 +529,77 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err return nil, err } - if len(getFileDealsRes.Payload) == 0 { + dealsForRanges := getFileDealsRes.GetPayload() + if len(dealsForRanges) == 0 { return descriptor, nil } - replicas := make([]blob.Replica, 0, len(getFileDealsRes.Payload)) - for _, deal := range getFileDealsRes.Payload { - updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) - if err != nil { - updatedAt = time.Time{} + providerReplicas := make(map[string][]blob.Replica) + var prevDealCount, totalReplicas int + + // All ranges in a replica are handled by the same provider. + for i := range dealsForRanges { + fileRange := dealsForRanges[i].FileRange + deals := dealsForRanges[i].Deals + provRangeCounts := make(map[string]int) + if i != 0 && len(deals) != prevDealCount { + logger.Warnw("File range has different number of deals that previous file range", + "range", fileRange, "deals", len(deals), "previousDeals", prevDealCount) } - piece := blob.Piece{ - Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), - LastUpdated: updatedAt, - PieceCID: deal.PieceCid, - Status: string(deal.State), + prevDealCount = len(deals) + + for _, deal := range deals { + updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) + if err != nil { + updatedAt = time.Time{} + } + piece := blob.Piece{ + Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), + LastUpdated: updatedAt, + PieceCID: deal.PieceCid, + Status: string(deal.State), + } + + // Deals with different providers are alrays for different replicas. + replicas := providerReplicas[deal.Provider] + // Get number of deals so far for this range for this provider. + j := provRangeCounts[deal.Provider] + if len(replicas) == j { + // Need a new replica since this file range has more deals than + // there are replicas. + // + // If there are multiple deals for the same file range, then + // these deals are for separate replicas. + providerReplicas[deal.Provider] = append(replicas, blob.Replica{ + Provider: deal.Provider, + Pieces: []blob.Piece{piece}, + }) + provRangeCounts[deal.Provider]++ + totalReplicas++ + } else { + // Deal is part of an existing replica that a previous range is part of. + replicas[j].Pieces = append(replicas[j].Pieces, piece) + } } - replicas = append(replicas, blob.Replica{ - Provider: deal.Provider, - Pieces: []blob.Piece{piece}, - }) } + + replicas := make([]blob.Replica, 0, totalReplicas) + for prov, provReplicas := range providerReplicas { + replicas = append(replicas, provReplicas...) + logger.Infof("Provider %s has %d replicas", prov, len(provReplicas)) + } + descriptor.Replicas = replicas return descriptor, nil } -// Returns true if the file has at least 1 deal for every SP. +// Returns true if the file has at least 1 deal for every SP, for every range of the file. +// +// - If no file ranges, returns true. +// - If no storage providers, returns true. +// - If no deals in any file range, returns false. +// - If any range does not have a deal with at least one SP, returns false. +// - If all ranges have a deal with at least one SP, return true. func (s *Store) hasDealForAllProviders(ctx context.Context, blobID blob.ID) (bool, error) { fileID, err := s.idMap.get(blobID) if err != nil { @@ -551,24 +614,34 @@ func (s *Store) hasDealForAllProviders(ctx context.Context, blobID blob.ID) (boo return false, fmt.Errorf("failed to get file deals: %w", err) } - // Make sure the file has at least 1 deal for every SP - for _, sp := range s.storageProviders { - foundDealForSP := false - for _, deal := range getFileDealsRes.Payload { - // Only check state for current provider - if deal.Provider != sp.String() { - continue - } + dealsForRanges := getFileDealsRes.GetPayload() - if deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive { - foundDealForSP = true - break + // Make sure the file has at least 1 deal for every SP and every range of this file. + for i := range dealsForRanges { + dealsForRange := dealsForRanges[i].Deals + // Check that each SP has a deal. + for _, sp := range s.storageProviders { + // Return false if this range has no + if !storageProviderHasAnyDeal(sp, dealsForRange) { + return false, nil } } - if !foundDealForSP { - return false, nil - } } return true, nil } + +func storageProviderHasAnyDeal(sp address.Address, deals []*models.ModelDeal) bool { + // Find a deal for this SP. + for _, deal := range deals { + // Only check state for current provider + if deal.Provider != sp.String() { + continue + } + if deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive { + return true + } + } + // The storage provider did not have any of the deals. + return false +} diff --git a/integration/test/integration_test.go b/integration/test/integration_test.go index a061a8f..6cda07d 100644 --- a/integration/test/integration_test.go +++ b/integration/test/integration_test.go @@ -100,7 +100,7 @@ func TestRoundTripPutStatusAndFullStorage(t *testing.T) { var decoded api.GetStatusResponse err = jsonResp.Decode(&decoded) assert.NoError(c, err) - assert.Len(t, decoded.Replicas, 2) + assert.Len(t, decoded.Replicas, 1) if len(decoded.Replicas) == 2 { assert.Len(c, decoded.Replicas[0].Pieces, 1) assert.Len(c, decoded.Replicas[1].Pieces, 1) @@ -146,9 +146,9 @@ func TestRoundTripPutStatusAndFullStorage(t *testing.T) { var decoded api.GetStatusResponse err = jsonResp.Decode(&decoded) assert.NoError(c, err) - assert.Len(c, decoded.Replicas, 2) + assert.Len(c, decoded.Replicas, 1) for _, replica := range decoded.Replicas { - assert.Len(c, replica.Pieces, 1) + assert.Len(c, replica.Pieces, 4) assert.Contains(c, []string{"published", "active"}, replica.Pieces[0].Status) } }, 2*time.Minute, 5*time.Second, "published deals") diff --git a/integration/test/motionlarity/.env b/integration/test/motionlarity/.env index 3605694..be5d1a4 100644 --- a/integration/test/motionlarity/.env +++ b/integration/test/motionlarity/.env @@ -1,4 +1,4 @@ -SINGULARITY_REF=:v0.5.9 +SINGULARITY_REF=:v0.5.10 LOTUS_TEST='true' LOTUS_API=http://lotus:1234/rpc/v1 MOTION_PRICE_PER_GIB_EPOCH=0