From adda1768e8c2c64449e45553facd7acfad16e679 Mon Sep 17 00:00:00 2001 From: Bisakh Mondal Date: Sat, 30 Oct 2021 21:11:41 +0530 Subject: [PATCH 1/2] operation defs --- operations/def.go | 115 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 operations/def.go diff --git a/operations/def.go b/operations/def.go new file mode 100644 index 0000000..4df80e3 --- /dev/null +++ b/operations/def.go @@ -0,0 +1,115 @@ +// Copyright 2021, athena-crdt authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package operations + +import ( + "sort" + "sync" + + "github.com/pkg/errors" + + "github.com/athena-crdt/athena-core/operations/defs" + "github.com/athena-crdt/athena-core/operations/lamport" +) + +type ( + // Mutation is the type of operation + Mutation uint8 + + // Operation - individual operation entity + Operation struct { + Id *lamport.Clock + Deps []*lamport.Clock + Cursor []defs.NodeId + Mutation Mutation + Value defs.Node + } + + // OpStore is a thread safe Operation queue. + OpStore struct { + sync.RWMutex + Store []*Operation + } +) + +const ( + ASSIGN = Mutation(iota) + INSERT + DELETE + GET +) + +// NewOpStore creates a new operation store. +func NewOpStore() *OpStore { + return &OpStore{ + RWMutex: sync.RWMutex{}, + Store: []*Operation{}, + } +} + +func (o *Operation) SortDeps() { + sort.Slice(o.Deps, func(i, j int) bool { + return o.Deps[i].GetTime() <= o.Deps[j].GetTime() + }) +} + +func (op *OpStore) Push(o *Operation) { + op.Lock() + defer op.Unlock() + op.Store = append(op.Store, o) +} + +func (op *OpStore) SortedPush(o *Operation) { + op.Lock() + defer op.Unlock() + op.Store = append(op.Store, o) + sort.Slice(op.Store, func(i, j int) bool { + return op.Store[i].Id.GetTime() <= op.Store[j].Id.GetTime() + }) +} + +func (op *OpStore) IsEmpty() bool { + return op.Len() == 0 +} + +func (op *OpStore) Len() int { + op.RLock() + defer op.RUnlock() + return len(op.Store) +} + +func (op *OpStore) Pop() (*Operation, error) { + op.Lock() + defer op.Unlock() + if len(op.Store) == 0 { + return nil, errors.New("empty queue") + } + + elm := op.Store[0] + op.Store = op.Store[1:] + return elm, nil +} + +func (op *OpStore) Sort() { + sort.Slice(op.Store, func(i, j int) bool { + return op.Store[i].Id.GetTime() <= op.Store[j].Id.GetTime() + }) +} + +func (op *OpStore) Serialize() ([]byte, error) { + op.RLock() + defer op.RUnlock() + panic("implement after PR #12") +} From 14871c38693c679954f49f7d406f022d6a736115 Mon Sep 17 00:00:00 2001 From: Bisakh Mondal Date: Sat, 30 Oct 2021 21:14:01 +0530 Subject: [PATCH 2/2] gRPC operation schema update --- operations/lamport/clock.go | 30 +++++++++++++++++------------- operations/lamport/clock_test.go | 2 +- proto/healthservice.proto | 4 ++-- proto/operationservice.pb.go | 8 ++++---- proto/operationservice.proto | 2 +- 5 files changed, 25 insertions(+), 21 deletions(-) diff --git a/operations/lamport/clock.go b/operations/lamport/clock.go index 0171539..304d50c 100644 --- a/operations/lamport/clock.go +++ b/operations/lamport/clock.go @@ -14,29 +14,33 @@ package lamport -import ( - "sync/atomic" -) +import "sync/atomic" type Clock struct { + // counter represents current lamport counter value. counter uint64 + // hostId identifies the host where the operation was generated at first place. hostId string } -func (clock *Clock) IsGreaterThan(obj *Clock) bool { - if atomic.LoadUint64(&obj.counter) == atomic.LoadUint64(&clock.counter) { - return clock.hostId > obj.hostId - } else { - return atomic.LoadUint64(&clock.counter) > atomic.LoadUint64(&obj.counter) +// NewClock returns a pointer to a newly created clock. +func NewClock(counter uint64, hostId string) *Clock { + return &Clock{ + hostId: hostId, + counter: counter, } } +func (clock *Clock) IsGreaterThan(obj *Clock) bool { + return atomic.LoadUint64(&clock.counter) > atomic.LoadUint64(&obj.counter) +} + func (clock *Clock) IsLessThan(obj *Clock) bool { - if atomic.LoadUint64(&obj.counter) == atomic.LoadUint64(&clock.counter) { - return clock.hostId < obj.hostId - } else { - return atomic.LoadUint64(&clock.counter) < atomic.LoadUint64(&obj.counter) - } + return atomic.LoadUint64(&clock.counter) < atomic.LoadUint64(&obj.counter) +} + +func (clock *Clock) IsEqual(obj *Clock) bool { + return atomic.LoadUint64(&clock.counter) == atomic.LoadUint64(&obj.counter) } func (clock *Clock) Increment() uint64 { diff --git a/operations/lamport/clock_test.go b/operations/lamport/clock_test.go index 25da0b2..bbaff67 100644 --- a/operations/lamport/clock_test.go +++ b/operations/lamport/clock_test.go @@ -42,7 +42,7 @@ func TestClock(t *testing.T) { clock.SetTime(20) assert.Equal(uint64(20), clock.GetTime(), "Invalid time") res := clock.IsGreaterThan(&Clock{20, "def"}) - assert.Equal(true, res, "Greater than but concludes lesser") + assert.Equal(false, res, "Greater than but concludes lesser") res = clock.IsGreaterThan(&Clock{25, "efg"}) assert.Equal(false, res, "Lesser than but concludes greater") res = clock.IsLessThan(&Clock{10, "def"}) diff --git a/proto/healthservice.proto b/proto/healthservice.proto index 9af3e3b..cad6bcc 100644 --- a/proto/healthservice.proto +++ b/proto/healthservice.proto @@ -32,7 +32,7 @@ message HealthCheckResponse { } service HealthService { - rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + rpc Check (HealthCheckRequest) returns (HealthCheckResponse); - rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); + rpc Watch (HealthCheckRequest) returns (stream HealthCheckResponse); } diff --git a/proto/operationservice.pb.go b/proto/operationservice.pb.go index 9a86812..97dbcbe 100644 --- a/proto/operationservice.pb.go +++ b/proto/operationservice.pb.go @@ -146,7 +146,7 @@ type Operation struct { // Dependency array Dependencies []*Operation_Id `protobuf:"bytes,2,rep,name=dependencies,proto3" json:"dependencies,omitempty"` // JsonPath cursor - Cursor string `protobuf:"bytes,3,opt,name=cursor,proto3" json:"cursor,omitempty"` + Cursor []string `protobuf:"bytes,3,rep,name=cursor,proto3" json:"cursor,omitempty"` // Type of Operation Mutation Mutation `protobuf:"varint,4,opt,name=mutation,proto3,enum=operations.Mutation" json:"mutation,omitempty"` } @@ -197,11 +197,11 @@ func (x *Operation) GetDependencies() []*Operation_Id { return nil } -func (x *Operation) GetCursor() string { +func (x *Operation) GetCursor() []string { if x != nil { return x.Cursor } - return "" + return nil } func (x *Operation) GetMutation() Mutation { @@ -284,7 +284,7 @@ var file_proto_operationservice_proto_rawDesc = []byte{ 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49, 0x64, 0x52, 0x0c, 0x64, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x63, 0x69, 0x65, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x08, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x08, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x4d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x42, diff --git a/proto/operationservice.proto b/proto/operationservice.proto index 70d9a08..ef97f9c 100644 --- a/proto/operationservice.proto +++ b/proto/operationservice.proto @@ -38,7 +38,7 @@ message Operation { // Dependency array repeated Id dependencies = 2; // JsonPath cursor - string cursor = 3; + repeated string cursor = 3; // Type of Operation Mutation mutation = 4; }