Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Extract Connect SDK from sources #93

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
13 changes: 4 additions & 9 deletions cmd/airbyte-source/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/planetscale/connect-sdk/lib"
"os"

"github.com/planetscale/airbyte-source/cmd/internal"
Expand Down Expand Up @@ -43,13 +44,7 @@ func CheckCommand(ch *Helper) *cobra.Command {
return
}

defer func() {
if err := ch.Database.Close(); err != nil {
fmt.Fprintf(cmd.OutOrStdout(), "Unable to close connection to PlanetScale Database, failed with %v", err)
}
}()

cs, _ := checkConnectionStatus(ch.Database, psc)
cs, _ := checkConnectionStatus(ch.ConnectClient, ch.Source)
ch.Logger.ConnectionStatus(cs)
},
}
Expand All @@ -70,9 +65,9 @@ func parseSource(reader FileReader, configFilePath string) (internal.PlanetScale
return psc, nil
}

func checkConnectionStatus(database internal.PlanetScaleDatabase, psc internal.PlanetScaleSource) (internal.ConnectionStatus, error) {
func checkConnectionStatus(connectClient lib.ConnectClient, psc lib.PlanetScaleSource) (internal.ConnectionStatus, error) {

if err := database.CanConnect(context.Background(), psc); err != nil {
if err := connectClient.CanConnect(context.Background(), psc); err != nil {
return internal.ConnectionStatus{
Status: "FAILED",
Message: fmt.Sprintf("Unable to connect to PlanetScale database %v at host %v with username %v. Failed with \n %v", psc.Database, psc.Host, psc.Username, err),
Expand Down
23 changes: 14 additions & 9 deletions cmd/airbyte-source/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/planetscale/connect-sdk/lib"
"os"
"testing"

Expand All @@ -27,7 +28,6 @@ func TestCheckInvalidCatalogJSON(t *testing.T) {
content: []byte("i am not json"),
}
checkCommand := CheckCommand(&Helper{
Database: internal.PlanetScaleEdgeDatabase{},
FileReader: tfr,
Logger: internal.NewLogger(os.Stdout),
})
Expand All @@ -50,16 +50,16 @@ func TestCheckCredentialsInvalid(t *testing.T) {
content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"),
}

td := testDatabase{
td := testConnectClient{
connectResponse: canConnectResponse{
err: fmt.Errorf("[%v] is invalid", "username"),
},
}

checkCommand := CheckCommand(&Helper{
Database: td,
FileReader: tfr,
Logger: internal.NewLogger(os.Stdout),
ConnectClient: td,
FileReader: tfr,
Logger: internal.NewLogger(os.Stdout),
})
b := bytes.NewBufferString("")
checkCommand.SetOut(b)
Expand All @@ -80,16 +80,21 @@ func TestCheckExecuteSuccessful(t *testing.T) {
content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"),
}

td := testDatabase{
td := testConnectClient{
connectResponse: canConnectResponse{
err: nil,
},
}

checkCommand := CheckCommand(&Helper{
Database: td,
FileReader: tfr,
Logger: internal.NewLogger(os.Stdout),
ConnectClient: td,
FileReader: tfr,
Source: lib.PlanetScaleSource{
Host: "something.us-east-3.psdb.cloud",
Database: "database",
Username: "username",
},
Logger: internal.NewLogger(os.Stdout),
})
b := bytes.NewBufferString("")
checkCommand.SetOut(b)
Expand Down
16 changes: 8 additions & 8 deletions cmd/airbyte-source/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,24 @@ func DiscoverCommand(ch *Helper) *cobra.Command {
return
}

cs, err := checkConnectionStatus(ch.Database, psc)
cs, err := checkConnectionStatus(ch.ConnectClient, ch.Source)
if err != nil {
ch.Logger.ConnectionStatus(cs)
return
}

defer func() {
if err := ch.Database.Close(); err != nil {
fmt.Fprintf(cmd.OutOrStdout(), "Unable to close connection to PlanetScale Database, failed with %v", err)
}
}()
if err := ch.EnsureDB(psc); err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Unable to connect to PlanetScale Database")
return
}

c, err := ch.Database.DiscoverSchema(context.Background(), psc)
if err != nil {
sb := internal.NewSchemaBuilder()
if err := ch.MysqlClient.BuildSchema(context.Background(), ch.Source, sb); err != nil {
ch.Logger.Log(internal.LOGLEVEL_ERROR, fmt.Sprintf("Unable to discover database, failed with [%v]", err))
return
}

c := sb.(*internal.SchemaBuilder).GetCatalog()
ch.Logger.Catalog(c)
},
}
Expand Down
21 changes: 12 additions & 9 deletions cmd/airbyte-source/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ func TestDiscoverInvalidSource(t *testing.T) {
tfr := testFileReader{
content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"),
}
td := testDatabase{
td := testConnectClient{
connectResponse: canConnectResponse{
err: fmt.Errorf("[%v] is invalid", "username"),
},
}

b := bytes.NewBufferString("")
discover := DiscoverCommand(&Helper{
Database: td,
FileReader: tfr,
Logger: internal.NewLogger(b),
ConnectClient: td,
FileReader: tfr,
Logger: internal.NewLogger(b),
})
discover.SetArgs([]string{"config source.json"})

Expand All @@ -46,19 +46,22 @@ func TestDiscoverFailed(t *testing.T) {
tfr := testFileReader{
content: []byte("{\"host\": \"something.us-east-3.psdb.cloud\",\"database\":\"database\",\"username\":\"username\",\"password\":\"password\"}"),
}
td := testDatabase{
td := testConnectClient{
connectResponse: canConnectResponse{
err: nil,
},
discoverSchemaResponse: discoverSchemaResponse{
}
tmc := testMysqlClient{
buildSchemaResponse: buildSchemaResponse{
err: fmt.Errorf("unable to get catalog for %v", "keyspace"),
},
}
b := bytes.NewBufferString("")
discover := DiscoverCommand(&Helper{
Database: td,
FileReader: tfr,
Logger: internal.NewLogger(b),
ConnectClient: td,
MysqlClient: tmc,
FileReader: tfr,
Logger: internal.NewLogger(b),
})
discover.SetArgs([]string{"config source.json"})

Expand Down
26 changes: 17 additions & 9 deletions cmd/airbyte-source/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package airbyte_source

import (
"github.com/planetscale/airbyte-source/cmd/internal"
"github.com/planetscale/connect-sdk/lib"
"io"
"os"
)

type Helper struct {
Database internal.PlanetScaleDatabase
FileReader FileReader
Logger internal.AirbyteLogger
MysqlClient lib.MysqlClient
ConnectClient lib.ConnectClient
Source lib.PlanetScaleSource
FileReader FileReader
Logger internal.AirbyteLogger
}

type FileReader interface {
Expand All @@ -31,18 +34,23 @@ func DefaultHelper(w io.Writer) *Helper {
}

func (h *Helper) EnsureDB(psc internal.PlanetScaleSource) error {
if h.Database != nil {
if h.ConnectClient != nil {
return nil
}

mysql, err := internal.NewMySQL(&psc)
h.Source = lib.PlanetScaleSource{
UseReplica: true,
Username: psc.Username,
Database: psc.Database,
Host: psc.Host,
Password: psc.Password,
}
var err error
h.MysqlClient, err = lib.NewMySQL(&h.Source)
if err != nil {
return err
}
h.Database = internal.PlanetScaleEdgeDatabase{
Logger: h.Logger,
Mysql: mysql,
}
h.ConnectClient = lib.NewConnectClient(&h.MysqlClient)

return nil
}
39 changes: 26 additions & 13 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/planetscale/airbyte-source/cmd/internal"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/planetscale/connect-sdk/lib"
"github.com/spf13/cobra"
"os"
)

var (
Expand Down Expand Up @@ -49,13 +50,7 @@ func ReadCommand(ch *Helper) *cobra.Command {
return
}

defer func() {
if err := ch.Database.Close(); err != nil {
fmt.Fprintf(cmd.OutOrStdout(), "Unable to close connection to PlanetScale Database, failed with %v", err)
}
}()

cs, err := checkConnectionStatus(ch.Database, psc)
cs, err := checkConnectionStatus(ch.ConnectClient, ch.Source)
if err != nil {
ch.Logger.ConnectionStatus(cs)
return
Expand All @@ -81,7 +76,8 @@ func ReadCommand(ch *Helper) *cobra.Command {
}
state = string(b)
}
shards, err := ch.Database.ListShards(context.Background(), psc)

shards, err := ch.ConnectClient.ListShards(context.Background(), ch.Source)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to list shards : %v", err))
os.Exit(1)
Expand All @@ -93,6 +89,9 @@ func ReadCommand(ch *Helper) *cobra.Command {
os.Exit(1)
}

allColumns := []string{}
rb := internal.NewResultBuilder(ch.Logger)
irb := rb.(*internal.ResultBuilder)
for _, table := range catalog.Streams {
keyspaceOrDatabase := table.Stream.Namespace
if keyspaceOrDatabase == "" {
Expand All @@ -104,25 +103,38 @@ func ReadCommand(ch *Helper) *cobra.Command {
ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey))
os.Exit(1)
}
irb.SetKeyspace(keyspaceOrDatabase)
irb.SetTable(table.Stream.Name)

for shardName, shardState := range streamState.Shards {
tc, err := shardState.SerializedCursorToTableCursor(table)
tc, err := shardState.DeserializeTableCursor()
if err != nil {
ch.Logger.Error(fmt.Sprintf("invalid cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
}
irb.HandleOnCursor = func(tc *psdbconnect.TableCursor) error {
sc, err := lib.SerializeTableCursor(tc)
if err != nil {
return err
}
syncState.Streams[streamStateKey].Shards[shardName] = sc
ch.Logger.State(syncState)
ch.Logger.Flush()
return nil
}

sc, err := ch.Database.Read(context.Background(), cmd.OutOrStdout(), psc, table, tc)
sc, err := ch.ConnectClient.Read(context.Background(), ch.Logger, ch.Source, table.Stream.Name, allColumns, tc, rb)
if err != nil {
ch.Logger.Error(err.Error())
os.Exit(1)
}

if sc != nil {
// if we get any new state, we assign it here.
// otherwise, the older state is round-tripped back to Airbyte.
syncState.Streams[streamStateKey].Shards[shardName] = sc
}

ch.Logger.Flush()
ch.Logger.State(syncState)
}
}
Expand Down Expand Up @@ -150,6 +162,7 @@ func readState(state string, psc internal.PlanetScaleSource, streams []internal.
}

for _, s := range streams {

keyspaceOrDatabase := s.Stream.Namespace
if keyspaceOrDatabase == "" {
keyspaceOrDatabase = psc.Database
Expand Down
Loading