Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CRDT implementation #1

Merged
merged 10 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,50 @@ go get github.com/storacha/go-pail
## Usage

```go
// todo
package main

import (
"github.com/ipfs/go-cid"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/storacha/go-pail"
"github.com/storacha/go-pail/block"
"github.com/storacha/go-pail/shard"
)

func main() {
ctx := context.Background()

rootBlock, _ := shard.MarshalBlock(shard.NewRoot(nil))

blocks := block.NewMapBlockstore()
_ = blocks.Put(ctx, rootBlock)

fmt.Printf("Root: %s\n", rootBlock.Link())

key := "room-guardian.jpg"
value := cidlink.Link{Cid: cid.MustParse("bafkreigh2akiscaildcqabsyg3dfr6chu3fgpregiymsck7e7aqa4s52zy")}

fmt.Printf("Putting %s: %s", key, value)

root, diff, _ := pail.Put(ctx, blocks, rootBlock.Link(), key, value)

fmt.Printf("Root: %s\n", root)
fmt.Println("Added blocks:")
for _, b := range diff.Additions {
fmt.Printf("+ %s\n", b.Link())
_ = blocks.Put(ctx, b)
}
fmt.Println("Removed blocks:")
for _, b := range diff.Removals {
fmt.Printf("- %s\n", b.Link())
_ = blocks.Del(ctx, b)
}

fmt.Println("Entries:")
for entry, _ := range pail.Entries(ctx, blocks, root) {
fmt.Printf("%s: %s\n", entry.Key, entry.Value)
}
}
```

## Contributing
Expand Down
2 changes: 1 addition & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/ipld/go-ipld-prime"
)

var ErrNotFound = errors.New("not found")
var ErrNotFound = errors.New("block not found")

type block struct {
link ipld.Link
Expand Down
19 changes: 14 additions & 5 deletions block/mapblockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,43 @@ package block

import (
"context"
"sync"

"github.com/ipld/go-ipld-prime"
)

// MapBlockstore is a blockstore that is backed by an in memory map.
type MapBlockstore struct {
data map[string]Block
data map[ipld.Link]Block
mutex sync.RWMutex
}

func (bs *MapBlockstore) Get(ctx context.Context, link ipld.Link) (Block, error) {
b, ok := bs.data[link.String()]
bs.mutex.RLock()
defer bs.mutex.RUnlock()

b, ok := bs.data[link]
if !ok {
return nil, ErrNotFound
}
return b, nil
}

func (bs *MapBlockstore) Put(ctx context.Context, b Block) error {
bs.data[b.Link().String()] = b
bs.mutex.Lock()
bs.data[b.Link()] = b
bs.mutex.Unlock()
return nil
}

func (bs *MapBlockstore) Del(ctx context.Context, link ipld.Link) error {
delete(bs.data, link.String())
bs.mutex.Lock()
delete(bs.data, link)
bs.mutex.Unlock()
return nil
}

// NewMapBlockstore creates a new blockstore that is backed by an in memory map.
func NewMapBlockstore() *MapBlockstore {
return &MapBlockstore{map[string]Block{}}
return &MapBlockstore{map[ipld.Link]Block{}, sync.RWMutex{}}
}
5 changes: 3 additions & 2 deletions clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
"github.com/ipld/go-ipld-prime"
"github.com/storacha/go-pail/block"
"github.com/storacha/go-pail/clock/event"
"github.com/storacha/go-pail/ipld/node"
)

// Advance the clock by adding an event.
func Advance[T any](ctx context.Context, blocks block.Fetcher, dataBinder event.NodeBinder[T], head []ipld.Link, evt ipld.Link) ([]ipld.Link, error) {
func Advance[T any](ctx context.Context, blocks block.Fetcher, dataBinder node.Binder[T], head []ipld.Link, evt ipld.Link) ([]ipld.Link, error) {
events := event.NewFetcher(blocks, dataBinder)
headmap := map[ipld.Link]struct{}{}
for _, h := range head {
Expand Down Expand Up @@ -117,7 +118,7 @@ func contains[T any](ctx context.Context, events *event.Fetcher[T], a, b ipld.Li
return false, nil
}

func Visualize[T any](ctx context.Context, blocks block.Fetcher, dataBinder event.NodeBinder[T], head []ipld.Link) iter.Seq2[string, error] {
func Visualize[T any](ctx context.Context, blocks block.Fetcher, dataBinder node.Binder[T], head []ipld.Link) iter.Seq2[string, error] {
events := event.NewFetcher(blocks, dataBinder)
return func(yield func(string, error) bool) {
if !yield("digraph clock {", nil) {
Expand Down
7 changes: 4 additions & 3 deletions clock/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/storacha/go-pail/block"
"github.com/storacha/go-pail/ipld/node"
)

type event[T any] struct {
Expand All @@ -33,7 +34,7 @@ func NewEvent[T any](data T, parents []ipld.Link) Event[T] {
}

// Unmarshal deserializes CBOR encoded bytes to an [Event].
func Unmarshal[T any](b []byte, dataBinder NodeBinder[T]) (Event[T], error) {
func Unmarshal[T any](b []byte, dataBinder node.Binder[T]) (Event[T], error) {
var e event[T]

np := basicnode.Prototype.Map
Expand Down Expand Up @@ -83,7 +84,7 @@ func Unmarshal[T any](b []byte, dataBinder NodeBinder[T]) (Event[T], error) {
}

// Marshal serializes an [Event] to CBOR encoded bytes.
func Marshal[T any](event Event[T], dataUnbinder NodeUnbinder[T]) ([]byte, error) {
func Marshal[T any](event Event[T], dataUnbinder node.Unbinder[T]) ([]byte, error) {
np := basicnode.Prototype.Any
nb := np.NewBuilder()

Expand Down Expand Up @@ -151,7 +152,7 @@ func Marshal[T any](event Event[T], dataUnbinder NodeUnbinder[T]) ([]byte, error

// MarshalBlock serializes the [Event] to CBOR encoded bytes, takes the sha2-256
// hash of the data, constructs a CID and returns a [block.Block].
func MarshalBlock[T any](e Event[T], dataUnbinder NodeUnbinder[T]) (block.BlockView[Event[T]], error) {
func MarshalBlock[T any](e Event[T], dataUnbinder node.Unbinder[T]) (block.BlockView[Event[T]], error) {
bytes, err := Marshal(e, dataUnbinder)
if err != nil {
return nil, fmt.Errorf("marshalling event: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions clock/event/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (

"github.com/ipld/go-ipld-prime"
"github.com/storacha/go-pail/block"
"github.com/storacha/go-pail/ipld/node"
)

type Fetcher[T any] struct {
blocks block.Fetcher
dataBinder NodeBinder[T]
dataBinder node.Binder[T]
}

func (f *Fetcher[T]) Get(ctx context.Context, link ipld.Link) (BlockView[T], error) {
Expand All @@ -26,6 +27,6 @@ func (f *Fetcher[T]) Get(ctx context.Context, link ipld.Link) (BlockView[T], err
return block.NewBlockView(link, b.Bytes(), s), nil
}

func NewFetcher[T any](blocks block.Fetcher, dataBinder NodeBinder[T]) *Fetcher[T] {
func NewFetcher[T any](blocks block.Fetcher, dataBinder node.Binder[T]) *Fetcher[T] {
return &Fetcher[T]{blocks, dataBinder}
}
8 changes: 0 additions & 8 deletions clock/event/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,3 @@ type Event[T any] interface {
type BlockView[T any] interface {
block.BlockView[Event[T]]
}

type NodeUnbinder[T any] interface {
Unbind(T) (ipld.Node, error)
}

type NodeBinder[T any] interface {
Bind(ipld.Node) (T, error)
}
Loading
Loading