diff --git a/cmd/airbyte-source/check.go b/cmd/airbyte-source/check.go index 91ba09d..094dfc7 100644 --- a/cmd/airbyte-source/check.go +++ b/cmd/airbyte-source/check.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/planetscale/connect-sdk/lib" "os" "github.com/planetscale/airbyte-source/cmd/internal" @@ -43,13 +44,7 @@ func CheckCommand(ch *Helper) *cobra.Command { return } - defer func() { - if err := ch.Database.Close(); err != nil { - fmt.Fprintf(cmd.OutOrStdout(), "Unable to close connection to PlanetScale Database, failed with %v", err) - } - }() - - cs, _ := checkConnectionStatus(ch.Database, psc) + cs, _ := checkConnectionStatus(ch.ConnectClient, ch.Source) ch.Logger.ConnectionStatus(cs) }, } @@ -70,9 +65,9 @@ func parseSource(reader FileReader, configFilePath string) (internal.PlanetScale return psc, nil } -func checkConnectionStatus(database internal.PlanetScaleDatabase, psc internal.PlanetScaleSource) (internal.ConnectionStatus, error) { +func checkConnectionStatus(connectClient lib.ConnectClient, psc lib.PlanetScaleSource) (internal.ConnectionStatus, error) { - if err := database.CanConnect(context.Background(), psc); err != nil { + if err := connectClient.CanConnect(context.Background(), psc); err != nil { return internal.ConnectionStatus{ Status: "FAILED", Message: fmt.Sprintf("Unable to connect to PlanetScale database %v at host %v with username %v. Failed with \n %v", psc.Database, psc.Host, psc.Username, err), diff --git a/cmd/airbyte-source/check_test.go b/cmd/airbyte-source/check_test.go index 5e4e4d1..6f7080b 100644 --- a/cmd/airbyte-source/check_test.go +++ b/cmd/airbyte-source/check_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/planetscale/connect-sdk/lib" "os" "testing" @@ -27,7 +28,6 @@ func TestCheckInvalidCatalogJSON(t *testing.T) { content: []byte("i am not json"), } checkCommand := CheckCommand(&Helper{ - Database: internal.PlanetScaleEdgeDatabase{}, FileReader: tfr, Logger: internal.NewLogger(os.Stdout), }) @@ -50,16 +50,16 @@ func TestCheckCredentialsInvalid(t *testing.T) { content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"), } - td := testDatabase{ + td := testConnectClient{ connectResponse: canConnectResponse{ err: fmt.Errorf("[%v] is invalid", "username"), }, } checkCommand := CheckCommand(&Helper{ - Database: td, - FileReader: tfr, - Logger: internal.NewLogger(os.Stdout), + ConnectClient: td, + FileReader: tfr, + Logger: internal.NewLogger(os.Stdout), }) b := bytes.NewBufferString("") checkCommand.SetOut(b) @@ -80,16 +80,21 @@ func TestCheckExecuteSuccessful(t *testing.T) { content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"), } - td := testDatabase{ + td := testConnectClient{ connectResponse: canConnectResponse{ err: nil, }, } checkCommand := CheckCommand(&Helper{ - Database: td, - FileReader: tfr, - Logger: internal.NewLogger(os.Stdout), + ConnectClient: td, + FileReader: tfr, + Source: lib.PlanetScaleSource{ + Host: "something.us-east-3.psdb.cloud", + Database: "database", + Username: "username", + }, + Logger: internal.NewLogger(os.Stdout), }) b := bytes.NewBufferString("") checkCommand.SetOut(b) diff --git a/cmd/airbyte-source/discover.go b/cmd/airbyte-source/discover.go index 7d78a65..fa485d2 100644 --- a/cmd/airbyte-source/discover.go +++ b/cmd/airbyte-source/discover.go @@ -40,24 +40,24 @@ func DiscoverCommand(ch *Helper) *cobra.Command { return } - cs, err := checkConnectionStatus(ch.Database, psc) + cs, err := checkConnectionStatus(ch.ConnectClient, ch.Source) if err != nil { ch.Logger.ConnectionStatus(cs) return } - defer func() { - if err := ch.Database.Close(); err != nil { - fmt.Fprintf(cmd.OutOrStdout(), "Unable to close connection to PlanetScale Database, failed with %v", err) - } - }() + if err := ch.EnsureDB(psc); err != nil { + fmt.Fprintln(cmd.OutOrStdout(), "Unable to connect to PlanetScale Database") + return + } - c, err := ch.Database.DiscoverSchema(context.Background(), psc) - if err != nil { + sb := internal.NewSchemaBuilder() + if err := ch.MysqlClient.BuildSchema(context.Background(), ch.Source, sb); err != nil { ch.Logger.Log(internal.LOGLEVEL_ERROR, fmt.Sprintf("Unable to discover database, failed with [%v]", err)) return } + c := sb.(*internal.SchemaBuilder).GetCatalog() ch.Logger.Catalog(c) }, } diff --git a/cmd/airbyte-source/discover_test.go b/cmd/airbyte-source/discover_test.go index 90253b6..a6b7371 100644 --- a/cmd/airbyte-source/discover_test.go +++ b/cmd/airbyte-source/discover_test.go @@ -15,7 +15,7 @@ func TestDiscoverInvalidSource(t *testing.T) { tfr := testFileReader{ content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"), } - td := testDatabase{ + td := testConnectClient{ connectResponse: canConnectResponse{ err: fmt.Errorf("[%v] is invalid", "username"), }, @@ -23,9 +23,9 @@ func TestDiscoverInvalidSource(t *testing.T) { b := bytes.NewBufferString("") discover := DiscoverCommand(&Helper{ - Database: td, - FileReader: tfr, - Logger: internal.NewLogger(b), + ConnectClient: td, + FileReader: tfr, + Logger: internal.NewLogger(b), }) discover.SetArgs([]string{"config source.json"}) @@ -46,19 +46,22 @@ func TestDiscoverFailed(t *testing.T) { tfr := testFileReader{ content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"), } - td := testDatabase{ + td := testConnectClient{ connectResponse: canConnectResponse{ err: nil, }, - discoverSchemaResponse: discoverSchemaResponse{ + } + tmc := testMysqlClient{ + buildSchemaResponse: buildSchemaResponse{ err: fmt.Errorf("unable to get catalog for %v", "keyspace"), }, } b := bytes.NewBufferString("") discover := DiscoverCommand(&Helper{ - Database: td, - FileReader: tfr, - Logger: internal.NewLogger(b), + ConnectClient: td, + MysqlClient: tmc, + FileReader: tfr, + Logger: internal.NewLogger(b), }) discover.SetArgs([]string{"config source.json"}) diff --git a/cmd/airbyte-source/helper.go b/cmd/airbyte-source/helper.go index 3e01f9f..5e34885 100644 --- a/cmd/airbyte-source/helper.go +++ b/cmd/airbyte-source/helper.go @@ -2,14 +2,17 @@ package airbyte_source import ( "github.com/planetscale/airbyte-source/cmd/internal" + "github.com/planetscale/connect-sdk/lib" "io" "os" ) type Helper struct { - Database internal.PlanetScaleDatabase - FileReader FileReader - Logger internal.AirbyteLogger + MysqlClient lib.MysqlClient + ConnectClient lib.ConnectClient + Source lib.PlanetScaleSource + FileReader FileReader + Logger internal.AirbyteLogger } type FileReader interface { @@ -31,18 +34,23 @@ func DefaultHelper(w io.Writer) *Helper { } func (h *Helper) EnsureDB(psc internal.PlanetScaleSource) error { - if h.Database != nil { + if h.ConnectClient != nil { return nil } - mysql, err := internal.NewMySQL(&psc) + h.Source = lib.PlanetScaleSource{ + UseReplica: true, + Username: psc.Username, + Database: psc.Database, + Host: psc.Host, + Password: psc.Password, + } + var err error + h.MysqlClient, err = lib.NewMySQL(&h.Source) if err != nil { return err } - h.Database = internal.PlanetScaleEdgeDatabase{ - Logger: h.Logger, - Mysql: mysql, - } + h.ConnectClient = lib.NewConnectClient(&h.MysqlClient) return nil } diff --git a/cmd/airbyte-source/read.go b/cmd/airbyte-source/read.go index 77c50fa..2c22248 100644 --- a/cmd/airbyte-source/read.go +++ b/cmd/airbyte-source/read.go @@ -4,10 +4,11 @@ import ( "context" "encoding/json" "fmt" - "os" - "github.com/planetscale/airbyte-source/cmd/internal" + psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" + "github.com/planetscale/connect-sdk/lib" "github.com/spf13/cobra" + "os" ) var ( @@ -49,13 +50,7 @@ func ReadCommand(ch *Helper) *cobra.Command { return } - defer func() { - if err := ch.Database.Close(); err != nil { - fmt.Fprintf(cmd.OutOrStdout(), "Unable to close connection to PlanetScale Database, failed with %v", err) - } - }() - - cs, err := checkConnectionStatus(ch.Database, psc) + cs, err := checkConnectionStatus(ch.ConnectClient, ch.Source) if err != nil { ch.Logger.ConnectionStatus(cs) return @@ -81,7 +76,8 @@ func ReadCommand(ch *Helper) *cobra.Command { } state = string(b) } - shards, err := ch.Database.ListShards(context.Background(), psc) + + shards, err := ch.ConnectClient.ListShards(context.Background(), ch.Source) if err != nil { ch.Logger.Error(fmt.Sprintf("Unable to list shards : %v", err)) os.Exit(1) @@ -93,6 +89,9 @@ func ReadCommand(ch *Helper) *cobra.Command { os.Exit(1) } + allColumns := []string{} + rb := internal.NewResultBuilder(ch.Logger) + irb := rb.(*internal.ResultBuilder) for _, table := range catalog.Streams { keyspaceOrDatabase := table.Stream.Namespace if keyspaceOrDatabase == "" { @@ -104,25 +103,38 @@ func ReadCommand(ch *Helper) *cobra.Command { ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey)) os.Exit(1) } + irb.SetKeyspace(keyspaceOrDatabase) + irb.SetTable(table.Stream.Name) for shardName, shardState := range streamState.Shards { - tc, err := shardState.SerializedCursorToTableCursor(table) + tc, err := shardState.DeserializeTableCursor() if err != nil { ch.Logger.Error(fmt.Sprintf("invalid cursor for stream %v, failed with [%v]", streamStateKey, err)) os.Exit(1) } + irb.HandleOnCursor = func(tc *psdbconnect.TableCursor) error { + sc, err := lib.SerializeTableCursor(tc) + if err != nil { + return err + } + syncState.Streams[streamStateKey].Shards[shardName] = sc + ch.Logger.State(syncState) + ch.Logger.Flush() + return nil + } - sc, err := ch.Database.Read(context.Background(), cmd.OutOrStdout(), psc, table, tc) + sc, err := ch.ConnectClient.Read(context.Background(), ch.Logger, ch.Source, table.Stream.Name, allColumns, tc, rb) if err != nil { ch.Logger.Error(err.Error()) os.Exit(1) } - if sc != nil { // if we get any new state, we assign it here. // otherwise, the older state is round-tripped back to Airbyte. syncState.Streams[streamStateKey].Shards[shardName] = sc } + + ch.Logger.Flush() ch.Logger.State(syncState) } } @@ -150,6 +162,7 @@ func readState(state string, psc internal.PlanetScaleSource, streams []internal. } for _, s := range streams { + keyspaceOrDatabase := s.Stream.Namespace if keyspaceOrDatabase == "" { keyspaceOrDatabase = psc.Database diff --git a/cmd/airbyte-source/test_types.go b/cmd/airbyte-source/test_types.go index da8230f..63811b2 100644 --- a/cmd/airbyte-source/test_types.go +++ b/cmd/airbyte-source/test_types.go @@ -2,10 +2,8 @@ package airbyte_source import ( "context" - "io" - - "github.com/planetscale/airbyte-source/cmd/internal" psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" + "github.com/planetscale/connect-sdk/lib" ) type testFileReader struct { @@ -21,37 +19,50 @@ type canConnectResponse struct { err error } -type discoverSchemaResponse struct { - catalog internal.Catalog - err error +type testConnectClient struct { + connectResponse canConnectResponse } -type testDatabase struct { - connectResponse canConnectResponse - discoverSchemaResponse discoverSchemaResponse +func (td testConnectClient) CanConnect(ctx context.Context, ps lib.PlanetScaleSource) error { + return td.connectResponse.err } -func (td testDatabase) CanConnect(ctx context.Context, ps internal.PlanetScaleSource) error { - return td.connectResponse.err +func (td testConnectClient) Read(context.Context, lib.DatabaseLogger, lib.PlanetScaleSource, string, []string, *psdbconnect.TableCursor, lib.ResultBuilder) (*lib.SerializedCursor, error) { + // TODO implement me + panic("implement me") } -func (td testDatabase) HasTabletType(ctx context.Context, psc internal.PlanetScaleSource, tt psdbconnect.TabletType) (bool, error) { - return true, nil +func (td testConnectClient) Close() error { + return nil } -func (td testDatabase) DiscoverSchema(ctx context.Context, ps internal.PlanetScaleSource) (internal.Catalog, error) { - return td.discoverSchemaResponse.catalog, td.discoverSchemaResponse.err +func (td testConnectClient) ListShards(ctx context.Context, ps lib.PlanetScaleSource) ([]string, error) { + panic("implement me") } -func (td testDatabase) Read(ctx context.Context, w io.Writer, ps internal.PlanetScaleSource, s internal.ConfiguredStream, tc *psdbconnect.TableCursor) (*internal.SerializedCursor, error) { - // TODO implement me +type buildSchemaResponse struct { + err error +} + +type testMysqlClient struct { + buildSchemaResponse buildSchemaResponse +} + +func (tmc testMysqlClient) BuildSchema(ctx context.Context, psc lib.PlanetScaleSource, schemaBuilder lib.SchemaBuilder) error { + return tmc.buildSchemaResponse.err +} + +func (tmc testMysqlClient) PingContext(ctx context.Context, source lib.PlanetScaleSource) error { + //TODO implement me panic("implement me") } -func (td testDatabase) Close() error { - return nil +func (tmc testMysqlClient) GetVitessShards(ctx context.Context, psc lib.PlanetScaleSource) ([]string, error) { + //TODO implement me + panic("implement me") } -func (td testDatabase) ListShards(ctx context.Context, ps internal.PlanetScaleSource) ([]string, error) { +func (tmc testMysqlClient) Close() error { + //TODO implement me panic("implement me") } diff --git a/cmd/internal/logger.go b/cmd/internal/logger.go index 49f060b..4f6ab06 100644 --- a/cmd/internal/logger.go +++ b/cmd/internal/logger.go @@ -14,6 +14,7 @@ type AirbyteLogger interface { Flush() State(syncState SyncState) Error(error string) + Info(message string) } const MaxBatchSize = 10000 @@ -91,6 +92,16 @@ func (a *airbyteLogger) Error(error string) { }) } +func (a *airbyteLogger) Info(message string) { + a.recordEncoder.Encode(AirbyteMessage{ + Type: LOG, + Log: &AirbyteLogMessage{ + Level: LOGLEVEL_INFO, + Message: message, + }, + }) +} + func (a *airbyteLogger) ConnectionStatus(status ConnectionStatus) { a.recordEncoder.Encode(AirbyteMessage{ Type: CONNECTION_STATUS, diff --git a/cmd/internal/mock_types.go b/cmd/internal/mock_types.go deleted file mode 100644 index 88809f8..0000000 --- a/cmd/internal/mock_types.go +++ /dev/null @@ -1,120 +0,0 @@ -package internal - -import ( - "context" - "database/sql" - psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" - "google.golang.org/grpc" - "io" -) - -type testAirbyteLogger struct { - logMessages map[string][]string - records map[string][]map[string]interface{} -} - -func (tal *testAirbyteLogger) Log(level, message string) { - if tal.logMessages == nil { - tal.logMessages = map[string][]string{} - } - tal.logMessages[level] = append(tal.logMessages[level], message) -} - -func (testAirbyteLogger) Catalog(catalog Catalog) { - //TODO implement me - panic("implement me") -} - -func (testAirbyteLogger) ConnectionStatus(status ConnectionStatus) { - //TODO implement me - panic("implement me") -} - -func (tal *testAirbyteLogger) Record(tableNamespace, tableName string, data map[string]interface{}) { - if tal.records == nil { - tal.records = map[string][]map[string]interface{}{} - } - key := tableNamespace + "." + tableName - tal.records[key] = append(tal.records[key], data) -} - -func (testAirbyteLogger) Flush() { -} - -func (testAirbyteLogger) State(syncState SyncState) { - //TODO implement me - panic("implement me") -} - -func (testAirbyteLogger) Error(error string) { - //TODO implement me - panic("implement me") -} - -type clientConnectionMock struct { - syncFn func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) - syncFnInvoked bool - syncFnInvokedCount int -} - -type connectSyncClientMock struct { - lastResponseSent int - syncResponses []*psdbconnect.SyncResponse - grpc.ClientStream -} - -func (x *connectSyncClientMock) Recv() (*psdbconnect.SyncResponse, error) { - if x.lastResponseSent >= len(x.syncResponses) { - return nil, io.EOF - } - x.lastResponseSent += 1 - return x.syncResponses[x.lastResponseSent-1], nil -} -func (c *clientConnectionMock) Sync(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - c.syncFnInvoked = true - c.syncFnInvokedCount += 1 - return c.syncFn(ctx, in, opts...) -} - -type mysqlAccessMock struct { - PingContextFn func(ctx context.Context, source PlanetScaleSource) error - PingContextFnInvoked bool - GetVitessTabletsFn func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) - GetVitessTabletsFnInvoked bool -} - -func (tma *mysqlAccessMock) PingContext(ctx context.Context, source PlanetScaleSource) error { - tma.PingContextFnInvoked = true - return tma.PingContextFn(ctx, source) -} - -func (mysqlAccessMock) GetTableNames(ctx context.Context, source PlanetScaleSource) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (mysqlAccessMock) GetTableSchema(ctx context.Context, source PlanetScaleSource, s string) (map[string]PropertyType, error) { - //TODO implement me - panic("implement me") -} - -func (mysqlAccessMock) GetTablePrimaryKeys(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (mysqlAccessMock) QueryContext(ctx context.Context, psc PlanetScaleSource, query string, args ...interface{}) (*sql.Rows, error) { - //TODO implement me - panic("implement me") -} - -func (tma *mysqlAccessMock) GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) { - tma.GetVitessTabletsFnInvoked = true - return tma.GetVitessTabletsFn(ctx, psc) -} - -func (mysqlAccessMock) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) { - //TODO implement me - panic("implement me") -} -func (mysqlAccessMock) Close() error { return nil } diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index 40b2298..31f354d 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -2,9 +2,8 @@ package internal import ( "fmt" - "github.com/go-sql-driver/mysql" psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" - "os" + "github.com/planetscale/connect-sdk/lib" "strings" ) @@ -23,34 +22,11 @@ type CustomSourceOptions struct { DoNotTreatTinyIntAsBoolean bool `json:"do_not_treat_tiny_int_as_boolean"` } -// DSN returns a DataSource that mysql libraries can use to connect to a PlanetScale database. -func (psc PlanetScaleSource) DSN() string { - config := mysql.NewConfig() - config.Net = "tcp" - config.Addr = psc.Host - config.User = psc.Username - config.DBName = psc.Database - config.Passwd = psc.Password - - tt := psdbconnect.TabletType_primary - if psc.UseReplica { - tt = psdbconnect.TabletType_replica - } - - if useSecureConnection() { - config.TLSConfig = "true" - config.DBName = fmt.Sprintf("%v@%v", psc.Database, TabletTypeToString(tt)) - } else { - config.TLSConfig = "skip-verify" - } - return config.FormatDSN() -} - // GetInitialState will return the initial/blank state for a given keyspace in all of its shards. // This state can be round-tripped safely with Airbyte. func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error) { shardCursors := ShardStates{ - Shards: map[string]*SerializedCursor{}, + Shards: map[string]*lib.SerializedCursor{}, } if len(psc.Shards) > 0 { @@ -82,23 +58,3 @@ func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards [ return shardCursors, nil } - -func useSecureConnection() bool { - e2eTestRun, found := os.LookupEnv("PS_END_TO_END_TEST_RUN") - if found && (e2eTestRun == "yes" || - e2eTestRun == "y" || - e2eTestRun == "true" || - e2eTestRun == "1") { - return false - } - - return true -} - -func TabletTypeToString(t psdbconnect.TabletType) string { - if t == psdbconnect.TabletType_replica { - return "replica" - } - - return "primary" -} diff --git a/cmd/internal/planetscale_connection_test.go b/cmd/internal/planetscale_connection_test.go index 17d8204..8fc0796 100644 --- a/cmd/internal/planetscale_connection_test.go +++ b/cmd/internal/planetscale_connection_test.go @@ -2,33 +2,11 @@ package internal import ( psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" + "github.com/planetscale/connect-sdk/lib" "github.com/stretchr/testify/assert" "testing" ) -func TestCanGenerateSecureDSN(t *testing.T) { - psc := PlanetScaleSource{ - Host: "useast.psdb.connect", - Username: "usernameus-east-4", - Password: "pscale_password", - Database: "connect-test", - } - dsn := psc.DSN() - assert.Equal(t, "usernameus-east-4:pscale_password@tcp(useast.psdb.connect)/connect-test@primary?tls=true", dsn) -} - -func TestCanGenerateInsecureDSN(t *testing.T) { - psc := PlanetScaleSource{ - Host: "useast.psdb.connect", - Username: "usernameus-east-4", - Password: "pscale_password", - Database: "connect-test", - } - t.Setenv("PS_END_TO_END_TEST_RUN", "true") - dsn := psc.DSN() - assert.Equal(t, "usernameus-east-4:pscale_password@tcp(useast.psdb.connect)/connect-test?tls=skip-verify", dsn) -} - func TestCanGenerateInitialState_Sharded(t *testing.T) { psc := PlanetScaleSource{ Host: "useast.psdb.connect", @@ -45,7 +23,7 @@ func TestCanGenerateInitialState_Sharded(t *testing.T) { shardStates, err := psc.GetInitialState("connect-test", shards) assert.NoError(t, err) expectedShardStates := ShardStates{ - Shards: map[string]*SerializedCursor{}, + Shards: map[string]*lib.SerializedCursor{}, } for _, shard := range shards { @@ -82,7 +60,7 @@ func TestCanGenerateInitialState_CustomShards(t *testing.T) { assert.Equal(t, len(configuredShards), len(shardStates.Shards)) expectedShardStates := ShardStates{ - Shards: map[string]*SerializedCursor{}, + Shards: map[string]*lib.SerializedCursor{}, } for _, shard := range configuredShards { @@ -112,7 +90,7 @@ func TestCanGenerateInitialState_Unsharded(t *testing.T) { shardStates, err := psc.GetInitialState("connect-test", shards) assert.NoError(t, err) expectedShardStates := ShardStates{ - Shards: map[string]*SerializedCursor{}, + Shards: map[string]*lib.SerializedCursor{}, } for _, shard := range shards { diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go deleted file mode 100644 index 3f10b26..0000000 --- a/cmd/internal/planetscale_edge_database.go +++ /dev/null @@ -1,390 +0,0 @@ -package internal - -import ( - "context" - "fmt" - "io" - "net/http" - "strings" - "time" - - "github.com/pkg/errors" - psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" - "github.com/planetscale/psdb/auth" - grpcclient "github.com/planetscale/psdb/core/pool" - clientoptions "github.com/planetscale/psdb/core/pool/options" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "vitess.io/vitess/go/sqltypes" - _ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient" - _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" -) - -// PlanetScaleDatabase is a general purpose interface -// that defines all the data access methods needed for the PlanetScale Airbyte source to function. -type PlanetScaleDatabase interface { - CanConnect(ctx context.Context, ps PlanetScaleSource) error - DiscoverSchema(ctx context.Context, ps PlanetScaleSource) (Catalog, error) - ListShards(ctx context.Context, ps PlanetScaleSource) ([]string, error) - Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) - Close() error -} - -// PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. -// It uses the mysql interface provided by PlanetScale for all schema/shard/tablet discovery and -// the grpc API for incrementally syncing rows from PlanetScale. -type PlanetScaleEdgeDatabase struct { - Logger AirbyteLogger - Mysql PlanetScaleEdgeMysqlAccess - clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) -} - -func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error { - if err := p.checkEdgePassword(ctx, psc); err != nil { - return errors.Wrap(err, "Unable to initialize Connect Session") - } - - return p.Mysql.PingContext(ctx, psc) -} - -func (p PlanetScaleEdgeDatabase) checkEdgePassword(ctx context.Context, psc PlanetScaleSource) error { - if !strings.HasSuffix(psc.Host, ".connect.psdb.cloud") { - return errors.New("This password is not connect-enabled, please ensure that your organization is enrolled in the Connect beta.") - } - reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fmt.Sprintf("https://%v", psc.Host), nil) - if err != nil { - return err - } - - _, err = http.DefaultClient.Do(req) - if err != nil { - return errors.New(fmt.Sprintf("The database %q, hosted at %q, is inaccessible from this process", psc.Database, psc.Host)) - } - - return nil -} - -func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error) { - var c Catalog - - tables, err := p.Mysql.GetTableNames(ctx, psc) - if err != nil { - return c, errors.Wrap(err, "Unable to query database for schema") - } - - for _, tableName := range tables { - stream, err := p.getStreamForTable(ctx, psc, tableName) - if err != nil { - return c, errors.Wrapf(err, "unable to get stream for table %v", tableName) - } - c.Streams = append(c.Streams, stream) - } - return c, nil -} - -func (p PlanetScaleEdgeDatabase) getStreamForTable(ctx context.Context, psc PlanetScaleSource, tableName string) (Stream, error) { - schema := StreamSchema{ - Type: "object", - Properties: map[string]PropertyType{}, - } - stream := Stream{ - Name: tableName, - Schema: schema, - SupportedSyncModes: []string{"full_refresh", "incremental"}, - Namespace: psc.Database, - } - - var err error - stream.Schema.Properties, err = p.Mysql.GetTableSchema(ctx, psc, tableName) - if err != nil { - return stream, errors.Wrapf(err, "Unable to get column names & types for table %v", tableName) - } - - // need this otherwise Airbyte will fail schema discovery for views - // without primary keys. - stream.PrimaryKeys = [][]string{} - stream.DefaultCursorFields = []string{} - - primaryKeys, err := p.Mysql.GetTablePrimaryKeys(ctx, psc, tableName) - if err != nil { - return stream, errors.Wrapf(err, "unable to iterate primary keys for table %s", tableName) - } - for _, key := range primaryKeys { - stream.PrimaryKeys = append(stream.PrimaryKeys, []string{key}) - } - - // pick the last key field as the default cursor field. - if len(primaryKeys) > 0 { - stream.DefaultCursorFields = append(stream.DefaultCursorFields, primaryKeys[len(primaryKeys)-1]) - } - - stream.SourceDefinedCursor = true - return stream, nil -} - -// Convert columnType to Airbyte type. -func getJsonSchemaType(mysqlType string, treatTinyIntAsBoolean bool) PropertyType { - // Support custom airbyte types documented here : - // https://docs.airbyte.com/understanding-airbyte/supported-data-types/#the-types - switch { - case strings.HasPrefix(mysqlType, "tinyint(1)"): - if treatTinyIntAsBoolean { - return PropertyType{Type: "boolean"} - } - return PropertyType{Type: "number", AirbyteType: "integer"} - case strings.HasPrefix(mysqlType, "int"), strings.HasPrefix(mysqlType, "smallint"), strings.HasPrefix(mysqlType, "mediumint"), strings.HasPrefix(mysqlType, "bigint"), strings.HasPrefix(mysqlType, "tinyint"): - return PropertyType{Type: "number", AirbyteType: "integer"} - case strings.HasPrefix(mysqlType, "decimal"), strings.HasPrefix(mysqlType, "double"), strings.HasPrefix(mysqlType, "float"): - return PropertyType{Type: "number"} - case strings.HasPrefix(mysqlType, "datetime"), strings.HasPrefix(mysqlType, "timestamp"): - return PropertyType{Type: "string", CustomFormat: "date-time", AirbyteType: "timestamp_without_timezone"} - case strings.HasPrefix(mysqlType, "date"): - return PropertyType{Type: "string", CustomFormat: "date", AirbyteType: "date"} - case strings.HasPrefix(mysqlType, "time"): - return PropertyType{Type: "string", CustomFormat: "time", AirbyteType: "time_without_timezone"} - default: - return PropertyType{Type: "string"} - } -} - -func (p PlanetScaleEdgeDatabase) Close() error { - return p.Mysql.Close() -} - -func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) { - return p.Mysql.GetVitessShards(ctx, psc) -} - -// Read streams rows from a table given a starting cursor. -// 1. We will get the latest vgtid for a given table in a shard when a sync session starts. -// 2. This latest vgtid is now the stopping point for this sync session. -// 3. Ask vstream to stream from the last known vgtid -// 4. When we reach the stopping point, read all rows available at this vgtid -// 5. End the stream when (a) a vgtid newer than latest vgtid is encountered or (b) the timeout kicks in. -func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, lastKnownPosition *psdbconnect.TableCursor) (*SerializedCursor, error) { - var ( - err error - sErr error - currentSerializedCursor *SerializedCursor - ) - - tabletType := psdbconnect.TabletType_primary - if ps.UseReplica { - tabletType = psdbconnect.TabletType_replica - } - - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing from tabletType \"%v\"", TabletTypeToString(tabletType))) - - currentPosition := lastKnownPosition - table := s.Stream - readDuration := 1 * time.Minute - preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard) - for { - p.Logger.Log(LOGLEVEL_INFO, preamble+"peeking to see if there's any new rows") - latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType) - if lcErr != nil { - return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position") - } - - // the current vgtid is the same as the last synced vgtid, no new rows. - if latestCursorPosition == currentPosition.Position { - p.Logger.Log(LOGLEVEL_INFO, preamble+"no new rows found, exiting") - return TableCursorToSerializedCursor(currentPosition) - } - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("new rows found, syncing rows for %v", readDuration)) - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition)) - - currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration) - if currentPosition.Position != "" { - currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition) - if sErr != nil { - // if we failed to serialize here, we should bail. - return currentSerializedCursor, errors.Wrap(sErr, "unable to serialize current position") - } - } - if err != nil { - if s, ok := status.FromError(err); ok { - // if the error is anything other than server timeout, keep going - if s.Code() != codes.DeadlineExceeded { - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%v Got error [%v], Returning with cursor :[%v] after server timeout", preamble, s.Code(), currentPosition)) - return currentSerializedCursor, nil - } else { - p.Logger.Log(LOGLEVEL_INFO, preamble+"Continuing with cursor after server timeout") - } - } else if errors.Is(err, io.EOF) { - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vFinished reading all rows for table [%v]", preamble, table.Name)) - return currentSerializedCursor, nil - } else { - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("non-grpc error [%v]]", err)) - return currentSerializedCursor, err - } - } - } -} - -func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) { - defer p.Logger.Flush() - ctx, cancel := context.WithTimeout(ctx, readDuration) - defer cancel() - - var ( - err error - client psdbconnect.ConnectClient - ) - - if p.clientFn == nil { - conn, err := grpcclient.Dial(ctx, ps.Host, - clientoptions.WithDefaultTLSConfig(), - clientoptions.WithCompression(true), - clientoptions.WithConnectionPool(1), - clientoptions.WithExtraCallOption( - auth.NewBasicAuth(ps.Username, ps.Password).CallOption(), - ), - ) - if err != nil { - return tc, err - } - defer conn.Close() - client = psdbconnect.NewConnectClient(conn) - } else { - client, err = p.clientFn(ctx, ps) - if err != nil { - return tc, err - } - } - - if tc.LastKnownPk != nil { - tc.Position = "" - } - - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing with cursor position : [%v], using last known PK : %v, stop cursor is : [%v]", tc.Position, tc.LastKnownPk != nil, stopPosition)) - - sReq := &psdbconnect.SyncRequest{ - TableName: s.Name, - Cursor: tc, - TabletType: tabletType, - Cells: []string{"planetscale_operator_default"}, - } - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("DEBUG: SyncRequest.Cells = %v", sReq.GetCells())) - - c, err := client.Sync(ctx, sReq) - if err != nil { - return tc, err - } - - keyspaceOrDatabase := s.Namespace - if keyspaceOrDatabase == "" { - keyspaceOrDatabase = ps.Database - } - - // stop when we've reached the well known stop position for this sync session. - watchForVgGtidChange := false - - for { - - res, err := c.Recv() - if err != nil { - return tc, err - } - - if res.Cursor != nil { - tc = res.Cursor - } - - // Because of the ordering of events in a vstream - // we receive the vgtid event first and then the rows. - // the vgtid event might repeat, but they're ordered. - // so we once we reach the desired stop vgtid, we stop the sync session - // if we get a newer vgtid. - watchForVgGtidChange = watchForVgGtidChange || tc.Position == stopPosition - - if len(res.Result) > 0 { - for _, result := range res.Result { - qr := sqltypes.Proto3ToResult(result) - for _, row := range qr.Rows { - sqlResult := &sqltypes.Result{ - Fields: result.Fields, - } - sqlResult.Rows = append(sqlResult.Rows, row) - // print AirbyteRecord messages to stdout here. - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name) - } - } - } - - if watchForVgGtidChange && tc.Position != stopPosition { - return tc, io.EOF - } - } -} - -func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) { - defer p.Logger.Flush() - timeout := 45 * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - var ( - err error - client psdbconnect.ConnectClient - ) - - if p.clientFn == nil { - conn, err := grpcclient.Dial(ctx, ps.Host, - clientoptions.WithDefaultTLSConfig(), - clientoptions.WithCompression(true), - clientoptions.WithConnectionPool(1), - clientoptions.WithExtraCallOption( - auth.NewBasicAuth(ps.Username, ps.Password).CallOption(), - ), - ) - if err != nil { - return "", err - } - defer conn.Close() - client = psdbconnect.NewConnectClient(conn) - } else { - client, err = p.clientFn(ctx, ps) - if err != nil { - return "", err - } - } - - sReq := &psdbconnect.SyncRequest{ - TableName: s.Name, - Cursor: &psdbconnect.TableCursor{ - Shard: shard, - Keyspace: keyspace, - Position: "current", - }, - TabletType: tabletType, - Cells: []string{"planetscale_operator_default"}, - } - - c, err := client.Sync(ctx, sReq) - if err != nil { - return "", nil - } - - for { - res, err := c.Recv() - if err != nil { - return "", err - } - - if res.Cursor != nil { - return res.Cursor.Position, nil - } - } -} - -// printQueryResult will pretty-print an AirbyteRecordMessage to the logger. -// Copied from vtctl/query.go -func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableNamespace, tableName string) { - data := QueryResultToRecords(qr) - - for _, record := range data { - p.Logger.Record(tableNamespace, tableName, record) - } -} diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go deleted file mode 100644 index b492148..0000000 --- a/cmd/internal/planetscale_edge_database_test.go +++ /dev/null @@ -1,721 +0,0 @@ -package internal - -import ( - "bytes" - "context" - "fmt" - "os" - "testing" - - psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/proto/query" -) - -func TestRead_CanPeekBeforeRead(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - { - Cursor: tc, - }, - { - Cursor: tc, - }, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{} - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) - assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) -} - -func TestRead_CanEarlyExitIfNoNewVGtidInPeek(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - {Cursor: tc}, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{} - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) -} - -func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "40-80", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - {Cursor: tc}, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - assert.Contains(t, in.Cells, "planetscale_operator_default") - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) - assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) -} - -func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "40-80", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - {Cursor: tc}, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType) - assert.Contains(t, in.Cells, "planetscale_operator_default") - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - UseReplica: true, - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) - assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) -} - -func TestDiscover_CanPickRightAirbyteType(t *testing.T) { - var tests = []struct { - MysqlType string - JSONSchemaType string - AirbyteType string - TreatTinyIntAsBoolean bool - }{ - { - MysqlType: "int(11)", - JSONSchemaType: "number", - AirbyteType: "integer", - }, - { - MysqlType: "smallint(4)", - JSONSchemaType: "number", - AirbyteType: "integer", - }, - { - MysqlType: "mediumint(8)", - JSONSchemaType: "number", - AirbyteType: "integer", - }, - { - MysqlType: "tinyint", - JSONSchemaType: "number", - AirbyteType: "integer", - TreatTinyIntAsBoolean: true, - }, - { - MysqlType: "tinyint(1)", - JSONSchemaType: "boolean", - AirbyteType: "", - TreatTinyIntAsBoolean: true, - }, - { - MysqlType: "tinyint(1) unsigned", - JSONSchemaType: "boolean", - AirbyteType: "", - TreatTinyIntAsBoolean: true, - }, - { - MysqlType: "tinyint(1)", - JSONSchemaType: "number", - AirbyteType: "integer", - TreatTinyIntAsBoolean: false, - }, - { - MysqlType: "tinyint(1) unsigned", - JSONSchemaType: "number", - AirbyteType: "integer", - TreatTinyIntAsBoolean: false, - }, - { - MysqlType: "bigint(16)", - JSONSchemaType: "number", - AirbyteType: "integer", - }, - { - MysqlType: "bigint unsigned", - JSONSchemaType: "number", - AirbyteType: "integer", - }, - { - MysqlType: "bigint zerofill", - JSONSchemaType: "number", - AirbyteType: "integer", - }, - { - MysqlType: "datetime", - JSONSchemaType: "string", - AirbyteType: "timestamp_without_timezone", - }, - { - MysqlType: "datetime(6)", - JSONSchemaType: "string", - AirbyteType: "timestamp_without_timezone", - }, - { - MysqlType: "time", - JSONSchemaType: "string", - AirbyteType: "time_without_timezone", - }, - { - MysqlType: "time(6)", - JSONSchemaType: "string", - AirbyteType: "time_without_timezone", - }, - { - MysqlType: "date", - JSONSchemaType: "string", - AirbyteType: "date", - }, - { - MysqlType: "text", - JSONSchemaType: "string", - AirbyteType: "", - }, - { - MysqlType: "varchar(256)", - JSONSchemaType: "string", - AirbyteType: "", - }, - { - MysqlType: "decimal(12,5)", - JSONSchemaType: "number", - AirbyteType: "", - }, - { - MysqlType: "double", - JSONSchemaType: "number", - AirbyteType: "", - }, - { - MysqlType: "float(30)", - JSONSchemaType: "number", - AirbyteType: "", - }, - } - - for _, typeTest := range tests { - - t.Run(fmt.Sprintf("mysql_type_%v", typeTest.MysqlType), func(t *testing.T) { - p := getJsonSchemaType(typeTest.MysqlType, typeTest.TreatTinyIntAsBoolean) - assert.Equal(t, typeTest.AirbyteType, p.AirbyteType) - assert.Equal(t, typeTest.JSONSchemaType, p.Type) - }) - } -} -func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - { - Cursor: tc, - }, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - assert.Contains(t, in.Cells, "planetscale_operator_default") - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) - assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) -} - -func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - { - Cursor: tc, - }, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType) - assert.Contains(t, in.Cells, "planetscale_operator_default") - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - UseReplica: true, - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) - assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) -} - -func TestRead_CanReturnOriginalCursorIfNoNewFound(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - {Cursor: tc}, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(tc) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 1, cc.syncFnInvokedCount) -} - -func TestRead_CanReturnNewCursorIfNewFound(t *testing.T) { - tma := getTestMysqlAccess() - b := bytes.NewBufferString("") - ped := PlanetScaleEdgeDatabase{ - Logger: NewLogger(b), - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - newTC := &psdbconnect.TableCursor{ - Shard: "-", - Position: "I_AM_FARTHER_IN_THE_BINLOG", - Keyspace: "connect-test", - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - {Cursor: newTC}, - {Cursor: newTC}, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - esc, err := TableCursorToSerializedCursor(newTC) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 2, cc.syncFnInvokedCount) -} - -func TestRead_CanStopAtWellKnownCursor(t *testing.T) { - tma := getTestMysqlAccess() - tal := testAirbyteLogger{} - ped := PlanetScaleEdgeDatabase{ - Logger: &tal, - Mysql: tma, - } - - numResponses := 10 - // when the client tries to get the "current" vgtid, - // we return the ante-penultimate element of the array. - currentVGtidPosition := (numResponses * 3) - 4 - // this is the next vgtid that should stop the sync session. - nextVGtidPosition := currentVGtidPosition + 1 - responses := make([]*psdbconnect.SyncResponse, 0, numResponses) - for i := 0; i < numResponses; i++ { - // this simulates multiple events being returned, for the same vgtid, from vstream - for x := 0; x < 3; x++ { - var result []*query.QueryResult - if x == 2 { - result = []*query.QueryResult{ - sqltypes.ResultToProto3(sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "pid|description", - "int64|varbinary"), - fmt.Sprintf("%v|keyboard", i+1), - fmt.Sprintf("%v|monitor", i+2), - )), - } - } - - vgtid := fmt.Sprintf("e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-%v", i) - responses = append(responses, &psdbconnect.SyncResponse{ - Cursor: &psdbconnect.TableCursor{ - Shard: "-", - Keyspace: "connect-test", - Position: vgtid, - }, - Result: result, - }) - } - } - - syncClient := &connectSyncClientMock{ - syncResponses: responses, - } - - getCurrentVGtidClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - responses[currentVGtidPosition], - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - if in.Cursor.Position == "current" { - return getCurrentVGtidClient, nil - } - - return syncClient, nil - }, - } - - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "customers", - Namespace: "connect-test", - }, - } - - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, responses[0].Cursor) - assert.NoError(t, err) - // sync should start at the first vgtid - esc, err := TableCursorToSerializedCursor(responses[nextVGtidPosition].Cursor) - assert.NoError(t, err) - assert.Equal(t, esc, sc) - assert.Equal(t, 2, cc.syncFnInvokedCount) - - logLines := tal.logMessages[LOGLEVEL_INFO] - assert.Equal(t, "[connect-test:primary:customers shard : -] Finished reading all rows for table [customers]", logLines[len(logLines)-1]) - records := tal.records["connect-test.customers"] - assert.Equal(t, 2*(nextVGtidPosition/3), len(records)) -} - -func TestRead_CanLogResults(t *testing.T) { - tma := getTestMysqlAccess() - tal := testAirbyteLogger{} - ped := PlanetScaleEdgeDatabase{ - Logger: &tal, - Mysql: tma, - } - tc := &psdbconnect.TableCursor{ - Shard: "-", - Position: "THIS_IS_A_SHARD_GTID", - Keyspace: "connect-test", - } - newTC := &psdbconnect.TableCursor{ - Shard: "-", - Position: "I_AM_FARTHER_IN_THE_BINLOG", - Keyspace: "connect-test", - } - - result := []*query.QueryResult{ - sqltypes.ResultToProto3(sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "pid|description", - "int64|varbinary"), - "1|keyboard", - "2|monitor", - )), - } - - syncClient := &connectSyncClientMock{ - syncResponses: []*psdbconnect.SyncResponse{ - {Cursor: newTC, Result: result}, - {Cursor: newTC, Result: result}, - }, - } - - cc := clientConnectionMock{ - syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { - assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - return syncClient, nil - }, - } - ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { - return &cc, nil - } - ps := PlanetScaleSource{ - Database: "connect-test", - } - cs := ConfiguredStream{ - Stream: Stream{ - Name: "products", - Namespace: "connect-test", - }, - } - sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) - assert.NoError(t, err) - assert.NotNil(t, sc) - assert.Equal(t, 2, len(tal.records["connect-test.products"])) - records := tal.records["connect-test.products"] - keyboardFound := false - monitorFound := false - for _, r := range records { - id, err := r["pid"].(sqltypes.Value).ToInt64() - assert.NoError(t, err) - if id == 1 { - assert.False(t, keyboardFound, "should not find keyboard twice") - keyboardFound = true - assert.Equal(t, "keyboard", r["description"].(sqltypes.Value).ToString()) - } - - if id == 2 { - assert.False(t, monitorFound, "should not find monitor twice") - monitorFound = true - assert.Equal(t, "monitor", r["description"].(sqltypes.Value).ToString()) - } - } - assert.True(t, keyboardFound) - assert.True(t, monitorFound) -} - -func getTestMysqlAccess() *mysqlAccessMock { - tma := mysqlAccessMock{ - PingContextFn: func(ctx context.Context, source PlanetScaleSource) error { - return nil - }, - GetVitessTabletsFn: func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) { - return []VitessTablet{ - { - Cell: "test_cell_primary", - Keyspace: "connect-test", - TabletType: TabletTypeToString(psdbconnect.TabletType_primary), - State: "SERVING", - }, - { - Cell: "test_cell_replica", - Keyspace: "connect-test", - TabletType: TabletTypeToString(psdbconnect.TabletType_replica), - State: "SERVING", - }, - }, nil - }, - } - return &tma -} diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go deleted file mode 100644 index 8950dbc..0000000 --- a/cmd/internal/planetscale_edge_mysql.go +++ /dev/null @@ -1,194 +0,0 @@ -package internal - -import ( - "context" - "database/sql" - "fmt" - "github.com/pkg/errors" - "strings" - "time" -) - -type VitessTablet struct { - Cell string - Keyspace string - Shard string - TabletType string - State string - Alias string - Hostname string - PrimaryTermStartTime string -} -type PlanetScaleEdgeMysqlAccess interface { - PingContext(context.Context, PlanetScaleSource) error - GetTableNames(context.Context, PlanetScaleSource) ([]string, error) - GetTableSchema(context.Context, PlanetScaleSource, string) (map[string]PropertyType, error) - GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error) - GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) - GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) - Close() error -} - -func NewMySQL(psc *PlanetScaleSource) (PlanetScaleEdgeMysqlAccess, error) { - db, err := sql.Open("mysql", psc.DSN()) - if err != nil { - return nil, err - } - - return planetScaleEdgeMySQLAccess{ - db: db, - }, nil -} - -type planetScaleEdgeMySQLAccess struct { - db *sql.DB -} - -func (p planetScaleEdgeMySQLAccess) Close() error { - return p.db.Close() -} - -func (p planetScaleEdgeMySQLAccess) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) { - var shards []string - - // TODO: is there a prepared statement equivalent? - shardNamesQR, err := p.db.QueryContext( - ctx, - `show vitess_shards like "%`+psc.Database+`%";`, - ) - if err != nil { - return shards, errors.Wrap(err, "Unable to query database for shards") - } - - for shardNamesQR.Next() { - var name string - if err = shardNamesQR.Scan(&name); err != nil { - return shards, errors.Wrap(err, "unable to get shard names") - } - - shards = append(shards, strings.TrimPrefix(name, psc.Database+"/")) - } - - if err := shardNamesQR.Err(); err != nil { - return shards, errors.Wrapf(err, "unable to iterate shard names for %s", psc.Database) - } - return shards, nil -} - -func (p planetScaleEdgeMySQLAccess) GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) { - var tablets []VitessTablet - - tabletsQR, err := p.db.QueryContext(ctx, "Show vitess_tablets") - if err != nil { - return tablets, err - } - - for tabletsQR.Next() { - vt := VitessTablet{} - // output is of the form : - //aws_useast1c_5 connect-test - PRIMARY SERVING aws_useast1c_5-2797914161 10.200.131.217 2022-05-09T14:11:56Z - //aws_useast1c_5 connect-test - REPLICA SERVING aws_useast1c_5-1559247072 10.200.178.136 - //aws_useast1c_5 connect-test - PRIMARY SERVING aws_useast1c_5-2797914161 10.200.131.217 2022-05-09T14:11:56Z - //aws_useast1c_5 connect-test - REPLICA SERVING aws_useast1c_5-1559247072 10.200.178.136 - err := tabletsQR.Scan(&vt.Cell, &vt.Keyspace, &vt.Shard, &vt.TabletType, &vt.State, &vt.Alias, &vt.Hostname, &vt.PrimaryTermStartTime) - if err != nil { - return tablets, err - } - tablets = append(tablets, vt) - } - if err := tabletsQR.Err(); err != nil { - return tablets, errors.Wrapf(err, "unable to iterate tablets for %s", psc.Database) - } - return tablets, nil -} - -func (p planetScaleEdgeMySQLAccess) PingContext(ctx context.Context, psc PlanetScaleSource) error { - - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - return p.db.PingContext(ctx) -} - -func (p planetScaleEdgeMySQLAccess) GetTableNames(ctx context.Context, psc PlanetScaleSource) ([]string, error) { - var tables []string - - tableNamesQR, err := p.db.Query(fmt.Sprintf("show tables from `%s`;", psc.Database)) - if err != nil { - return tables, errors.Wrap(err, "Unable to query database for schema") - } - - for tableNamesQR.Next() { - var name string - if err = tableNamesQR.Scan(&name); err != nil { - return tables, errors.Wrap(err, "unable to get table names") - } - - tables = append(tables, name) - } - - if err := tableNamesQR.Err(); err != nil { - return tables, errors.Wrap(err, "unable to iterate table rows") - } - - return tables, err -} - -func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc PlanetScaleSource, tableName string) (map[string]PropertyType, error) { - properties := map[string]PropertyType{} - - columnNamesQR, err := p.db.QueryContext( - ctx, - "select column_name, column_type from information_schema.columns where table_name=? AND table_schema=?;", - tableName, psc.Database, - ) - if err != nil { - return properties, errors.Wrapf(err, "Unable to get column names & types for table %v", tableName) - } - - for columnNamesQR.Next() { - var ( - name string - columnType string - ) - if err = columnNamesQR.Scan(&name, &columnType); err != nil { - return properties, errors.Wrapf(err, "Unable to scan row for column names & types of table %v", tableName) - } - - properties[name] = getJsonSchemaType(columnType, !psc.Options.DoNotTreatTinyIntAsBoolean) - } - - if err := columnNamesQR.Err(); err != nil { - return properties, errors.Wrapf(err, "unable to iterate columns for table %s", tableName) - } - - return properties, nil -} - -func (p planetScaleEdgeMySQLAccess) GetTablePrimaryKeys(ctx context.Context, psc PlanetScaleSource, tableName string) ([]string, error) { - var primaryKeys []string - - primaryKeysQR, err := p.db.QueryContext( - ctx, - "select column_name from information_schema.columns where table_schema=? AND table_name=? AND column_key='PRI';", - psc.Database, tableName, - ) - - if err != nil { - return primaryKeys, errors.Wrapf(err, "Unable to scan row for primary keys of table %v", tableName) - } - - for primaryKeysQR.Next() { - var name string - if err = primaryKeysQR.Scan(&name); err != nil { - return primaryKeys, errors.Wrapf(err, "Unable to scan row for primary keys of table %v", tableName) - } - - primaryKeys = append(primaryKeys, name) - } - - if err := primaryKeysQR.Err(); err != nil { - return primaryKeys, errors.Wrapf(err, "unable to iterate primary keys for table %s", tableName) - } - - return primaryKeys, nil -} diff --git a/cmd/internal/result_builder.go b/cmd/internal/result_builder.go new file mode 100644 index 0000000..f9e5302 --- /dev/null +++ b/cmd/internal/result_builder.go @@ -0,0 +1,49 @@ +package internal + +import ( + "github.com/pkg/errors" + psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" + "github.com/planetscale/connect-sdk/lib" + "vitess.io/vitess/go/sqltypes" +) + +type ResultBuilder struct { + keyspace string + table string + logger AirbyteLogger + HandleOnCursor func(tc *psdbconnect.TableCursor) error +} + +func NewResultBuilder(logger AirbyteLogger) lib.ResultBuilder { + return &ResultBuilder{ + logger: logger, + } +} + +func (rb *ResultBuilder) OnResult(result *sqltypes.Result, operation lib.Operation) error { + data := QueryResultToRecords(result) + for _, record := range data { + rb.logger.Record(rb.keyspace, rb.table, record) + } + return nil +} + +func (ResultBuilder) OnUpdate(row *lib.UpdatedRow) error { + return nil +} + +func (rb *ResultBuilder) OnCursor(cursor *psdbconnect.TableCursor) error { + if rb.HandleOnCursor == nil { + return errors.New("Unhandled onCursor event") + } + + return rb.HandleOnCursor(cursor) +} + +func (rb *ResultBuilder) SetKeyspace(keyspace string) { + rb.keyspace = keyspace +} + +func (rb *ResultBuilder) SetTable(table string) { + rb.table = table +} diff --git a/cmd/internal/schema_builder.go b/cmd/internal/schema_builder.go new file mode 100644 index 0000000..9367c78 --- /dev/null +++ b/cmd/internal/schema_builder.go @@ -0,0 +1,112 @@ +package internal + +import ( + "github.com/planetscale/connect-sdk/lib" + "regexp" + "strings" +) + +const ( + gCTableNameExpression string = `^_vt_(HOLD|PURGE|EVAC|DROP)_([0-f]{32})_([0-9]{14})$` +) + +var gcTableNameRegexp = regexp.MustCompile(gCTableNameExpression) + +type SchemaBuilder struct { + streams map[string]map[string]*Stream +} + +func NewSchemaBuilder() lib.SchemaBuilder { + return &SchemaBuilder{} +} + +func (sb *SchemaBuilder) OnKeyspace(_ string) { + // no-op as Airbyte has schemas as a flat list. +} + +func (sb *SchemaBuilder) OnTable(keyspaceName, tableName string) { + // skip any that are Vitess's GC tables. + if gcTableNameRegexp.MatchString(tableName) { + return + } + + schema := StreamSchema{ + Type: "object", + Properties: map[string]PropertyType{}, + } + + stream := &Stream{ + Name: tableName, + Schema: schema, + SupportedSyncModes: []string{"full_refresh", "incremental"}, + Namespace: keyspaceName, + } + + if sb.streams == nil { + sb.streams = make(map[string]map[string]*Stream) + } + + if _, ok := sb.streams[keyspaceName]; !ok { + sb.streams[keyspaceName] = make(map[string]*Stream) + } + + sb.streams[keyspaceName][tableName] = stream +} + +func (sb *SchemaBuilder) OnColumns(keyspaceName, tableName string, columns []lib.MysqlColumn) { + if _, ok := sb.streams[keyspaceName]; !ok { + return + } + + if _, ok := sb.streams[keyspaceName][tableName]; !ok { + return + } + + table := sb.streams[keyspaceName][tableName] + table.PrimaryKeys = [][]string{} + table.DefaultCursorFields = []string{} + + for _, column := range columns { + if column.IsPrimaryKey { + table.PrimaryKeys = append(table.PrimaryKeys, []string{column.Name}) + table.DefaultCursorFields = append(table.DefaultCursorFields, column.Name) + } + + table.Schema.Properties[column.Name] = getAirbyteDataType(column.Type, true) + } +} + +func (sb *SchemaBuilder) GetCatalog() Catalog { + c := Catalog{} + for _, keyspace := range sb.streams { + for _, table := range keyspace { + c.Streams = append(c.Streams, *table) + } + } + return c +} + +// Convert columnType to Airbyte type. +func getAirbyteDataType(mysqlType string, treatTinyIntAsBoolean bool) PropertyType { + // Support custom airbyte types documented here : + // https://docs.airbyte.com/understanding-airbyte/supported-data-types/#the-types + switch { + case strings.HasPrefix(mysqlType, "tinyint(1)"): + if treatTinyIntAsBoolean { + return PropertyType{Type: "boolean"} + } + return PropertyType{Type: "number", AirbyteType: "integer"} + case strings.HasPrefix(mysqlType, "int"), strings.HasPrefix(mysqlType, "smallint"), strings.HasPrefix(mysqlType, "mediumint"), strings.HasPrefix(mysqlType, "bigint"), strings.HasPrefix(mysqlType, "tinyint"): + return PropertyType{Type: "number", AirbyteType: "integer"} + case strings.HasPrefix(mysqlType, "decimal"), strings.HasPrefix(mysqlType, "double"), strings.HasPrefix(mysqlType, "float"): + return PropertyType{Type: "number"} + case strings.HasPrefix(mysqlType, "datetime"), strings.HasPrefix(mysqlType, "timestamp"): + return PropertyType{Type: "string", CustomFormat: "date-time", AirbyteType: "timestamp_without_timezone"} + case strings.HasPrefix(mysqlType, "date"): + return PropertyType{Type: "string", CustomFormat: "date", AirbyteType: "date"} + case strings.HasPrefix(mysqlType, "time"): + return PropertyType{Type: "string", CustomFormat: "time", AirbyteType: "time_without_timezone"} + default: + return PropertyType{Type: "string"} + } +} diff --git a/cmd/internal/schema_builder_test.go b/cmd/internal/schema_builder_test.go new file mode 100644 index 0000000..9cb6335 --- /dev/null +++ b/cmd/internal/schema_builder_test.go @@ -0,0 +1,136 @@ +package internal + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSchemaBuilder_CanPickRightAirbyteType(t *testing.T) { + var tests = []struct { + MysqlType string + JSONSchemaType string + AirbyteType string + TreatTinyIntAsBoolean bool + }{ + { + MysqlType: "int(11)", + JSONSchemaType: "number", + AirbyteType: "integer", + }, + { + MysqlType: "smallint(4)", + JSONSchemaType: "number", + AirbyteType: "integer", + }, + { + MysqlType: "mediumint(8)", + JSONSchemaType: "number", + AirbyteType: "integer", + }, + { + MysqlType: "tinyint", + JSONSchemaType: "number", + AirbyteType: "integer", + TreatTinyIntAsBoolean: true, + }, + { + MysqlType: "tinyint(1)", + JSONSchemaType: "boolean", + AirbyteType: "", + TreatTinyIntAsBoolean: true, + }, + { + MysqlType: "tinyint(1) unsigned", + JSONSchemaType: "boolean", + AirbyteType: "", + TreatTinyIntAsBoolean: true, + }, + { + MysqlType: "tinyint(1)", + JSONSchemaType: "number", + AirbyteType: "integer", + TreatTinyIntAsBoolean: false, + }, + { + MysqlType: "tinyint(1) unsigned", + JSONSchemaType: "number", + AirbyteType: "integer", + TreatTinyIntAsBoolean: false, + }, + { + MysqlType: "bigint(16)", + JSONSchemaType: "number", + AirbyteType: "integer", + }, + { + MysqlType: "bigint unsigned", + JSONSchemaType: "number", + AirbyteType: "integer", + }, + { + MysqlType: "bigint zerofill", + JSONSchemaType: "number", + AirbyteType: "integer", + }, + { + MysqlType: "datetime", + JSONSchemaType: "string", + AirbyteType: "timestamp_without_timezone", + }, + { + MysqlType: "datetime(6)", + JSONSchemaType: "string", + AirbyteType: "timestamp_without_timezone", + }, + { + MysqlType: "time", + JSONSchemaType: "string", + AirbyteType: "time_without_timezone", + }, + { + MysqlType: "time(6)", + JSONSchemaType: "string", + AirbyteType: "time_without_timezone", + }, + { + MysqlType: "date", + JSONSchemaType: "string", + AirbyteType: "date", + }, + { + MysqlType: "text", + JSONSchemaType: "string", + AirbyteType: "", + }, + { + MysqlType: "varchar(256)", + JSONSchemaType: "string", + AirbyteType: "", + }, + { + MysqlType: "decimal(12,5)", + JSONSchemaType: "number", + AirbyteType: "", + }, + { + MysqlType: "double", + JSONSchemaType: "number", + AirbyteType: "", + }, + { + MysqlType: "float(30)", + JSONSchemaType: "number", + AirbyteType: "", + }, + } + + for _, typeTest := range tests { + + t.Run(fmt.Sprintf("mysql_type_%v", typeTest.MysqlType), func(t *testing.T) { + p := getAirbyteDataType(typeTest.MysqlType, typeTest.TreatTinyIntAsBoolean) + assert.Equal(t, typeTest.AirbyteType, p.AirbyteType) + assert.Equal(t, typeTest.JSONSchemaType, p.Type) + }) + } +} diff --git a/cmd/internal/types.go b/cmd/internal/types.go index 9929299..257df5a 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -2,6 +2,7 @@ package internal import ( "encoding/base64" + "github.com/planetscale/connect-sdk/lib" "regexp" "strconv" "strings" @@ -95,37 +96,16 @@ type SyncState struct { } type ShardStates struct { - Shards map[string]*SerializedCursor `json:"shards"` + Shards map[string]*lib.SerializedCursor `json:"shards"` } -type SerializedCursor struct { - Cursor string `json:"cursor"` -} - -func (s SerializedCursor) SerializedCursorToTableCursor(table ConfiguredStream) (*psdbconnect.TableCursor, error) { - var ( - tc psdbconnect.TableCursor - ) - decoded, err := base64.StdEncoding.DecodeString(s.Cursor) - if err != nil { - return nil, errors.Wrap(err, "unable to decode table cursor") - } - - err = codec.DefaultCodec.Unmarshal(decoded, &tc) - if err != nil { - return nil, errors.Wrap(err, "unable to deserialize table cursor") - } - - return &tc, nil -} - -func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error) { +func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*lib.SerializedCursor, error) { d, err := codec.DefaultCodec.Marshal(cursor) if err != nil { return nil, errors.Wrap(err, "unable to marshal table cursor to save staate.") } - sc := &SerializedCursor{ + sc := &lib.SerializedCursor{ Cursor: base64.StdEncoding.EncodeToString(d), } return sc, nil diff --git a/cmd/internal/types_test.go b/cmd/internal/types_test.go index a477027..4b6b962 100644 --- a/cmd/internal/types_test.go +++ b/cmd/internal/types_test.go @@ -70,7 +70,7 @@ func TestCanUnmarshalLastKnownState(t *testing.T) { LastKnownPk: lastKnownPK, }) require.NoError(t, err) - tc, err := sc.SerializedCursorToTableCursor(ConfiguredStream{}) + tc, err := sc.DeserializeTableCursor() require.NoError(t, err) assert.Equal(t, "connect", tc.Keyspace) assert.Equal(t, "40-80", tc.Shard) diff --git a/go.mod b/go.mod index 00a40a1..28332ca 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/planetscale/airbyte-source -go 1.21 +go 1.21.3 require ( - github.com/go-sql-driver/mysql v1.7.1 + github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 vitess.io/vitess v0.17.3 @@ -11,8 +11,11 @@ require ( require ( github.com/pkg/errors v0.9.1 + github.com/planetscale/connect-sdk v0.1.2 github.com/planetscale/psdb v0.0.0-20220429000526-e2a0e798aaf3 + github.com/twitchtv/twirp v8.1.2+incompatible google.golang.org/grpc v1.59.0 + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0 google.golang.org/protobuf v1.31.0 ) diff --git a/go.sum b/go.sum index 63f2d6f..85d00af 100644 --- a/go.sum +++ b/go.sum @@ -317,6 +317,10 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= +github.com/planetscale/connect-sdk v0.1.1 h1:5CjYAfy+o5oCR1+dlzfPmxqS/ZjBXVpfxvFUf9xXcbM= +github.com/planetscale/connect-sdk v0.1.1/go.mod h1:yXabTbpaaDZfHS5CcBIOCYCgug5jD73nt0BKTuGkOyU= +github.com/planetscale/connect-sdk v0.1.2 h1:Md+VrAI+WKaRiCCorHYCHfQr2e/mtZUCvePebsTXb7U= +github.com/planetscale/connect-sdk v0.1.2/go.mod h1:s6B7tBzN+7ozfHFkcvowa9GzJB2vD40fbS2P2DWLySc= github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a h1:y0OpQ4+5tKxeh9+H+2cVgASl9yMZYV9CILinKOiKafA= github.com/planetscale/pargzip v0.0.0-20201116224723-90c7fc03ea8a/go.mod h1:GJFUzQuXIoB2Kjn1ZfDhJr/42D5nWOqRcIQVgCxTuIE= github.com/planetscale/psdb v0.0.0-20220429000526-e2a0e798aaf3 h1:oEgD8tPIpxrTTEvVEDsY9pjkJOiqmpOE5kCGnTIGyHM= @@ -380,6 +384,8 @@ github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNG github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tinylib/msgp v1.1.8 h1:FCXC1xanKO4I8plpHGH2P7koL/RzZs12l/+r7vakfm0= github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw= +github.com/twitchtv/twirp v8.1.2+incompatible h1:0O6TfzZW09ZP5r+ORA90XQEE3PTgA6C7MBbl2KxvVgE= +github.com/twitchtv/twirp v8.1.2+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= @@ -769,6 +775,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0 h1:TLkBREm4nIsEcexnCjgQd5GQWaHcqMzwQV0TX9pq8S0= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0/go.mod h1:DNq5QpG7LJqD2AamLZ7zvKE0DEpVl2BSEVjFycAAjRY= google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b h1:D/GTYPo6I1oEo08Bfpuj3xl5XE+UGHj7//5fVyKxhsQ= google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -783,6 +791,7 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1.pb.go b/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1.pb.go index e096d1c..12a8c34 100644 --- a/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1.pb.go +++ b/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.5 +// protoc-gen-go v1.31.0 +// protoc v3.20.1 // source: psdbconnect.v1alpha1.proto package psdbconnectv1alpha1 diff --git a/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_grpc.pb.go b/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_grpc.pb.go index 4ebaf3f..29281c3 100644 --- a/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_grpc.pb.go +++ b/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.5 +// - protoc v3.20.1 // source: psdbconnect.v1alpha1.proto package psdbconnectv1alpha1 diff --git a/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_vtproto.pb.go b/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_vtproto.pb.go index 32497a5..9822827 100644 --- a/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_vtproto.pb.go +++ b/proto/psdbconnect/v1alpha1/psdbconnect.v1alpha1_vtproto.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go-vtproto. DO NOT EDIT. -// protoc-gen-go-vtproto version: v0.4.0 +// protoc-gen-go-vtproto version: v0.5.0 // source: psdbconnect.v1alpha1.proto package psdbconnectv1alpha1