feat(operation): Introducing operation entity and thread safe operation queue. #16

Open
wants to merge 2 commits into
base: main
115 changes: 115 additions & 0 deletions operations/def.go
@@ -0,0 +1,115 @@
// Copyright 2021, athena-crdt authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operations

import (
"sort"
"sync"

"github.com/pkg/errors"

"github.com/athena-crdt/athena-core/operations/defs"
"github.com/athena-crdt/athena-core/operations/lamport"
)

type (
// Mutation is the type of operation
Mutation uint8

// Operation - individual operation entity
Operation struct {
Id *lamport.Clock
Member

why are we using pointers here?

Member Author

@bisakhmondal Oct 30, 2021

To prevent it from being copied unnecessarily, and to keep checks and updates of the counter value atomic.

Member

At the operation level the clock is fixed, since it is used as an id, so there are no updates.
Suggestion: don't use pointers here.

The rest looks good.

Member Author

It hardly makes any difference. If you insist, a value then.
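
For reference, a minimal sketch of the difference being discussed. It assumes a simplified stand-in for `lamport.Clock` with exported fields; the names `OpByValue`, `OpByPointer` and `Snapshot` are hypothetical and not part of this PR. Storing the id by value freezes it at creation time, while storing a pointer lets the id change whenever the shared clock advances.

```go
package main

import "fmt"

// Clock is a simplified, hypothetical stand-in for lamport.Clock.
type Clock struct {
	Counter uint64
	HostID  string
}

// OpByValue owns its own copy of the clock.
type OpByValue struct{ ID Clock }

// OpByPointer shares the clock with every other holder of the pointer.
type OpByPointer struct{ ID *Clock }

// Snapshot builds both variants from the same live clock and then
// advances the live clock, as a replica would after creating an operation.
func Snapshot(live *Clock) (OpByValue, OpByPointer) {
	v := OpByValue{ID: *live} // copies the current counter and host id
	p := OpByPointer{ID: live}
	live.Counter++
	return v, p
}

func main() {
	live := &Clock{Counter: 7, HostID: "a"}
	v, p := Snapshot(live)
	fmt.Println(v.ID.Counter, p.ID.Counter) // prints "7 8"
}
```

The value variant keeps the id it was created with, which is the behaviour an identifier usually wants.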

Deps []*lamport.Clock
Cursor []defs.NodeId
Mutation Mutation
Value defs.Node
}

// OpStore is a thread safe Operation queue.
OpStore struct {
sync.RWMutex
Store []*Operation
}
)

const (
ASSIGN = Mutation(iota)
INSERT
DELETE
GET
)

// NewOpStore creates a new operation store.
func NewOpStore() *OpStore {
return &OpStore{
RWMutex: sync.RWMutex{},
Store: []*Operation{},
}
}

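// SortDeps orders the dependencies by ascending Lamport time.
func (o *Operation) SortDeps() {
sort.Slice(o.Deps, func(i, j int) bool {
// sort.Slice needs a strict "less": equal elements must not compare as less.
return o.Deps[i].GetTime() < o.Deps[j].GetTime()
})
}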

func (op *OpStore) Push(o *Operation) {
op.Lock()
defer op.Unlock()
op.Store = append(op.Store, o)
}

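// SortedPush appends o and re-sorts the store by ascending Lamport time.
func (op *OpStore) SortedPush(o *Operation) {
op.Lock()
defer op.Unlock()
op.Store = append(op.Store, o)
sort.Slice(op.Store, func(i, j int) bool {
// Strict "less", as required by sort.Slice.
return op.Store[i].Id.GetTime() < op.Store[j].Id.GetTime()
})
}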

func (op *OpStore) IsEmpty() bool {
return op.Len() == 0
}

func (op *OpStore) Len() int {
op.RLock()
defer op.RUnlock()
return len(op.Store)
}

func (op *OpStore) Pop() (*Operation, error) {
op.Lock()
defer op.Unlock()
if len(op.Store) == 0 {
return nil, errors.New("empty queue")
}

elm := op.Store[0]
op.Store = op.Store[1:]
return elm, nil
}

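// Sort orders the stored operations by ascending Lamport time.
func (op *OpStore) Sort() {
// Take the write lock: sorting mutates Store in place.
op.Lock()
defer op.Unlock()
sort.Slice(op.Store, func(i, j int) bool {
return op.Store[i].Id.GetTime() < op.Store[j].Id.GetTime()
})
}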

func (op *OpStore) Serialize() ([]byte, error) {
op.RLock()
defer op.RUnlock()
panic("implement after PR #12")
}
30 changes: 17 additions & 13 deletions operations/lamport/clock.go
@@ -14,29 +14,33 @@

package lamport

import (
"sync/atomic"
)
import "sync/atomic"

type Clock struct {
// counter represents current lamport counter value.
counter uint64
// hostId identifies the host where the operation was generated at first place.
hostId string
}

func (clock *Clock) IsGreaterThan(obj *Clock) bool {
if atomic.LoadUint64(&obj.counter) == atomic.LoadUint64(&clock.counter) {
return clock.hostId > obj.hostId
} else {
return atomic.LoadUint64(&clock.counter) > atomic.LoadUint64(&obj.counter)
// NewClock returns a pointer to a newly created clock.
func NewClock(counter uint64, hostId string) *Clock {
return &Clock{
hostId: hostId,
counter: counter,
}
}

func (clock *Clock) IsGreaterThan(obj *Clock) bool {
return atomic.LoadUint64(&clock.counter) > atomic.LoadUint64(&obj.counter)
}

func (clock *Clock) IsLessThan(obj *Clock) bool {
if atomic.LoadUint64(&obj.counter) == atomic.LoadUint64(&clock.counter) {
return clock.hostId < obj.hostId
} else {
return atomic.LoadUint64(&clock.counter) < atomic.LoadUint64(&obj.counter)
Member

revert

Member

(counter, hostId) defines a total ordering of operations across all replicas

Member Author

@bisakhmondal Oct 30, 2021

Think it through logically: when two operations happening on two different hosts have the same Lamport counter, should the ordering of the host ids matter? No. The host id is either auto-generated at init or assigned randomly during bootstrapping, so why should the program logic take it into account? Just keep it for identification, i.e. to record where the counter was set.

Member

@noob77777 Oct 30, 2021

To resolve concurrent operations: for LWW to work, we need someone to win.

If a replica receives an operation with a counter value c that is greater than the locally stored counter value, the local counter is increased to the value of the incoming counter. This ensures that if operation o1 causally happened before o2 (that is, the replica that generated o2 had received and processed o1 before o2 was generated), then o2 must have a greater counter value than o1. Only concurrent operations can have equal counter values.

We can thus define a total ordering < for Lamport timestamps:

(c1, p1) < (c2, p2) iff (c1 < c2) ∨ (c1 = c2 ∧ p1 < p2).

If one operation happened before another, this ordering is consistent with causality (the earlier operation has a lower timestamp). If two operations are concurrent, their order according to < is arbitrary but deterministic. This ordering property is important for our definition
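
A minimal Go sketch of the quoted ordering, for illustration only. `Timestamp`, `Less` and `SortTimestamps` are hypothetical names, and the struct is a simplified stand-in for `lamport.Clock` with exported fields; it is not the code in this PR.

```go
package main

import (
	"fmt"
	"sort"
)

// Timestamp is a hypothetical stand-in for lamport.Clock.
type Timestamp struct {
	Counter uint64
	HostID  string
}

// Less reports whether a precedes b in the total order
// (c1, p1) < (c2, p2) iff c1 < c2, or c1 == c2 and p1 < p2.
func Less(a, b Timestamp) bool {
	if a.Counter != b.Counter {
		return a.Counter < b.Counter
	}
	// Equal counters mean concurrent operations: break the tie by host id.
	return a.HostID < b.HostID
}

// SortTimestamps orders ts in place according to Less.
func SortTimestamps(ts []Timestamp) {
	sort.Slice(ts, func(i, j int) bool { return Less(ts[i], ts[j]) })
}

func main() {
	ts := []Timestamp{{20, "def"}, {25, "efg"}, {20, "abc"}, {10, "def"}}
	SortTimestamps(ts)
	fmt.Println(ts) // prints "[{10 def} {20 abc} {20 def} {25 efg}]"
}
```

Because `Less` never returns true in both directions for distinct timestamps, every replica sorting the same set of operations arrives at the same sequence, provided host ids are unique.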

Member Author

Yeah, I understand. We need to think about this constructively, rather than destructively as in last writer wins, since CRDTs are all about resolving conflicts optimistically : )
A topic for discussion at the next meeting.

Member Author

What if both have random ids that happen to be equal? We'll end up introducing randomness into the system.

Member

each replica must have a unique id

Member Author

You can't guarantee that "must". Instead, we could focus on this:

Another frequently-used approach to conflict resolution is last writer wins (LWW), which arbitrarily chooses one among several concurrent writes as “winner” and discards the others. LWW is used in Apache Cassandra, for example. It does not meet our requirements, since we want no user input to be lost due to concurrent modifications.
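
To make the trade-off concrete, here is a rough sketch contrasting the two behaviours. All names (`Write`, `LWW`, `KeepConcurrent`) are hypothetical. It is also deliberately simplified: it treats writes tied at the highest counter as the concurrent set, whereas real concurrency detection needs dependency information, so this is not a full multi-value register.

```go
package main

import "fmt"

// Write is a hypothetical timestamped assignment to a single register.
type Write struct {
	Counter uint64
	HostID  string
	Value   string
}

// before reports whether a precedes b in the (counter, hostId) order.
func before(a, b Write) bool {
	if a.Counter != b.Counter {
		return a.Counter < b.Counter
	}
	return a.HostID < b.HostID
}

// LWW returns the single greatest write and discards the rest.
// It assumes ws is non-empty.
func LWW(ws []Write) Write {
	winner := ws[0]
	for _, w := range ws[1:] {
		if before(winner, w) {
			winner = w
		}
	}
	return winner
}

// KeepConcurrent returns every write tied at the highest counter,
// so no such write is dropped (simplifying assumption, see above).
func KeepConcurrent(ws []Write) []Write {
	var highest uint64
	for _, w := range ws {
		if w.Counter > highest {
			highest = w.Counter
		}
	}
	var kept []Write
	for _, w := range ws {
		if w.Counter == highest {
			kept = append(kept, w)
		}
	}
	return kept
}

func main() {
	ws := []Write{{3, "a", "x"}, {3, "b", "y"}, {2, "a", "z"}}
	fmt.Println(LWW(ws))            // prints "{3 b y}"
	fmt.Println(KeepConcurrent(ws)) // prints "[{3 a x} {3 b y}]"
}
```

With LWW the value "x" from host "a" is silently lost; the second function keeps both tied writes and leaves the merge decision to a later step.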

}
return atomic.LoadUint64(&clock.counter) < atomic.LoadUint64(&obj.counter)
}

func (clock *Clock) IsEqual(obj *Clock) bool {
return atomic.LoadUint64(&clock.counter) == atomic.LoadUint64(&obj.counter)
}

func (clock *Clock) Increment() uint64 {
2 changes: 1 addition & 1 deletion operations/lamport/clock_test.go
@@ -42,7 +42,7 @@ func TestClock(t *testing.T) {
clock.SetTime(20)
assert.Equal(uint64(20), clock.GetTime(), "Invalid time")
res := clock.IsGreaterThan(&Clock{20, "def"})
assert.Equal(true, res, "Greater than but concludes lesser")
assert.Equal(false, res, "Greater than but concludes lesser")
res = clock.IsGreaterThan(&Clock{25, "efg"})
assert.Equal(false, res, "Lesser than but concludes greater")
res = clock.IsLessThan(&Clock{10, "def"})
4 changes: 2 additions & 2 deletions proto/healthservice.proto
@@ -32,7 +32,7 @@ message HealthCheckResponse {
}

service HealthService {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Check (HealthCheckRequest) returns (HealthCheckResponse);

rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
rpc Watch (HealthCheckRequest) returns (stream HealthCheckResponse);
}
8 changes: 4 additions & 4 deletions proto/operationservice.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion proto/operationservice.proto
@@ -38,7 +38,7 @@ message Operation {
// Dependency array
repeated Id dependencies = 2;
// JsonPath cursor
string cursor = 3;
repeated string cursor = 3;
// Type of Operation
Mutation mutation = 4;
}