Skip to content

Commit

Permalink
feat: switch to CRDT bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
alanshaw committed Jan 14, 2025
1 parent adbb520 commit a3e9cbf
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 68 deletions.
16 changes: 14 additions & 2 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ func (a *App) Root(params string) (string, error) {
return "", err
}

return marshalJSON(Bytes(bk.Root().Binary()))
root, err := bk.Root(a.ctx)
if err != nil {
log.Error(err)
return "", err
}

return marshalJSON(Bytes(root.Binary()))
}

func (a *App) Put(params string) (string, error) {
Expand All @@ -149,7 +155,13 @@ func (a *App) Put(params string) (string, error) {
return "", err
}

return marshalJSON(Bytes(bk.Root().Binary()))
root, err := bk.Root(a.ctx)
if err != nil {
log.Error(err)
return "", err
}

return marshalJSON(Bytes(root.Binary()))
}

func (a *App) Del(params string) (string, error) {
Expand Down
154 changes: 96 additions & 58 deletions bucket/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,25 @@ import (
"iter"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/storacha/go-pail"
"github.com/storacha/fam/bucket/head"
pail "github.com/storacha/go-pail"
"github.com/storacha/go-pail/block"
"github.com/storacha/go-pail/shard"
"github.com/storacha/go-pail/crdt"
)

var log = logging.Logger("datastore")

var rootKey = datastore.NewKey("root")
var headKey = datastore.NewKey("head")

type DsBlockFetcher struct {
dstore datastore.Datastore
type DsBlockstore struct {
data datastore.Datastore
}

func (f *DsBlockFetcher) Get(ctx context.Context, link ipld.Link) (block.Block, error) {
b, err := f.dstore.Get(ctx, datastore.NewKey(link.String()))
func (bs *DsBlockstore) Get(ctx context.Context, link ipld.Link) (block.Block, error) {
b, err := bs.data.Get(ctx, datastore.NewKey(link.String()))
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
return nil, fmt.Errorf("getting block: %s: %w", link, ErrNotFound)
Expand All @@ -36,47 +35,90 @@ func (f *DsBlockFetcher) Get(ctx context.Context, link ipld.Link) (block.Block,
return block.New(link, b), nil
}

func NewDsBlockFetcher(dstore datastore.Datastore) *DsBlockFetcher {
return &DsBlockFetcher{dstore}
func (bs *DsBlockstore) Put(ctx context.Context, block block.Block) error {
err := bs.data.Put(ctx, datastore.NewKey(block.Link().String()), block.Bytes())
if err != nil {
return fmt.Errorf("putting block: %w", err)
}
return nil
}

func (bs *DsBlockstore) Del(ctx context.Context, link ipld.Link) error {
err := bs.data.Delete(ctx, datastore.NewKey(link.String()))
if err != nil {
return fmt.Errorf("deleting block: %w", err)
}
return nil
}

func NewDsBlockstore(dstore datastore.Datastore) *DsBlockstore {
return &DsBlockstore{dstore}
}

type DsBucket struct {
mutex sync.RWMutex
root ipld.Link
dstore datastore.Datastore
blocks block.Fetcher
head []ipld.Link
data datastore.Datastore
blocks *DsBlockstore
}

func (bucket *DsBucket) Root() ipld.Link {
return bucket.root
func (bucket *DsBucket) Root(ctx context.Context) (ipld.Link, error) {
bucket.mutex.RLock()
defer bucket.mutex.RUnlock()

if len(bucket.head) == 0 {
b, err := pail.New()
if err != nil {
return nil, fmt.Errorf("creating pail: %w", err)
}
return b.Link(), nil
}

root, _, err := crdt.Root(ctx, bucket.blocks, bucket.head)
if err != nil {
return nil, err
}
return root, nil
}

func (bucket *DsBucket) Put(ctx context.Context, key string, value ipld.Link) error {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()

r, diff, err := pail.Put(ctx, bucket.blocks, bucket.root, key, value)
res, err := crdt.Put(ctx, bucket.blocks, bucket.head, key, value)
if err != nil {
return fmt.Errorf("putting %s: %w", key, err)
}

for _, b := range diff.Additions {
if res.Event != nil {
err = bucket.blocks.Put(ctx, res.Event)
if err != nil {
return fmt.Errorf("putting merkle clock event: %w", err)
}
}

for _, b := range res.Additions {
log.Debugf("putting put diff addition: %s", b.Link())
err = bucket.dstore.Put(ctx, datastore.NewKey(b.Link().String()), b.Bytes())
err = bucket.blocks.Put(ctx, b)
if err != nil {
return fmt.Errorf("putting diff addition: %w", err)
}
}

err = bucket.dstore.Put(context.Background(), rootKey, []byte(r.Binary()))
hbytes, err := head.Marshal(res.Head)
if err != nil {
return fmt.Errorf("marshalling head: %w", err)
}

err = bucket.data.Put(ctx, headKey, hbytes)
if err != nil {
return fmt.Errorf("updating root: %w", err)
return fmt.Errorf("updating head: %w", err)
}
bucket.root = r
bucket.head = res.Head

for _, b := range diff.Removals {
for _, b := range res.Removals {
log.Debugf("deleting put diff removal: %s", b.Link())
err = bucket.dstore.Delete(ctx, datastore.NewKey(b.Link().String()))
err = bucket.blocks.Del(ctx, b.Link())
if err != nil {
return fmt.Errorf("deleting diff removal: %w", err)
}
Expand All @@ -89,7 +131,7 @@ func (bucket *DsBucket) Get(ctx context.Context, key string) (ipld.Link, error)
bucket.mutex.RLock()
defer bucket.mutex.RUnlock()

value, err := pail.Get(ctx, bucket.blocks, bucket.root, key)
value, err := crdt.Get(ctx, bucket.blocks, bucket.head, key)
if err != nil {
return nil, fmt.Errorf("getting %s: %w", key, err)
}
Expand All @@ -101,7 +143,7 @@ func (bucket *DsBucket) Entries(ctx context.Context, opts ...EntriesOption) iter
bucket.mutex.RLock()
defer bucket.mutex.RUnlock()

for e, err := range pail.Entries(ctx, bucket.blocks, bucket.root, opts...) {
for e, err := range crdt.Entries(ctx, bucket.blocks, bucket.head, opts...) {
if err != nil {
yield(Entry[ipld.Link]{}, err)
return
Expand All @@ -117,28 +159,40 @@ func (bucket *DsBucket) Del(ctx context.Context, key string) error {
bucket.mutex.Lock()
defer bucket.mutex.Unlock()

r, diff, err := pail.Del(ctx, bucket.blocks, bucket.root, key)
res, err := crdt.Del(ctx, bucket.blocks, bucket.head, key)
if err != nil {
return fmt.Errorf("deleting %s: %w", key, err)
}

for _, b := range diff.Additions {
if res.Event != nil {
err = bucket.blocks.Put(ctx, res.Event)
if err != nil {
return fmt.Errorf("putting merkle clock event: %w", err)
}
}

for _, b := range res.Additions {
log.Debugf("putting delete diff addition: %s", b.Link())
err = bucket.dstore.Put(ctx, datastore.NewKey(b.Link().String()), b.Bytes())
err = bucket.blocks.Put(ctx, b)
if err != nil {
return fmt.Errorf("putting diff addition: %w", err)
}
}

err = bucket.dstore.Put(context.Background(), rootKey, []byte(r.Binary()))
hbytes, err := head.Marshal(res.Head)
if err != nil {
return fmt.Errorf("updating root: %w", err)
return fmt.Errorf("marshalling head: %w", err)
}
bucket.root = r

for _, b := range diff.Removals {
err = bucket.data.Put(ctx, headKey, hbytes)
if err != nil {
return fmt.Errorf("updating head: %w", err)
}
bucket.head = res.Head

for _, b := range res.Removals {
log.Debugf("deleting delete diff removal: %s", b.Link())
err = bucket.dstore.Delete(ctx, datastore.NewKey(b.Link().String()))
err = bucket.blocks.Del(ctx, b.Link())
if err != nil {
return fmt.Errorf("deleting diff removal: %w", err)
}
Expand All @@ -148,37 +202,21 @@ func (bucket *DsBucket) Del(ctx context.Context, key string) error {
}

func NewDsBucket(dstore datastore.Datastore) (*DsBucket, error) {
var root ipld.Link
b, err := dstore.Get(context.Background(), rootKey)
var hd []ipld.Link
bs := NewDsBlockstore(dstore)
b, err := dstore.Get(context.Background(), headKey)
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
log.Warnln("bucket root not found, creating new bucket...")

rs := shard.NewRoot(nil)
rb, err := shard.MarshalBlock(rs)
if err != nil {
return nil, fmt.Errorf("marshalling pail root: %w", err)
}
err = dstore.Put(context.Background(), datastore.NewKey(rb.Link().String()), rb.Bytes())
if err != nil {
return nil, fmt.Errorf("putting pail root block: %w", err)
}
err = dstore.Put(context.Background(), rootKey, []byte(rb.Link().Binary()))
if err != nil {
return nil, fmt.Errorf("putting pail root: %w", err)
}
root = rb.Link()
log.Warnln("bucket head not found, creating new bucket...")
} else {
return nil, fmt.Errorf("getting root: %w", err)
}
} else {
c, err := cid.Cast(b)
hd, err = head.Unmarshal(b)
if err != nil {
return nil, fmt.Errorf("decoding root: %w", err)
return nil, fmt.Errorf("unmarshalling head: %w", err)
}
root = cidlink.Link{Cid: c}
}
log.Debugf("loading bucket with root: %s", root)
blocks := NewDsBlockFetcher(dstore)
return &DsBucket{root: root, dstore: dstore, blocks: blocks}, nil
log.Debugf("loading bucket with head: %s", hd)
return &DsBucket{head: hd, data: dstore, blocks: bs}, nil
}
4 changes: 2 additions & 2 deletions bucket/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type DelegationBucket struct {
values datastore.Datastore
}

func (db *DelegationBucket) Root() ipld.Link {
return db.bucket.Root()
func (db *DelegationBucket) Root(ctx context.Context) (ipld.Link, error) {
return db.bucket.Root(ctx)
}

func (db *DelegationBucket) Entries(ctx context.Context, opts ...pail.EntriesOption) iter.Seq2[Entry[delegation.Delegation], error] {
Expand Down
71 changes: 71 additions & 0 deletions bucket/head/head.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package head

import (
"bytes"
"errors"
"fmt"

"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/node/basicnode"
)

func Marshal(head []ipld.Link) ([]byte, error) {
np := basicnode.Prototype.Any
nb := np.NewBuilder()
la, err := nb.BeginList(int64(len(head)))
if err != nil {
return nil, err
}
for _, l := range head {
err := la.AssembleValue().AssignLink(l)
if err != nil {
return nil, err
}
}
err = la.Finish()
if err != nil {
return nil, err
}

n := nb.Build()
buf := bytes.NewBuffer([]byte{})
err = dagcbor.Encode(n, buf)
if err != nil {
return nil, fmt.Errorf("CBOR encoding: %w", err)
}
return buf.Bytes(), nil
}

func Unmarshal(b []byte) ([]ipld.Link, error) {
var head []ipld.Link

np := basicnode.Prototype.List
nb := np.NewBuilder()
err := dagcbor.Decode(nb, bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("decoding shard: %w", err)
}
n := nb.Build()

values := n.ListIterator()
if values == nil {
return nil, errors.New("not a list")
}
for {
if values.Done() {
break
}
_, n, err := values.Next()
if err != nil {
return nil, fmt.Errorf("iterating links: %w", err)
}
link, err := n.AsLink()
if err != nil {
return nil, fmt.Errorf("decoding link: %w", err)
}
head = append(head, link)
}

return head, nil
}
2 changes: 1 addition & 1 deletion bucket/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var (

type Bucket[T any] interface {
// Root returns the current root CID of the bucket.
Root() ipld.Link
Root(ctx context.Context) (ipld.Link, error)
Get(ctx context.Context, key string) (T, error)
Put(ctx context.Context, key string, value T) error
Del(ctx context.Context, key string) error
Expand Down
4 changes: 2 additions & 2 deletions bucket/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type KeyBucket struct {
values datastore.Datastore
}

func (kb *KeyBucket) Root() ipld.Link {
return kb.bucket.Root()
func (kb *KeyBucket) Root(ctx context.Context) (ipld.Link, error) {
return kb.bucket.Root(ctx)
}

func (kb *KeyBucket) Entries(ctx context.Context, opts ...pail.EntriesOption) iter.Seq2[Entry[principal.Signer], error] {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
github.com/storacha/go-pail v0.0.0-20250105155100-a4f5bd3202cf
github.com/storacha/go-pail v0.0.0-20250114110711-547618938b52
github.com/storacha/go-ucanto v0.2.0
github.com/wailsapp/wails/v2 v2.9.2
)
Expand Down
Loading

0 comments on commit a3e9cbf

Please sign in to comment.