Skip to content

Commit fd8c76f

Browse files
committed
[wip] working through changes for EnsureUploaded to support postgresql.
1 parent 6643158 commit fd8c76f

File tree

6 files changed

+161
-31
lines changed

6 files changed

+161
-31
lines changed

dbintf.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sqlcode
33
import (
44
"context"
55
"database/sql"
6+
"database/sql/driver"
67
)
78

89
type DB interface {
@@ -11,6 +12,7 @@ type DB interface {
1112
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
1213
Conn(ctx context.Context) (*sql.Conn, error)
1314
BeginTx(ctx context.Context, txOptions *sql.TxOptions) (*sql.Tx, error)
15+
Driver() driver.Driver
1416
}
1517

1618
var _ DB = &sql.DB{}

dbops.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,25 @@ package sqlcode
33
import (
44
"context"
55
"database/sql"
6+
7+
mssql "github.com/denisenkom/go-mssqldb"
8+
"github.com/jackc/pgx/v5/stdlib"
69
)
710

811
func Exists(ctx context.Context, dbc DB, schemasuffix string) (bool, error) {
912
var schemaID int
10-
err := dbc.QueryRowContext(ctx, `select isnull(schema_id(@p1), 0)`, SchemaName(schemasuffix)).Scan(&schemaID)
13+
14+
driver := dbc.Driver()
15+
var qs string
16+
17+
if _, ok := driver.(*mssql.Driver); ok {
18+
qs = `select isnull(schema_id(@p1), 0)`
19+
}
20+
if _, ok := driver.(*stdlib.Driver); ok {
21+
qs = `select coalesce((select oid from pg_namespace where nspname = $1),0)`
22+
}
23+
24+
err := dbc.QueryRowContext(ctx, qs, SchemaName(schemasuffix)).Scan(&schemaID)
1125
if err != nil {
1226
return false, err
1327
}

deployable.go

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import (
1111
"time"
1212

1313
mssql "github.com/denisenkom/go-mssqldb"
14+
"github.com/jackc/pgx/v5"
15+
"github.com/jackc/pgx/v5/stdlib"
16+
pgxstdlib "github.com/jackc/pgx/v5/stdlib"
1417
"github.com/vippsas/sqlcode/sqlparser"
1518
)
1619

@@ -77,21 +80,22 @@ func impersonate(ctx context.Context, dbc DB, username string, f func(conn *sql.
7780
// Upload will create and upload the schema; resulting in an error
7881
// if the schema already exists
7982
func (d *Deployable) Upload(ctx context.Context, dbc DB) error {
80-
// First, impersonate a user with minimal privileges to get at least
81-
// some level of sandboxing so that migration scripts can't do anything
82-
// the caller didn't expect them to.
83-
return impersonate(ctx, dbc, "sqlcode-deploy-sandbox-user", func(conn *sql.Conn) error {
83+
driver := dbc.Driver()
84+
qs := make(map[string][]interface{}, 1)
85+
86+
var uploadFunc = func(conn *sql.Conn) error {
8487
tx, err := conn.BeginTx(ctx, nil)
8588
if err != nil {
8689
return err
8790
}
8891

89-
_, err = tx.ExecContext(ctx, `sqlcode.CreateCodeSchema`,
90-
sql.Named("schemasuffix", d.SchemaSuffix),
91-
)
92-
if err != nil {
93-
_ = tx.Rollback()
94-
return err
92+
for q, args := range qs {
93+
_, err = tx.ExecContext(ctx, q, args...)
94+
95+
if err != nil {
96+
_ = tx.Rollback()
97+
return fmt.Errorf("failed to execute (%s) with arg(%s) in schema %s: %w", q, args, d.SchemaSuffix, err)
98+
}
9599
}
96100

97101
preprocessed, err := Preprocess(d.CodeBase, d.SchemaSuffix)
@@ -123,8 +127,36 @@ func (d *Deployable) Upload(ctx context.Context, dbc DB) error {
123127

124128
return nil
125129

126-
})
130+
}
131+
132+
if _, ok := driver.(*mssql.Driver); ok {
133+
// First, impersonate a user with minimal privileges to get at least
134+
// some level of sandboxing so that migration scripts can't do anything
135+
// the caller didn't expect them to.
136+
qs["sqlcode.CreateCodeSchema"] = []interface {
137+
}{
138+
sql.Named("schemasuffix", d.SchemaSuffix),
139+
}
127140

141+
return impersonate(ctx, dbc, "sqlcode-deploy-sandbox-user", uploadFunc)
142+
}
143+
144+
if _, ok := driver.(*stdlib.Driver); ok {
145+
qs[`set role "sqlcode-deploy-sandbox-user"`] = nil
146+
qs[`call sqlcode.createcodeschema(@schemasuffix)`] = []interface{}{
147+
pgx.NamedArgs{"schemasuffix": d.SchemaSuffix},
148+
}
149+
conn, err := dbc.Conn(ctx)
150+
if err != nil {
151+
return err
152+
}
153+
defer func() {
154+
_ = conn.Close()
155+
}()
156+
return uploadFunc(conn)
157+
}
158+
159+
return fmt.Errorf("failed to determine sql driver to upload schema: %s", d.SchemaSuffix)
128160
}
129161

130162
// EnsureUploaded checks that the schema with the suffix already exists,
@@ -137,37 +169,51 @@ func (d *Deployable) EnsureUploaded(ctx context.Context, dbc DB) error {
137169
return nil
138170
}
139171

172+
driver := dbc.Driver()
140173
lockResourceName := "sqlcode.EnsureUploaded/" + d.SchemaSuffix
141174

175+
var lockRetCode int
176+
var lockQs string
177+
var unlockQs string
178+
var err error
179+
142180
// When a lock is opened with the Transaction lock owner,
143181
// that lock is released when the transaction is committed or rolled back.
144-
var lockRetCode int
145-
err := dbc.QueryRowContext(ctx, `
146-
declare @retcode int;
147-
exec @retcode = sp_getapplock @Resource = @resource, @LockMode = 'Shared', @LockOwner = 'Session', @LockTimeout = @timeoutMs;
148-
select @retcode;
149-
`,
150-
sql.Named("resource", lockResourceName),
151-
sql.Named("timeoutMs", 20000),
152-
).Scan(&lockRetCode)
182+
if _, ok := driver.(*pgxstdlib.Driver); ok {
183+
lockQs = `select sqlcode.get_applock(@resource, @timeout)`
184+
unlockQs = `select sqlcode.release_applock(@resource)`
185+
186+
err = dbc.QueryRowContext(ctx, lockQs, pgx.NamedArgs{
187+
"resource": lockResourceName,
188+
"timeoutMs": 20000,
189+
}).Scan(&lockRetCode)
190+
191+
defer func() {
192+
dbc.ExecContext(ctx, unlockQs, pgx.NamedArgs{"resource": lockResourceName})
193+
}()
194+
}
195+
196+
if _, ok := driver.(*mssql.Driver); ok {
197+
// TODO
198+
199+
defer func() {
200+
// TODO: This returns an error if the lock is already released
201+
_, _ = dbc.ExecContext(ctx, unlockQs,
202+
sql.Named("Resource", lockResourceName),
203+
sql.Named("LockOwner", "Session"),
204+
)
205+
}()
206+
}
207+
153208
if err != nil {
154209
return err
155210
}
156211
if lockRetCode < 0 {
157212
return errors.New("was not able to get lock before timeout")
158213
}
159-
160-
defer func() {
161-
// TODO: This returns an error if the lock is already released
162-
_, _ = dbc.ExecContext(ctx, `sp_releaseapplock`,
163-
sql.Named("Resource", lockResourceName),
164-
sql.Named("LockOwner", "Session"),
165-
)
166-
}()
167-
168214
exists, err := Exists(ctx, dbc, d.SchemaSuffix)
169215
if err != nil {
170-
return err
216+
return fmt.Errorf("unable to determine if schema %s exists: %w", d.SchemaSuffix, err)
171217
}
172218

173219
if exists {

docker-compose.pgsql.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ services:
77
POSTGRES_PASSWORD: VippsPw1
88
POSTGRES_USER: sa
99
POSTGRES_DB: master
10+
PGOPTIONS: "-c log_error_verbosity=verbose -c log_statement=all"
1011
healthcheck:
1112
test: ["CMD-SHELL", "pg_isready", "-d", "db_prod"]
1213
interval: 1s

migrations/0003.sqlcode.pgsql

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,52 @@ exception
156156
end;
157157
$$;
158158

159+
-- similar behaviour as mssql getapplock
160+
-- PostgreSQL advisory locks are session-based by default
161+
create or replace function sqlcode.get_applock(
162+
resource text,
163+
timeout_ms integer default 0
164+
)
165+
returns integer
166+
language plpgsql
167+
as $$
168+
declare
169+
resource_key bigint;
170+
acquired boolean;
171+
waited_ms integer := 0;
172+
begin
173+
-- convert string to advisory-lock key
174+
select hashtext(resource) into resource_key;
175+
176+
-- attempt lock with timeout loop
177+
loop
178+
select pg_try_advisory_lock_shared(resource_key)
179+
into acquired;
180+
181+
if acquired then
182+
return 1; -- lock acquired (success)
183+
end if;
184+
185+
if waited_ms >= timeout_ms then
186+
return 0; -- timeout
187+
end if;
188+
189+
perform pg_sleep(0.01); -- sleep 10 ms
190+
waited_ms := waited_ms + 10;
191+
end loop;
192+
193+
return null; -- safety fallback (should never hit)
194+
end;
195+
$$;
196+
197+
create or replace function sqlcode.release_applock(resource text)
198+
returns boolean
199+
language sql
200+
as $$
201+
select pg_advisory_unlock_shared(hashtext(resource));
202+
$$;
203+
204+
159205
-- ensure procedures are owned by the definer role
160206
alter procedure sqlcode.createcodeschema(varchar)
161207
owner to "sqlcode-definer-role";

sqltest/sqlcode_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/jackc/pgx/v5"
78
"github.com/stretchr/testify/assert"
89
"github.com/stretchr/testify/require"
910
)
@@ -58,5 +59,25 @@ func Test_EnsureUploaded(t *testing.T) {
5859
}
5960

6061
fixture.RunMigrationFile("../migrations/0003.sqlcode.pgsql")
62+
63+
ctx := context.Background()
64+
65+
_, err := fixture.adminDB.Exec(`grant create on database @database to "sqlcode-definer-role"`,
66+
pgx.NamedArgs{"database": fixture.DBName})
67+
require.NoError(t, err)
68+
69+
require.NoError(t, SQL.EnsureUploaded(ctx, fixture.DB))
70+
patched := SQL.Patch(`[code].Test`)
71+
72+
res, err := fixture.DB.ExecContext(ctx, patched)
73+
require.NoError(t, err)
74+
rowsAffected, err := res.RowsAffected()
75+
require.NoError(t, err)
76+
assert.Equal(t, int64(1), rowsAffected)
77+
78+
schemas := SQL.ListUploaded(ctx, fixture.DB)
79+
require.Len(t, schemas, 1)
80+
require.Equal(t, 6, schemas[0].Objects)
81+
require.Equal(t, "5420c0269aaf", schemas[0].Suffix())
6182
})
6283
}

0 commit comments

Comments
 (0)