From a3e9cbfe506b3251b4c316505aa9dc4790a239b9 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 14 Jan 2025 11:20:27 +0000 Subject: [PATCH] feat: switch to CRDT bucket --- app.go | 16 ++++- bucket/datastore.go | 154 +++++++++++++++++++++++++++---------------- bucket/delegation.go | 4 +- bucket/head/head.go | 71 ++++++++++++++++++++ bucket/interface.go | 2 +- bucket/key.go | 4 +- go.mod | 2 +- go.sum | 4 +- 8 files changed, 189 insertions(+), 68 deletions(-) create mode 100644 bucket/head/head.go diff --git a/app.go b/app.go index 83b11fa..e9855c3 100644 --- a/app.go +++ b/app.go @@ -127,7 +127,13 @@ func (a *App) Root(params string) (string, error) { return "", err } - return marshalJSON(Bytes(bk.Root().Binary())) + root, err := bk.Root(a.ctx) + if err != nil { + log.Error(err) + return "", err + } + + return marshalJSON(Bytes(root.Binary())) } func (a *App) Put(params string) (string, error) { @@ -149,7 +155,13 @@ func (a *App) Put(params string) (string, error) { return "", err } - return marshalJSON(Bytes(bk.Root().Binary())) + root, err := bk.Root(a.ctx) + if err != nil { + log.Error(err) + return "", err + } + + return marshalJSON(Bytes(root.Binary())) } func (a *App) Del(params string) (string, error) { diff --git a/bucket/datastore.go b/bucket/datastore.go index 4be3f7e..e715e29 100644 --- a/bucket/datastore.go +++ b/bucket/datastore.go @@ -7,26 +7,25 @@ import ( "iter" "sync" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/storacha/go-pail" + "github.com/storacha/fam/bucket/head" + pail "github.com/storacha/go-pail" "github.com/storacha/go-pail/block" - "github.com/storacha/go-pail/shard" + "github.com/storacha/go-pail/crdt" ) var log = logging.Logger("datastore") -var rootKey = datastore.NewKey("root") +var headKey = datastore.NewKey("head") -type DsBlockFetcher struct { - dstore datastore.Datastore +type DsBlockstore struct { + data datastore.Datastore } -func (f *DsBlockFetcher) Get(ctx context.Context, link ipld.Link) (block.Block, error) { - b, err := f.dstore.Get(ctx, datastore.NewKey(link.String())) +func (bs *DsBlockstore) Get(ctx context.Context, link ipld.Link) (block.Block, error) { + b, err := bs.data.Get(ctx, datastore.NewKey(link.String())) if err != nil { if errors.Is(err, datastore.ErrNotFound) { return nil, fmt.Errorf("getting block: %s: %w", link, ErrNotFound) @@ -36,47 +35,90 @@ func (f *DsBlockFetcher) Get(ctx context.Context, link ipld.Link) (block.Block, return block.New(link, b), nil } -func NewDsBlockFetcher(dstore datastore.Datastore) *DsBlockFetcher { - return &DsBlockFetcher{dstore} +func (bs *DsBlockstore) Put(ctx context.Context, block block.Block) error { + err := bs.data.Put(ctx, datastore.NewKey(block.Link().String()), block.Bytes()) + if err != nil { + return fmt.Errorf("putting block: %w", err) + } + return nil +} + +func (bs *DsBlockstore) Del(ctx context.Context, link ipld.Link) error { + err := bs.data.Delete(ctx, datastore.NewKey(link.String())) + if err != nil { + return fmt.Errorf("deleting block: %w", err) + } + return nil +} + +func NewDsBlockstore(dstore datastore.Datastore) *DsBlockstore { + return &DsBlockstore{dstore} } type DsBucket struct { mutex sync.RWMutex - root ipld.Link - dstore datastore.Datastore - blocks block.Fetcher + head []ipld.Link + data datastore.Datastore + blocks *DsBlockstore } -func (bucket *DsBucket) Root() ipld.Link { - return bucket.root +func (bucket *DsBucket) Root(ctx context.Context) (ipld.Link, error) { + bucket.mutex.RLock() + defer bucket.mutex.RUnlock() + + if len(bucket.head) == 0 { + b, err := pail.New() + if err != nil { + return nil, fmt.Errorf("creating pail: %w", err) + } + return b.Link(), nil + } + + root, _, err := crdt.Root(ctx, bucket.blocks, bucket.head) + if err != nil { + return nil, err + } + return root, nil } func (bucket *DsBucket) Put(ctx context.Context, key string, value ipld.Link) error { bucket.mutex.Lock() defer bucket.mutex.Unlock() - r, diff, err := pail.Put(ctx, bucket.blocks, bucket.root, key, value) + res, err := crdt.Put(ctx, bucket.blocks, bucket.head, key, value) if err != nil { return fmt.Errorf("putting %s: %w", key, err) } - for _, b := range diff.Additions { + if res.Event != nil { + err = bucket.blocks.Put(ctx, res.Event) + if err != nil { + return fmt.Errorf("putting merkle clock event: %w", err) + } + } + + for _, b := range res.Additions { log.Debugf("putting put diff addition: %s", b.Link()) - err = bucket.dstore.Put(ctx, datastore.NewKey(b.Link().String()), b.Bytes()) + err = bucket.blocks.Put(ctx, b) if err != nil { return fmt.Errorf("putting diff addition: %w", err) } } - err = bucket.dstore.Put(context.Background(), rootKey, []byte(r.Binary())) + hbytes, err := head.Marshal(res.Head) + if err != nil { + return fmt.Errorf("marshalling head: %w", err) + } + + err = bucket.data.Put(ctx, headKey, hbytes) if err != nil { - return fmt.Errorf("updating root: %w", err) + return fmt.Errorf("updating head: %w", err) } - bucket.root = r + bucket.head = res.Head - for _, b := range diff.Removals { + for _, b := range res.Removals { log.Debugf("deleting put diff removal: %s", b.Link()) - err = bucket.dstore.Delete(ctx, datastore.NewKey(b.Link().String())) + err = bucket.blocks.Del(ctx, b.Link()) if err != nil { return fmt.Errorf("deleting diff removal: %w", err) } @@ -89,7 +131,7 @@ func (bucket *DsBucket) Get(ctx context.Context, key string) (ipld.Link, error) bucket.mutex.RLock() defer bucket.mutex.RUnlock() - value, err := pail.Get(ctx, bucket.blocks, bucket.root, key) + value, err := crdt.Get(ctx, bucket.blocks, bucket.head, key) if err != nil { return nil, fmt.Errorf("getting %s: %w", key, err) } @@ -101,7 +143,7 @@ func (bucket *DsBucket) Entries(ctx context.Context, opts ...EntriesOption) iter bucket.mutex.RLock() defer bucket.mutex.RUnlock() - for e, err := range pail.Entries(ctx, bucket.blocks, bucket.root, opts...) { + for e, err := range crdt.Entries(ctx, bucket.blocks, bucket.head, opts...) { if err != nil { yield(Entry[ipld.Link]{}, err) return @@ -117,28 +159,40 @@ func (bucket *DsBucket) Del(ctx context.Context, key string) error { bucket.mutex.Lock() defer bucket.mutex.Unlock() - r, diff, err := pail.Del(ctx, bucket.blocks, bucket.root, key) + res, err := crdt.Del(ctx, bucket.blocks, bucket.head, key) if err != nil { return fmt.Errorf("deleting %s: %w", key, err) } - for _, b := range diff.Additions { + if res.Event != nil { + err = bucket.blocks.Put(ctx, res.Event) + if err != nil { + return fmt.Errorf("putting merkle clock event: %w", err) + } + } + + for _, b := range res.Additions { log.Debugf("putting delete diff addition: %s", b.Link()) - err = bucket.dstore.Put(ctx, datastore.NewKey(b.Link().String()), b.Bytes()) + err = bucket.blocks.Put(ctx, b) if err != nil { return fmt.Errorf("putting diff addition: %w", err) } } - err = bucket.dstore.Put(context.Background(), rootKey, []byte(r.Binary())) + hbytes, err := head.Marshal(res.Head) if err != nil { - return fmt.Errorf("updating root: %w", err) + return fmt.Errorf("marshalling head: %w", err) } - bucket.root = r - for _, b := range diff.Removals { + err = bucket.data.Put(ctx, headKey, hbytes) + if err != nil { + return fmt.Errorf("updating head: %w", err) + } + bucket.head = res.Head + + for _, b := range res.Removals { log.Debugf("deleting delete diff removal: %s", b.Link()) - err = bucket.dstore.Delete(ctx, datastore.NewKey(b.Link().String())) + err = bucket.blocks.Del(ctx, b.Link()) if err != nil { return fmt.Errorf("deleting diff removal: %w", err) } @@ -148,37 +202,21 @@ func (bucket *DsBucket) Del(ctx context.Context, key string) error { } func NewDsBucket(dstore datastore.Datastore) (*DsBucket, error) { - var root ipld.Link - b, err := dstore.Get(context.Background(), rootKey) + var hd []ipld.Link + bs := NewDsBlockstore(dstore) + b, err := dstore.Get(context.Background(), headKey) if err != nil { if errors.Is(err, datastore.ErrNotFound) { - log.Warnln("bucket root not found, creating new bucket...") - - rs := shard.NewRoot(nil) - rb, err := shard.MarshalBlock(rs) - if err != nil { - return nil, fmt.Errorf("marshalling pail root: %w", err) - } - err = dstore.Put(context.Background(), datastore.NewKey(rb.Link().String()), rb.Bytes()) - if err != nil { - return nil, fmt.Errorf("putting pail root block: %w", err) - } - err = dstore.Put(context.Background(), rootKey, []byte(rb.Link().Binary())) - if err != nil { - return nil, fmt.Errorf("putting pail root: %w", err) - } - root = rb.Link() + log.Warnln("bucket head not found, creating new bucket...") } else { return nil, fmt.Errorf("getting root: %w", err) } } else { - c, err := cid.Cast(b) + hd, err = head.Unmarshal(b) if err != nil { - return nil, fmt.Errorf("decoding root: %w", err) + return nil, fmt.Errorf("unmarshalling head: %w", err) } - root = cidlink.Link{Cid: c} } - log.Debugf("loading bucket with root: %s", root) - blocks := NewDsBlockFetcher(dstore) - return &DsBucket{root: root, dstore: dstore, blocks: blocks}, nil + log.Debugf("loading bucket with head: %s", hd) + return &DsBucket{head: hd, data: dstore, blocks: bs}, nil } diff --git a/bucket/delegation.go b/bucket/delegation.go index 4bf9d7f..3c07147 100644 --- a/bucket/delegation.go +++ b/bucket/delegation.go @@ -18,8 +18,8 @@ type DelegationBucket struct { values datastore.Datastore } -func (db *DelegationBucket) Root() ipld.Link { - return db.bucket.Root() +func (db *DelegationBucket) Root(ctx context.Context) (ipld.Link, error) { + return db.bucket.Root(ctx) } func (db *DelegationBucket) Entries(ctx context.Context, opts ...pail.EntriesOption) iter.Seq2[Entry[delegation.Delegation], error] { diff --git a/bucket/head/head.go b/bucket/head/head.go new file mode 100644 index 0000000..dcd07cd --- /dev/null +++ b/bucket/head/head.go @@ -0,0 +1,71 @@ +package head + +import ( + "bytes" + "errors" + "fmt" + + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/node/basicnode" +) + +func Marshal(head []ipld.Link) ([]byte, error) { + np := basicnode.Prototype.Any + nb := np.NewBuilder() + la, err := nb.BeginList(int64(len(head))) + if err != nil { + return nil, err + } + for _, l := range head { + err := la.AssembleValue().AssignLink(l) + if err != nil { + return nil, err + } + } + err = la.Finish() + if err != nil { + return nil, err + } + + n := nb.Build() + buf := bytes.NewBuffer([]byte{}) + err = dagcbor.Encode(n, buf) + if err != nil { + return nil, fmt.Errorf("CBOR encoding: %w", err) + } + return buf.Bytes(), nil +} + +func Unmarshal(b []byte) ([]ipld.Link, error) { + var head []ipld.Link + + np := basicnode.Prototype.List + nb := np.NewBuilder() + err := dagcbor.Decode(nb, bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("decoding shard: %w", err) + } + n := nb.Build() + + values := n.ListIterator() + if values == nil { + return nil, errors.New("not a list") + } + for { + if values.Done() { + break + } + _, n, err := values.Next() + if err != nil { + return nil, fmt.Errorf("iterating links: %w", err) + } + link, err := n.AsLink() + if err != nil { + return nil, fmt.Errorf("decoding link: %w", err) + } + head = append(head, link) + } + + return head, nil +} diff --git a/bucket/interface.go b/bucket/interface.go index 1898a96..ae4e1b3 100644 --- a/bucket/interface.go +++ b/bucket/interface.go @@ -26,7 +26,7 @@ var ( type Bucket[T any] interface { // Root returns the current root CID of the bucket. - Root() ipld.Link + Root(ctx context.Context) (ipld.Link, error) Get(ctx context.Context, key string) (T, error) Put(ctx context.Context, key string, value T) error Del(ctx context.Context, key string) error diff --git a/bucket/key.go b/bucket/key.go index 6b7454b..76c5c1c 100644 --- a/bucket/key.go +++ b/bucket/key.go @@ -22,8 +22,8 @@ type KeyBucket struct { values datastore.Datastore } -func (kb *KeyBucket) Root() ipld.Link { - return kb.bucket.Root() +func (kb *KeyBucket) Root(ctx context.Context) (ipld.Link, error) { + return kb.bucket.Root(ctx) } func (kb *KeyBucket) Entries(ctx context.Context, opts ...pail.EntriesOption) iter.Seq2[Entry[principal.Signer], error] { diff --git a/go.mod b/go.mod index 67db2c4..1e2004a 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 - github.com/storacha/go-pail v0.0.0-20250105155100-a4f5bd3202cf + github.com/storacha/go-pail v0.0.0-20250114110711-547618938b52 github.com/storacha/go-ucanto v0.2.0 github.com/wailsapp/wails/v2 v2.9.2 ) diff --git a/go.sum b/go.sum index b0ab80b..85fbf2c 100644 --- a/go.sum +++ b/go.sum @@ -433,8 +433,8 @@ github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t6 github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= -github.com/storacha/go-pail v0.0.0-20250105155100-a4f5bd3202cf h1:X9IvfGcr4/F2FqBnUt+uWyi/805yZTgIQ6RRlrhisbk= -github.com/storacha/go-pail v0.0.0-20250105155100-a4f5bd3202cf/go.mod h1:fnCw+ADcFzZlnWdJBzMicE2kxg1w8sH+IzQ9Nw3bwyI= +github.com/storacha/go-pail v0.0.0-20250114110711-547618938b52 h1:DhaMagLyVirLvOLY5kJ4eXgYkaIpdjuyJF4gzl3zvXM= +github.com/storacha/go-pail v0.0.0-20250114110711-547618938b52/go.mod h1:fnCw+ADcFzZlnWdJBzMicE2kxg1w8sH+IzQ9Nw3bwyI= github.com/storacha/go-ucanto v0.2.0 h1:P5s+B2mCKdJX0yC89f15czfqHy7lZGWdjAwmq9/Iip0= github.com/storacha/go-ucanto v0.2.0/go.mod h1:7ba9jAgqmwlF/JfyFUQcGV07uiYNlmJNu8qH4hHtrJk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=