Skip to content

Commit ed16df0

Browse files
committed
Report correct type for schema
1 parent b22e410 commit ed16df0

File tree

10 files changed

+96
-17
lines changed

10 files changed

+96
-17
lines changed

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ import { getDebugTableInfo } from '../replication/replication-utils.js';
99
import { KEEPALIVE_STATEMENT, PUBLICATION_NAME } from '../replication/WalStream.js';
1010
import * as types from '../types/types.js';
1111
import { getApplicationName } from '../utils/application-name.js';
12+
import { PostgresTypeCache } from '../types/cache.js';
13+
import { CustomTypeRegistry, isKnownType } from '../types/registry.js';
1214

1315
export class PostgresRouteAPIAdapter implements api.RouteAPI {
16+
private typeCache: PostgresTypeCache;
1417
connectionTag: string;
1518
// TODO this should probably be configurable one day
1619
publicationName = PUBLICATION_NAME;
@@ -31,6 +34,7 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
3134
connectionTag?: string,
3235
private config?: types.ResolvedConnectionConfig
3336
) {
37+
this.typeCache = new PostgresTypeCache(config?.typeRegistry ?? new CustomTypeRegistry(), pool);
3438
this.connectionTag = connectionTag ?? sync_rules.DEFAULT_TAG;
3539
}
3640

@@ -297,6 +301,7 @@ LEFT JOIN (
297301
SELECT
298302
attrelid,
299303
attname,
304+
atttypid,
300305
format_type(atttypid, atttypmod) as data_type,
301306
(SELECT typname FROM pg_catalog.pg_type WHERE oid = atttypid) as pg_type,
302307
attnum,
@@ -311,6 +316,7 @@ LEFT JOIN (
311316
)
312317
GROUP BY schemaname, tablename, quoted_name`
313318
);
319+
await this.typeCache.fetchTypesForSchema();
314320
const rows = pgwire.pgwireRows(results);
315321

316322
let schemas: Record<string, service_types.DatabaseSchema> = {};
@@ -332,9 +338,11 @@ GROUP BY schemaname, tablename, quoted_name`
332338
if (pg_type.startsWith('_')) {
333339
pg_type = `${pg_type.substring(1)}[]`;
334340
}
341+
342+
const knownType = this.typeCache.registry.lookupType(Number(column.atttypid));
335343
table.columns.push({
336344
name: column.attname,
337-
sqlite_type: sync_rules.expressionTypeFromPostgresType(pg_type).typeFlags,
345+
sqlite_type: sync_rules.ExpressionType.fromTypeText(knownType.sqliteType()).typeFlags,
338346
type: column.data_type,
339347
internal_type: column.data_type,
340348
pg_type: pg_type

modules/module-postgres/src/module/PostgresModule.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
6969
private resolveConfig(config: types.PostgresConnectionConfig): types.ResolvedConnectionConfig {
7070
return {
7171
...config,
72-
...types.normalizeConnectionConfig(config)
72+
...types.normalizeConnectionConfig(config),
73+
typeRegistry: this.customTypes
7374
};
7475
}
7576

modules/module-postgres/src/replication/PgManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export class PgManager {
3131
) {
3232
// The pool is lazy - no connections are opened until a query is performed.
3333
this.pool = pgwire.connectPgWirePool(this.options, poolOptions);
34-
this.types = new PostgresTypeCache(poolOptions.registry, this.pool, () => this.getServerVersion());
34+
this.types = new PostgresTypeCache(poolOptions.registry, this.pool);
3535
}
3636

3737
public get connectionTag() {

modules/module-postgres/src/types/cache.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/ser
22
import * as pgwire from '@powersync/service-jpgwire';
33
import { CustomTypeRegistry } from './registry.js';
44
import semver from 'semver';
5+
import { getServerVersion } from '../utils/postgres_version.js';
56

67
/**
78
* A cache of custom types for which information can be crawled from the source database.
@@ -11,15 +12,14 @@ export class PostgresTypeCache {
1112

1213
constructor(
1314
readonly registry: CustomTypeRegistry,
14-
private readonly pool: pgwire.PgClient,
15-
private readonly getVersion: () => Promise<semver.SemVer | null>
15+
private readonly pool: pgwire.PgClient
1616
) {
1717
this.registry = new CustomTypeRegistry();
1818
}
1919

2020
private async fetchVersion(): Promise<semver.SemVer> {
2121
if (this.cachedVersion == null) {
22-
this.cachedVersion = (await this.getVersion()) ?? semver.parse('0.0.1');
22+
this.cachedVersion = (await getServerVersion(this.pool)) ?? semver.parse('0.0.1');
2323
}
2424

2525
return this.cachedVersion!;
@@ -138,9 +138,9 @@ WHERE t.oid = ANY($1)
138138
}
139139

140140
/**
141-
* Used for testing - fetches all custom types referenced by any column in the schema.
141+
* Used for testing - fetches all custom types referenced by any column in the database.
142142
*/
143-
public async fetchTypesForSchema(schema: string = 'public') {
143+
public async fetchTypesForSchema() {
144144
const sql = `
145145
SELECT DISTINCT a.atttypid AS type_oid
146146
FROM pg_attribute a
@@ -150,10 +150,10 @@ JOIN pg_type t ON t.oid = a.atttypid
150150
JOIN pg_namespace tn ON tn.oid = t.typnamespace
151151
WHERE a.attnum > 0
152152
AND NOT a.attisdropped
153-
AND cn.nspname = $1
153+
AND cn.nspname not in ('information_schema', 'pg_catalog', 'pg_toast')
154154
`;
155155

156-
const query = await this.pool.query({ statement: sql, params: [{ type: 'varchar', value: schema }] });
156+
const query = await this.pool.query({ statement: sql });
157157
let ids: number[] = [];
158158
for (const row of pgwire.pgwireRows(query)) {
159159
ids.push(Number(row.type_oid));

modules/module-postgres/src/types/registry.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,7 @@ export class CustomTypeRegistry {
276276
}
277277
}
278278
}
279+
280+
export function isKnownType(type: MaybeKnownType): type is KnownType {
281+
return type.type != 'unknown';
282+
}

modules/module-postgres/src/types/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
22
import * as service_types from '@powersync/service-types';
33
import * as t from 'ts-codec';
4+
import { CustomTypeRegistry } from './registry.js';
45

56
// Maintain backwards compatibility by exporting these
67
export const validatePort = lib_postgres.validatePort;
@@ -24,7 +25,10 @@ export type PostgresConnectionConfig = t.Decoded<typeof PostgresConnectionConfig
2425
/**
2526
* Resolved version of {@link PostgresConnectionConfig}
2627
*/
27-
export type ResolvedConnectionConfig = PostgresConnectionConfig & NormalizedPostgresConnectionConfig;
28+
export type ResolvedConnectionConfig = PostgresConnectionConfig &
29+
NormalizedPostgresConnectionConfig & {
30+
typeRegistry: CustomTypeRegistry;
31+
};
2832

2933
export function isPostgresConfig(
3034
config: service_types.configFile.DataSourceConfig

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { describe, expect, test } from 'vitest';
1111
import { clearTestDb, connectPgPool, connectPgWire, TEST_URI } from './util.js';
1212
import { WalStream } from '@module/replication/WalStream.js';
1313
import { PostgresTypeCache } from '@module/types/cache.js';
14-
import { getServerVersion } from '@module/utils/postgres_version.js';
14+
import { CustomTypeRegistry } from '@module/types/registry.js';
1515

1616
describe('pg data types', () => {
1717
async function setupTable(db: pgwire.PgClient) {
@@ -551,7 +551,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
551551
test('test replication - multiranges', async () => {
552552
const db = await connectPgPool();
553553

554-
if (!(await new PostgresTypeCache(db, () => getServerVersion(db)).supportsMultiRanges())) {
554+
if (!(await new PostgresTypeCache(new CustomTypeRegistry(), db).supportsMultiRanges())) {
555555
// This test requires Postgres 14 or later.
556556
return;
557557
}
@@ -620,7 +620,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
620620
* Return all the inserts from the first transaction in the replication stream.
621621
*/
622622
async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.ReplicationStream) {
623-
const typeCache = new PostgresTypeCache(db, () => getServerVersion(db));
623+
const typeCache = new PostgresTypeCache(new CustomTypeRegistry(), db);
624624
await typeCache.fetchTypesForSchema();
625625

626626
let transformed: SqliteInputRow[] = [];
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { describe, expect, test } from 'vitest';
2+
import { clearTestDb, connectPgPool } from './util.js';
3+
import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js';
4+
import { TYPE_INTEGER, TYPE_REAL, TYPE_TEXT } from '@powersync/service-sync-rules';
5+
6+
describe('PostgresRouteAPIAdapter tests', () => {
7+
test('infers connection schema', async () => {
8+
const db = await connectPgPool();
9+
try {
10+
await clearTestDb(db);
11+
const api = new PostgresRouteAPIAdapter(db);
12+
13+
await db.query(`CREATE DOMAIN rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5)`);
14+
await db.query(`
15+
CREATE TABLE test_users (
16+
id TEXT NOT NULL PRIMARY KEY,
17+
is_admin BOOLEAN,
18+
rating RATING_VALUE
19+
);
20+
`);
21+
22+
const schema = await api.getConnectionSchema();
23+
expect(schema).toStrictEqual([
24+
{
25+
name: 'public',
26+
tables: [
27+
{
28+
name: 'test_users',
29+
columns: [
30+
{
31+
internal_type: 'text',
32+
name: 'id',
33+
pg_type: 'text',
34+
sqlite_type: TYPE_TEXT,
35+
type: 'text'
36+
},
37+
{
38+
internal_type: 'boolean',
39+
name: 'is_admin',
40+
pg_type: 'bool',
41+
sqlite_type: TYPE_INTEGER,
42+
type: 'boolean'
43+
},
44+
{
45+
internal_type: 'rating_value',
46+
name: 'rating',
47+
pg_type: 'rating_value',
48+
sqlite_type: TYPE_REAL,
49+
type: 'rating_value'
50+
}
51+
]
52+
}
53+
]
54+
}
55+
]);
56+
} finally {
57+
await db.end();
58+
}
59+
});
60+
});

modules/module-postgres/test/src/slow_tests.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
1919
import * as mongo_storage from '@powersync/service-module-mongodb-storage';
2020
import * as postgres_storage from '@powersync/service-module-postgres-storage';
2121
import * as timers from 'node:timers/promises';
22+
import { CustomTypeRegistry } from '@module/types/registry.js';
2223

2324
describe.skipIf(!(env.CI || env.SLOW_TESTS))('slow tests', function () {
2425
describeWithStorage({ timeout: 120_000 }, function (factory) {
@@ -68,7 +69,7 @@ function defineSlowTests(factory: storage.TestStorageFactory) {
6869
});
6970

7071
async function testRepeatedReplication(testOptions: { compact: boolean; maxBatchSize: number; numBatches: number }) {
71-
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
72+
const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
7273
const replicationConnection = await connections.replicationConnection();
7374
const pool = connections.pool;
7475
await clearTestDb(pool);
@@ -329,7 +330,7 @@ bucket_definitions:
329330
await pool.query(`SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE active = FALSE`);
330331
i += 1;
331332

332-
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
333+
const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
333334
const replicationConnection = await connections.replicationConnection();
334335

335336
abortController = new AbortController();

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
1313
import * as pgwire from '@powersync/service-jpgwire';
1414
import { clearTestDb, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js';
15+
import { CustomTypeRegistry } from '@module/types/registry.js';
1516

1617
export class WalStreamTestContext implements AsyncDisposable {
1718
private _walStream?: WalStream;
@@ -32,7 +33,7 @@ export class WalStreamTestContext implements AsyncDisposable {
3233
options?: { doNotClear?: boolean; walStreamOptions?: Partial<WalStreamOptions> }
3334
) {
3435
const f = await factory({ doNotClear: options?.doNotClear });
35-
const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {});
36+
const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
3637

3738
if (!options?.doNotClear) {
3839
await clearTestDb(connectionManager.pool);

0 commit comments

Comments
 (0)