Skip to content

Commit

Permalink
Extend PushPull to support sync mode by adding push-only flag (#500)
Browse files Browse the repository at this point in the history
When push-only is set to true, the Server will respond with an empty
operations unless there have been actual changes to the data. In cases
where the push-pull response contains changes that have already been
made on the client side, empty changes will be sent to prevent
redoing those operations.

Due to these modifications, deep copying of changeInfo is required.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
humdrum and hackerwins committed Apr 3, 2023
1 parent 6f9c5ad commit c395200
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 63 deletions.
29 changes: 29 additions & 0 deletions api/types/sync_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

// SyncMode is the mode of synchronization. It is used to determine whether to
// push and pull changes in PushPullChanges API.
type SyncMode int

const (
// SyncModePushPull is the mode that pushes and pulls changes.
SyncModePushPull SyncMode = iota

// SyncModePushOnly is the mode that pushes changes only.
SyncModePushOnly
)
127 changes: 85 additions & 42 deletions api/yorkie/v1/yorkie.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/yorkie/v1/yorkie.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ message PushPullChangesRequest {
bytes client_id = 1;
string document_id = 2;
ChangePack change_pack = 3;
bool push_only = 4;
}

message PushPullChangesResponse {
Expand Down
41 changes: 33 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ var (
ErrUnsupportedWatchResponseType = errors.New("unsupported watch response type")
)

// SyncOption is an option for sync. It contains the key of the document to
// sync and the sync mode.
type SyncOption struct {
key key.Key
mode types.SyncMode
}

// WithPushOnly returns a SyncOption with the sync mode set to PushOnly.
func (o SyncOption) WithPushOnly() SyncOption {
return SyncOption{
key: o.key,
mode: types.SyncModePushOnly,
}
}

// Attachment represents the document attached and peers.
type Attachment struct {
doc *document.Document
Expand Down Expand Up @@ -356,18 +371,26 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document) error {
return nil
}

// WithDocKey creates a SyncOption with the given document key.
func WithDocKey(k key.Key) SyncOption {
return SyncOption{
key: k,
mode: types.SyncModePushPull,
}
}

// Sync pushes local changes of the attached documents to the server and
// receives changes of the remote replica from the server then apply them to
// local documents.
func (c *Client) Sync(ctx context.Context, keys ...key.Key) error {
if len(keys) == 0 {
func (c *Client) Sync(ctx context.Context, options ...SyncOption) error {
if len(options) == 0 {
for _, attachment := range c.attachments {
keys = append(keys, attachment.doc.Key())
options = append(options, WithDocKey(attachment.doc.Key()))
}
}

for _, k := range keys {
if err := c.pushPull(ctx, k); err != nil {
for _, opt := range options {
if err := c.pushPullChanges(ctx, opt); err != nil {
return err
}
}
Expand Down Expand Up @@ -573,12 +596,13 @@ func (c *Client) IsActive() bool {
return c.status == activated
}

func (c *Client) pushPull(ctx context.Context, key key.Key) error {
// pushPullChanges pushes the changes of the document to the server and pulls the changes from the server.
func (c *Client) pushPullChanges(ctx context.Context, opt SyncOption) error {
if c.status != activated {
return ErrClientNotActivated
}

attachment, ok := c.attachments[key]
attachment, ok := c.attachments[opt.key]
if !ok {
return ErrDocumentNotAttached
}
Expand All @@ -589,11 +613,12 @@ func (c *Client) pushPull(ctx context.Context, key key.Key) error {
}

res, err := c.client.PushPullChanges(
withShardKey(ctx, c.options.APIKey, key.String()),
withShardKey(ctx, c.options.APIKey, opt.key.String()),
&api.PushPullChangesRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
PushOnly: opt.mode == types.SyncModePushOnly,
},
)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions server/backend/database/change_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,15 @@ func (i *ChangeInfo) ToChange() (*change.Change, error) {

return c, nil
}

// DeepCopy returns a deep copy of this ChangeInfo.
func (i *ChangeInfo) DeepCopy() *ChangeInfo {
if i == nil {
return nil
}

clone := &ChangeInfo{}
*clone = *i

return clone
}
2 changes: 1 addition & 1 deletion server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func (d *DB) FindChangeInfosBetweenServerSeqs(
if info.DocID != docID || info.ServerSeq > to {
break
}
infos = append(infos, info)
infos = append(infos, info.DeepCopy())
}
return infos, nil
}
Expand Down
3 changes: 2 additions & 1 deletion server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func PushPull(
clientInfo *database.ClientInfo,
docInfo *database.DocInfo,
reqPack *change.Pack,
mode types.SyncMode,
) (*ServerPack, error) {
start := gotime.Now()
defer func() {
Expand All @@ -73,7 +74,7 @@ func PushPull(
be.Metrics.AddPushPullReceivedOperations(reqPack.OperationsLen())

// 02. pull pack: pull changes or a snapshot from the database and create a response pack.
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq)
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, mode)
if err != nil {
return nil, err
}
Expand Down
31 changes: 29 additions & 2 deletions server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
Expand Down Expand Up @@ -86,7 +87,17 @@ func pullPack(
reqPack *change.Pack,
cpAfterPush change.Checkpoint,
initialServerSeq int64,
mode types.SyncMode,
) (*ServerPack, error) {
// If the client is push-only, it does not need to pull changes.
// So, just return the checkpoint with server seq after pushing changes.
if mode == types.SyncModePushOnly {
return NewServerPack(docInfo.Key, change.Checkpoint{
ServerSeq: reqPack.Checkpoint.ServerSeq,
ClientSeq: cpAfterPush.ClientSeq,
}, nil, nil), nil
}

if initialServerSeq < reqPack.Checkpoint.ServerSeq {
return nil, fmt.Errorf(
"serverSeq of CP greater than serverSeq of clientInfo(clientInfo %d, cp %d): %w",
Expand Down Expand Up @@ -182,19 +193,35 @@ func pullChangeInfos(
return change.InitialCheckpoint, nil, err
}

// NOTE(hackerwins, humdrum): Remove changes from the pulled if the client already has them.
// This could happen when the client has pushed changes and the server receives the changes
// and stores them in the DB, but fails to send the response to the client.
// And it could also happen when the client sync with push-only mode and then sync with pull mode.
//
// See the following test case for more details:
// "sync option with mixed mode test" in integration/client_test.go
var filteredChanges []*database.ChangeInfo
for _, pulledChange := range pulledChanges {
if clientInfo.ID == pulledChange.ActorID && cpAfterPush.ClientSeq >= pulledChange.ClientSeq {
continue
}
filteredChanges = append(filteredChanges, pulledChange)
}

cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)

if len(pulledChanges) > 0 {
logging.From(ctx).Infof(
"PULL: '%s' pulls %d changes(%d~%d) from '%s', cp: %s",
"PULL: '%s' pulls %d changes(%d~%d) from '%s', cp: %s, filtered changes: %d",
clientInfo.ID,
len(pulledChanges),
pulledChanges[0].ServerSeq,
pulledChanges[len(pulledChanges)-1].ServerSeq,
docInfo.Key,
cpAfterPull.String(),
len(pulledChanges)-len(filteredChanges),
)
}

return cpAfterPull, pulledChanges, nil
return cpAfterPull, filteredChanges, nil
}
Loading

0 comments on commit c395200

Please sign in to comment.