Skip to content

Commit

Permalink
Merge pull request #116 from planetscale/exit-earlier
Browse files Browse the repository at this point in the history
Exit sync once stop cursor is passed
  • Loading branch information
notfelineit authored Jan 29, 2025
2 parents e0e088c + 51f5ca5 commit 907e108
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ steps:
- docker-compose#v3.9.0:
run: ci

- name: "Verify dependency licenses %n"
- name: "Verify dependency licenses"
command: "go get -v ./... && license_finder"
env:
DOCKER_BUILDKIT: 1
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# syntax=docker/dockerfile:1

ARG GO_VERSION=1.22.2
ARG GO_VERSION=1.22.11
FROM pscale.dev/wolfi-prod/go:${GO_VERSION} AS build

RUN apk add --no-cache openssl libxcrypt ruby-3.0 && \
gem install license_finder
RUN apk add --no-cache openssl openssl-dev libxcrypt ruby-3.2 ruby-3.2-dev
RUN gem install openssl license_finder

ENTRYPOINT []
WORKDIR /airbyte-source
Expand Down
12 changes: 6 additions & 6 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ func ReadCommand(ch *Helper) *cobra.Command {
}

if len(catalog.Streams) == 0 {
ch.Logger.Log(internal.LOGLEVEL_ERROR, "catalog has no streams")
ch.Logger.Log(internal.LOGLEVEL_ERROR, "Catalog has no streams")
return
}

state := ""
if stateFilePath != "" {
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("state file detected, parsing provided file %s", stateFilePath))
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("State file detected, parsing provided file %s", stateFilePath))
b, err := os.ReadFile(stateFilePath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
Expand Down Expand Up @@ -111,9 +111,9 @@ func ReadCommand(ch *Helper) *cobra.Command {
var tc *psdbconnectv1alpha1.TableCursor

tc, err = shardState.SerializedCursorToTableCursor(table)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("using serialized cursor for stream %s", streamStateKey))
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
}

Expand Down Expand Up @@ -160,13 +160,13 @@ func readState(state string, psc internal.PlanetScaleSource, streams []internal.
keyspaceOrDatabase = psc.Database
}
stateKey := keyspaceOrDatabase + ":" + s.Stream.Name
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("syncing stream %s with sync mode %s", s.Stream.Name, s.SyncMode))
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Syncing stream %s with sync mode %s", s.Stream.Name, s.SyncMode))
ignoreCurrentCursor := !s.IncrementalSyncRequested()

// if no table cursor was found in the state, or we want to ignore the current cursor,
// Send along an empty cursor for each shard.
if _, ok := syncState.Streams[stateKey]; !ok || ignoreCurrentCursor {
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("ignoring current cursor since incremental sync is disabled, or no cursor was found for key %s", stateKey))
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Ignoring current cursor since incremental sync is disabled, or no cursor was found for key %s", stateKey))
initialState, err := psc.GetInitialState(keyspaceOrDatabase, shards)
if err != nil {
return syncState, err
Expand Down
95 changes: 69 additions & 26 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
clientoptions "github.com/planetscale/psdb/core/pool/options"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
vtmysql "vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
Expand Down Expand Up @@ -177,26 +178,25 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
tabletType = psdbconnect.TabletType_replica
}

p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing from tabletType \"%v\"", TabletTypeToString(tabletType)))

currentPosition := lastKnownPosition
table := s.Stream
readDuration := 1 * time.Minute
preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard)

for {
p.Logger.Log(LOGLEVEL_INFO, preamble+"peeking to see if there's any new rows")
p.Logger.Log(LOGLEVEL_INFO, preamble+"Peeking to see if there's any new rows")
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
if lcErr != nil {
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}

// the current vgtid is the same as the last synced vgtid, no new rows.
if latestCursorPosition == currentPosition.Position {
p.Logger.Log(LOGLEVEL_INFO, preamble+"no new rows found, exiting")
// Exit when the latest VGTID is not at least (equal to or after) the last synced VGTID: there are no new rows.
if currentPosition.Position != "" && !positionAtLeast(latestCursorPosition, currentPosition.Position) {
p.Logger.Log(LOGLEVEL_INFO, preamble+"No new rows found, exiting")
return TableCursorToSerializedCursor(currentPosition)
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"new rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"Syncing rows from cursor [%v]", currentPosition))

currentPosition, recordCount, err := p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
if currentPosition.Position != "" {
Expand All @@ -210,16 +210,16 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
if s, ok := status.FromError(err); ok {
// if the error is anything other than server timeout, keep going
if s.Code() != codes.DeadlineExceeded {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vGot error [%v], Returning with cursor :[%v] after server timeout", preamble, s.Code(), currentPosition))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vGot error [%v], returning with cursor [%v] after server timeout", preamble, s.Code(), currentPosition))
return currentSerializedCursor, nil
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%v%v records synced. Continuing with cursor after server timeout", preamble, recordCount))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%v%v records synced. Continuing with cursor after recoverable error %+v", preamble, recordCount, err))
}
} else if errors.Is(err, io.EOF) {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vFinished reading %v records for table [%v]", preamble, recordCount, table.Name))
return currentSerializedCursor, nil
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vnon-grpc error [%v]]", preamble, err))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vNon-grpc error [%v]]", preamble, err))
return currentSerializedCursor, err
}
}
Expand Down Expand Up @@ -259,22 +259,24 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}
}

// If there is a LastKnownPk, that means we were in a copy phase
// and want to resume the copy phase
if tc.LastKnownPk != nil {
tc.Position = ""
}

p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sSyncing with cursor position: [%v], using last known PK: %v, stop cursor is: [%v]", preamble, tc.Position, tc.LastKnownPk != nil, stopPosition))

sReq := &psdbconnect.SyncRequest{
TableName: s.Name,
Cursor: tc,
TabletType: tabletType,
Cells: []string{"planetscale_operator_default"},
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sDEBUG: SyncRequest.Cells = %v", preamble, sReq.GetCells()))

p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting to sync from cursor position [%v] to stop cursor position [%v] in cells %v; using last known PK: %v", preamble, tc.Position, stopPosition, sReq.GetCells(), tc.LastKnownPk != nil))

c, err := client.Sync(ctx, sReq)
if err != nil {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync due to client sync error: %+v", preamble, err))
return tc, 0, err
}

Expand All @@ -283,44 +285,47 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
keyspaceOrDatabase = ps.Database
}

// stop when we've reached the well known stop position for this sync session.
// Stop when we've reached the well known stop position for this sync session.
watchForVgGtidChange := false
resultCount := 0

for {

res, err := c.Recv()
if err != nil {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records due to error: %+v", preamble, err))
return tc, resultCount, err
}

if res.Cursor != nil {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sAdvancing cursor to %+v", preamble, res.Cursor))
tc = res.Cursor
}

// Because of the ordering of events in a vstream
// we receive the vgtid event first and then the rows.
// the vgtid event might repeat, but they're ordered.
// so once we reach the desired stop vgtid, we stop the sync session
// if we get a newer vgtid.
watchForVgGtidChange = watchForVgGtidChange || tc.Position == stopPosition

// Heartbeats and other non-DML queries can create binlog events with the same VGTID, but no rows.
// These no-row results can have the same VGTID as a subsequent result with rows.
if len(res.Result) > 0 {
resultCount += len(res.Result)
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sFound %+v results", preamble, len(res.Result)))
// Watch for VGTID change as soon as we encounter records from some VGTID that is equal to, or after the stop position we're looking for.
// We watch for a VGTID that is equal to or after (not just equal to) the stop position, because by the time the first sync for records occurs,
// the current VGTID may have already advanced past the stop position.
watchForVgGtidChange = watchForVgGtidChange || positionAtLeast(tc.Position, stopPosition)
for _, result := range res.Result {
qr := sqltypes.Proto3ToResult(result)
for _, row := range qr.Rows {
resultCount += 1
sqlResult := &sqltypes.Result{
Fields: result.Fields,
}
sqlResult.Rows = append(sqlResult.Rows, row)
// print AirbyteRecord messages to stdout here.
// Results queued to Airbyte here, and flushed at the end of sync()
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name)
}
}
}

if watchForVgGtidChange && tc.Position != stopPosition {
// Exit sync and flush records once the VGTID position is past the desired stop position
if watchForVgGtidChange && positionAfter(tc.Position, stopPosition) {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because current position %+v has passed stop position %+v", preamble, tc.Position, stopPosition))
return tc, resultCount, io.EOF
}
}
Expand Down Expand Up @@ -394,3 +399,41 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableName
p.Logger.Record(tableNamespace, tableName, record)
}
}

// positionAtLeast reports whether position `a` is equal to or after position `b`.
//
// Both arguments are encoded positions that are handed to vtmysql.DecodePosition.
// The result is false when either argument is the empty string or when either
// one fails to decode, so callers cannot distinguish "earlier" from "unknown".
func positionAtLeast(a string, b string) bool {
	// An empty position cannot be ordered against anything.
	if len(a) == 0 || len(b) == 0 {
		return false
	}

	posA, errA := vtmysql.DecodePosition(a)
	if errA != nil {
		return false
	}

	if posB, errB := vtmysql.DecodePosition(b); errB == nil {
		return posA.AtLeast(posB)
	}
	return false
}

// positionAfter reports whether position `a` is strictly after position `b`,
// i.e. `a` is at least `b` and the two are not equal.
//
// Both arguments are encoded positions that are handed to vtmysql.DecodePosition.
// The result is false when either argument is the empty string or when either
// one fails to decode.
func positionAfter(a string, b string) bool {
	// Nothing to compare when one side is missing.
	if a == "" {
		return false
	}
	if b == "" {
		return false
	}

	lhs, err := vtmysql.DecodePosition(a)
	if err != nil {
		return false
	}

	rhs, err := vtmysql.DecodePosition(b)
	if err != nil {
		return false
	}

	// Equal positions are "at least" each other but not "after".
	if lhs.Equal(rhs) {
		return false
	}
	return lhs.AtLeast(rhs)
}
Loading

0 comments on commit 907e108

Please sign in to comment.