Skip to content
This repository was archived by the owner on May 15, 2024. It is now read-only.

New singularity filedeals api #229

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/filecoin-project/motion
go 1.20

require (
github.com/data-preservation-programs/singularity v0.5.9
github.com/data-preservation-programs/singularity v0.5.10
github.com/filecoin-project/go-address v1.1.0
github.com/filecoin-project/go-state-types v0.12.0
github.com/gammazero/fsutil v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHH
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/data-preservation-programs/singularity v0.5.9 h1:BuZBrNx9N2GHmCveJsoXHL0EAqdyixIdvzvFSt+oQmA=
github.com/data-preservation-programs/singularity v0.5.9/go.mod h1:p3Morz6kp3e12dRJs1rkALFCHlFjbkmM+TwnRk7sWG4=
github.com/data-preservation-programs/singularity v0.5.10 h1:uPM6xk6lWP8ddrEo3eumbvX5p0gcyY1x92taEhqrfe8=
github.com/data-preservation-programs/singularity v0.5.10/go.mod h1:p3Morz6kp3e12dRJs1rkALFCHlFjbkmM+TwnRk7sWG4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
133 changes: 103 additions & 30 deletions integration/singularity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,24 @@ func (s *Store) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error)
return NewReader(s.singularityClient, uint64(fileID), getFileRes.Payload.Size), nil
}

// Describe returns a blob descriptor describing a blob's replica(s).
//
// Rules about replica composition:
//
// - If two different ranges are in deals with two separate providers, then
// these are considered to be in separate replicas. This limitation is imposed
// to limit complexity.
//
// - In order to have a complete replica, each file range must be in a deal
// with the same provider.
//
// - If different ranges of one file are in different deals, then all of the
// deals make up one complete replica.
//
// - If every file range has 2 deals, then there are 2 complete replicas.
//
// - If some file ranges have 2 deals and some have 1, then there is one
// complete replica and one partial replica.
func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) {
fileID, err := s.idMap.get(id)
if err != nil {
Expand Down Expand Up @@ -511,32 +529,77 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err
return nil, err
}

if len(getFileDealsRes.Payload) == 0 {
dealsForRanges := getFileDealsRes.GetPayload()
if len(dealsForRanges) == 0 {
return descriptor, nil
}

replicas := make([]blob.Replica, 0, len(getFileDealsRes.Payload))
for _, deal := range getFileDealsRes.Payload {
updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt)
if err != nil {
updatedAt = time.Time{}
providerReplicas := make(map[string][]blob.Replica)
var prevDealCount, totalReplicas int

// All ranges in a replica are handled by the same provider.
for i := range dealsForRanges {
fileRange := dealsForRanges[i].FileRange
deals := dealsForRanges[i].Deals
provRangeCounts := make(map[string]int)
if i != 0 && len(deals) != prevDealCount {
logger.Warnw("File range has different number of deals that previous file range",
"range", fileRange, "deals", len(deals), "previousDeals", prevDealCount)
}
piece := blob.Piece{
Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)),
LastUpdated: updatedAt,
PieceCID: deal.PieceCid,
Status: string(deal.State),
prevDealCount = len(deals)

for _, deal := range deals {
updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt)
if err != nil {
updatedAt = time.Time{}
}
piece := blob.Piece{
Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)),
LastUpdated: updatedAt,
PieceCID: deal.PieceCid,
Status: string(deal.State),
}

// Deals with different providers are always for different replicas.
replicas := providerReplicas[deal.Provider]
// Get number of deals so far for this range for this provider.
j := provRangeCounts[deal.Provider]
if len(replicas) == j {
// Need a new replica since this file range has more deals than
// there are replicas.
//
// If there are multiple deals for the same file range, then
// these deals are for separate replicas.
providerReplicas[deal.Provider] = append(replicas, blob.Replica{
Provider: deal.Provider,
Pieces: []blob.Piece{piece},
})
provRangeCounts[deal.Provider]++
totalReplicas++
} else {
// Deal is part of an existing replica that a previous range is part of.
replicas[j].Pieces = append(replicas[j].Pieces, piece)
}
}
replicas = append(replicas, blob.Replica{
Provider: deal.Provider,
Pieces: []blob.Piece{piece},
})
}

replicas := make([]blob.Replica, 0, totalReplicas)
for prov, provReplicas := range providerReplicas {
replicas = append(replicas, provReplicas...)
logger.Infof("Provider %s has %d replicas", prov, len(provReplicas))
}

descriptor.Replicas = replicas
return descriptor, nil
}

// Returns true if the file has at least 1 deal for every SP.
// Returns true if the file has at least 1 deal for every SP, for every range of the file.
//
// - If no file ranges, returns true.
// - If no storage providers, returns true.
// - If no deals in any file range, returns false.
// - If any range does not have a deal with at least one SP, returns false.
// - If all ranges have a deal with at least one SP, return true.
func (s *Store) hasDealForAllProviders(ctx context.Context, blobID blob.ID) (bool, error) {
fileID, err := s.idMap.get(blobID)
if err != nil {
Expand All @@ -551,24 +614,34 @@ func (s *Store) hasDealForAllProviders(ctx context.Context, blobID blob.ID) (boo
return false, fmt.Errorf("failed to get file deals: %w", err)
}

// Make sure the file has at least 1 deal for every SP
for _, sp := range s.storageProviders {
foundDealForSP := false
for _, deal := range getFileDealsRes.Payload {
// Only check state for current provider
if deal.Provider != sp.String() {
continue
}
dealsForRanges := getFileDealsRes.GetPayload()

if deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive {
foundDealForSP = true
break
// Make sure the file has at least 1 deal for every SP and every range of this file.
for i := range dealsForRanges {
dealsForRange := dealsForRanges[i].Deals
// Check that each SP has a deal.
for _, sp := range s.storageProviders {
// Return false if this range has no published or active deal with this SP.
if !storageProviderHasAnyDeal(sp, dealsForRange) {
return false, nil
}
}
if !foundDealForSP {
return false, nil
}
}

return true, nil
}

// storageProviderHasAnyDeal reports whether the given storage provider sp has
// at least one deal, among deals, whose state is published or active. Deals
// belonging to other providers are ignored.
func storageProviderHasAnyDeal(sp address.Address, deals []*models.ModelDeal) bool {
	// Hoist the provider address string out of the loop; it is the same for
	// every comparison.
	spAddr := sp.String()
	for _, d := range deals {
		if d.Provider != spAddr {
			// Deal is with a different provider; not relevant here.
			continue
		}
		switch d.State {
		case models.ModelDealStatePublished, models.ModelDealStateActive:
			return true
		}
	}
	// No published or active deal found for this storage provider.
	return false
}
6 changes: 3 additions & 3 deletions integration/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestRoundTripPutStatusAndFullStorage(t *testing.T) {
var decoded api.GetStatusResponse
err = jsonResp.Decode(&decoded)
assert.NoError(c, err)
assert.Len(t, decoded.Replicas, 2)
assert.Len(t, decoded.Replicas, 1)
if len(decoded.Replicas) == 2 {
assert.Len(c, decoded.Replicas[0].Pieces, 1)
assert.Len(c, decoded.Replicas[1].Pieces, 1)
Expand Down Expand Up @@ -146,9 +146,9 @@ func TestRoundTripPutStatusAndFullStorage(t *testing.T) {
var decoded api.GetStatusResponse
err = jsonResp.Decode(&decoded)
assert.NoError(c, err)
assert.Len(c, decoded.Replicas, 2)
assert.Len(c, decoded.Replicas, 1)
for _, replica := range decoded.Replicas {
assert.Len(c, replica.Pieces, 1)
assert.Len(c, replica.Pieces, 4)
assert.Contains(c, []string{"published", "active"}, replica.Pieces[0].Status)
}
}, 2*time.Minute, 5*time.Second, "published deals")
Expand Down
2 changes: 1 addition & 1 deletion integration/test/motionlarity/.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
SINGULARITY_REF=:v0.5.9
SINGULARITY_REF=:v0.5.10
LOTUS_TEST='true'
LOTUS_API=http://lotus:1234/rpc/v1
MOTION_PRICE_PER_GIB_EPOCH=0
Expand Down