Skip to content

Commit c95c1cc

Browse files
author
Lyubo Kamenov
authored
Fix operation for snapshots and replace use of information_schema (#154)
Snapshot records were being produced using the `Create` operation rather than `Snapshot`. PostgreSQL's `information_schema` may not be fully visible to users with limited grants, so switch to using the pg_* catalog tables where possible. Use jackc's pgerrcode package instead of maintaining our own error-code constants.
1 parent 62feffe commit c95c1cc

File tree

9 files changed

+26
-25
lines changed

9 files changed

+26
-25
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/golangci/golangci-lint v1.58.2
1212
github.com/google/go-cmp v0.6.0
1313
github.com/google/uuid v1.6.0
14+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
1415
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
1516
github.com/jackc/pgx/v5 v5.5.5
1617
github.com/matryer/is v1.4.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
327327
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
328328
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
329329
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
330+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
331+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
330332
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
331333
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
332334
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s=

source.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,9 @@ func (s *Source) getAllTables(ctx context.Context) ([]string, error) {
193193
// getPrimaryKey queries the db for the name of the primary key column for a
194194
// table if one exists and returns it.
195195
func (s *Source) getPrimaryKey(ctx context.Context, tableName string) (string, error) {
196-
query := `SELECT c.column_name
197-
FROM information_schema.table_constraints tc
198-
JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name)
199-
JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema
200-
AND tc.table_name = c.table_name AND ccu.column_name = c.column_name
201-
WHERE constraint_type = 'PRIMARY KEY' AND tc.table_schema = 'public'
202-
AND tc.table_name = $1`
196+
query := `SELECT a.attname FROM pg_index i
197+
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
198+
WHERE i.indrelid = $1::regclass AND i.indisprimary`
203199

204200
rows, err := s.pool.Query(ctx, query, tableName)
205201
if err != nil {

source/iterator.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,4 @@ type Iterator interface {
3333
Teardown(context.Context) error
3434
}
3535

36-
var (
37-
_ Iterator = (*logrepl.CDCIterator)(nil)
38-
)
36+
var _ Iterator = (*logrepl.CDCIterator)(nil)

source/logrepl/internal/error.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ package internal
1717
import (
1818
"errors"
1919

20+
"github.com/jackc/pgerrcode"
2021
"github.com/jackc/pgx/v5/pgconn"
2122
)
2223

23-
const pgDuplicateObjectErrorCode = "42710"
24-
2524
func IsPgDuplicateErr(err error) bool {
2625
var pgerr *pgconn.PgError
27-
return errors.As(err, &pgerr) && pgerr.Code == pgDuplicateObjectErrorCode
26+
return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.DuplicateObject
2827
}

source/snapshot/fetch_worker.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,9 @@ func (*FetchWorker) validateKey(ctx context.Context, table, key string, tx pgx.T
364364

365365
if err := tx.QueryRow(
366366
ctx,
367-
"SELECT data_type FROM information_schema.columns WHERE table_name=$1 AND column_name=$2",
368-
table, key,
367+
`SELECT a.atttypid::regtype AS type FROM pg_class c JOIN pg_attribute a ON c.oid = a.attrelid
368+
WHERE c.relkind = 'r' AND a.attname = $1 AND c.relname = $2`,
369+
key, table,
369370
).Scan(&dataType); err != nil {
370371
if errors.Is(err, pgx.ErrNoRows) {
371372
return fmt.Errorf("key %q not present on table %q", key, table)
@@ -379,12 +380,12 @@ func (*FetchWorker) validateKey(ctx context.Context, table, key string, tx pgx.T
379380

380381
var isPK bool
381382

383+
// As per https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
382384
if err := tx.QueryRow(
383385
ctx,
384-
`SELECT EXISTS(SELECT tc.constraint_type
385-
FROM information_schema.constraint_column_usage cu JOIN information_schema.table_constraints tc
386-
ON tc.constraint_name = cu.constraint_name
387-
WHERE cu.table_name=$1 AND cu.column_name=$2)`,
386+
`SELECT EXISTS(SELECT a.attname FROM pg_index i
387+
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
388+
WHERE i.indrelid = $1::regclass AND a.attname = $2 AND i.indisprimary)`,
388389
table, key,
389390
).Scan(&isPK); err != nil {
390391
return fmt.Errorf("unable to determine key %q constraints: %w", key, err)

source/snapshot/fetch_worker_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,11 @@ func Test_FetcherValidate(t *testing.T) {
191191

192192
err := f.Validate(ctx)
193193
is.True(err != nil)
194-
is.True(strings.Contains(
195-
err.Error(),
196-
fmt.Sprintf(`key "missing_key" not present on table %q`, table),
197-
))
194+
ok := strings.Contains(err.Error(), fmt.Sprintf(`key "missing_key" not present on table %q`, table))
195+
if !ok {
196+
t.Logf("error: %s", err.Error())
197+
}
198+
is.True(ok)
198199
})
199200
}
200201

source/snapshot/iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (i *Iterator) buildRecord(d FetchData) sdk.Record {
118118
metadata := make(sdk.Metadata)
119119
metadata["postgres.table"] = d.Table
120120

121-
return sdk.Util.Source.NewRecordCreate(pos, metadata, d.Key, d.Payload)
121+
return sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload)
122122
}
123123

124124
func (i *Iterator) initFetchers(ctx context.Context) error {

source/snapshot/iterator_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/conduitio/conduit-connector-postgres/source/position"
2424
"github.com/conduitio/conduit-connector-postgres/test"
25+
sdk "github.com/conduitio/conduit-connector-sdk"
2526
"github.com/matryer/is"
2627
)
2728

@@ -48,9 +49,11 @@ func Test_Iterator_Next(t *testing.T) {
4849
}()
4950

5051
for j := 1; j <= 4; j++ {
51-
_, err = i.Next(ctx)
52+
r, err := i.Next(ctx)
5253
is.NoErr(err)
54+
is.Equal(r.Operation, sdk.OperationSnapshot)
5355
}
56+
5457
for j := 1; j <= 4; j++ {
5558
err = i.Ack(ctx, nil)
5659
is.NoErr(err)

0 commit comments

Comments
 (0)