Skip to content

Commit

Permalink
fix things
Browse files Browse the repository at this point in the history
  • Loading branch information
oguimbal committed Aug 6, 2024
1 parent 48a167d commit 096f5cd
Show file tree
Hide file tree
Showing 13 changed files with 466 additions and 155 deletions.
38 changes: 36 additions & 2 deletions src/adapters/adapters.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { LibAdapters, IMemoryDb, NotSupported, QueryResult, SlonikAdapterOptions } from '../interfaces';
import { LibAdapters, IMemoryDb, NotSupported, QueryResult, SlonikAdapterOptions, BindServerOptions, BindServerResult } from '../interfaces';
import lru from 'lru-cache';
import { compareVersions, delay, doRequire, timeoutOrImmediate } from '../utils';
import { toLiteral } from '../misc/pg-utils';
import { _IDb, _IType } from '../interfaces-private';
import { TYPE_SYMBOL } from '../execution/select';
import { ArrayType } from '../datatypes';
import { CustomEnumType } from '../datatypes/t-custom-enum';
import { socketAdapter } from './pg-socket-adapter';
import { bindPgServer, socketAdapter } from './pg-socket-adapter';


export function replaceQueryArgs$(this: void, sql: string, values: any[]) {
Expand Down Expand Up @@ -429,4 +429,38 @@ export class Adapters implements LibAdapters {
});
return sql;
}


async bindServer(opts?: BindServerOptions): Promise<BindServerResult> {
const { createServer } = doRequire('net') as typeof import('net');
const srv = createServer();
return new Promise<BindServerResult>((res, rej) => {
srv.listen(opts?.port ?? 0, opts?.host ?? '127.0.0.1', () => {
const a = srv.address();
if (!a) {
srv.close();
return rej('cannot find a port');
}
if (typeof a === 'string') {
srv.close();
return rej('cannot find a port');
}

srv.on('connection', (socket) => {
bindPgServer(socket, this.db.public);
});

res({
postgresConnectionString: `postgresql://${a.address}:${a.port}/postgres?sslmode=disable`,
connectionSettings: {
port: a.port,
host: a.address,
},
close: () => {
srv.close();
},
})
});
});
}
}
288 changes: 150 additions & 138 deletions src/adapters/pg-socket-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ import { AsyncQueue, delay, doRequire, lazySync, nullIsh } from '../utils';
// https://www.postgresql.org/docs/current/protocol-message-formats.html

export const socketAdapter = lazySync(() => {
const { CommandCode, bindSocket } = doRequire('pg-server') as typeof import('pg-server');
const EventEmitter = doRequire('events') as typeof import('events');

const byCode = Object.fromEntries(
Object.entries(CommandCode).map(([k, v]) => [v, k])
);

class InMemorySocket extends EventEmitter {
readonly peer: InMemorySocket;
isTop: boolean;
Expand Down Expand Up @@ -60,147 +54,165 @@ export const socketAdapter = lazySync(() => {
class Connection {
socket = new InMemorySocket();
constructor(private mem: _ISchema, private queryLatency?: number) {
const log = typeof process !== 'undefined' && process.env.DEBUG_PG_SERVER === 'true'
? console.log.bind(console)
: (...args: any[]) => { };
let prepared: _IPreparedQuery | undefined;
let preparedQuery: string | undefined;
let bound: _IBoundQuery | undefined;
let runningTx: _Transaction | undefined;
const queue = new AsyncQueue();
bindSocket(this.socket.peer as any, ({ command: cmd }, writer) => queue.enqueue(async () => {
function sendDescribe(prepared: _IPreparedQuery) {
const p = prepared.describe();
writer.parameterDescription(p.parameters.map(x => x.typeId));

// see RowDescription() in connection.js of postgres.js
// => we just need typeId
const descs = p.result.map<import('pg-server').FieldDesc>(x => ({
name: x.name,
tableID: 0,
columnID: 0,
dataTypeID: x.typeId,
dataTypeSize: 0,
dataTypeModifier: -1,
format: 0,
mode: 'text',
// mode: textMode ? 'text' : 'binary',
}))
writer.rowDescription(descs);
}
bindPgServer(this.socket.peer, mem, queryLatency);
}
}

return (mem: _ISchema, queryLatency?: number) => new Connection(mem, queryLatency).socket;
});

function sendResults(bound: _IBoundQuery, qname: string) {
const results = bound.executeAll(runningTx);
if (runningTx && results.state) {
runningTx = results.state;

export function bindPgServer(this: void, peer: any, mem: _ISchema, queryLatency?: number) {
const { CommandCode, bindSocket } = doRequire('pg-server') as typeof import('pg-server');
const byCode = Object.fromEntries(
Object.entries(CommandCode).map(([k, v]) => [v, k])
);

const log = typeof process !== 'undefined' && process.env.DEBUG_PG_SERVER === 'true'
? console.log.bind(console)
: (...args: any[]) => { };
let prepared: _IPreparedQuery | undefined;
let preparedQuery: string | undefined;
let bound: _IBoundQuery | undefined;
let runningTx: _Transaction | undefined;
const queue = new AsyncQueue();
bindSocket(peer, ({ command: cmd }, writer) => queue.enqueue(async () => {
function sendDescribe(prepared: _IPreparedQuery) {
const p = prepared.describe();
writer.parameterDescription(p.parameters.map(x => x.typeId));

// see RowDescription() in connection.js of postgres.js
// => we just need typeId
const descs = p.result.map<import('pg-server').FieldDesc>(x => ({
name: x.name,
tableID: 0,
columnID: 0,
dataTypeID: x.typeId,
dataTypeSize: 0,
dataTypeModifier: -1,
format: 0,
mode: 'text',
// mode: textMode ? 'text' : 'binary',
}))
writer.rowDescription(descs);
}

function sendResults(bound: _IBoundQuery, qname: string) {
const results = bound.executeAll(runningTx);
if (runningTx && results.state) {
runningTx = results.state;
}
for (const row of results.rows) {
writer.dataRow(results.fields.map((x) => row[x.name]));
}
log('...complete', qname);
writer.commandComplete(qname);
writer.readyForQuery();
}
try {
await delay(queryLatency ?? 0);
const t = cmd.type;
const cmdName = byCode[t];

switch (t) {
case CommandCode.init:
writer.authenticationOk();
writer.parameterStatus('client_encoding', 'UTF8');
writer.parameterStatus('DateStyle', 'ISO, MDY');
writer.parameterStatus('integer_datetimes', 'on');
writer.parameterStatus('server_encoding', 'UTF8');
writer.parameterStatus('server_version', '12.5');
writer.parameterStatus('TimeZone', 'UTC');

return writer.readyForQuery();

case CommandCode.parse:
try {
prepared = mem.prepare(cmd.query);
if (!prepared) {
return writer.emptyQuery();
}
preparedQuery = cmd.queryName || cmd.query;
} catch (e: any) {
return writer.error(e);
}
for (const row of results.rows) {
writer.dataRow(results.fields.map((x) => row[x.name]));
return writer.parseComplete();
case CommandCode.describe: {
if (!prepared) {
return writer.error("no prepared query");
}
log('...complete', qname);
writer.commandComplete(qname);
writer.readyForQuery();

sendDescribe(prepared);
return;
}
try {
await delay(this.queryLatency ?? 0);
const t = cmd.type;
const cmdName = byCode[t];

switch (t) {
case CommandCode.init:
return writer.readyForQuery();

case CommandCode.parse:
try {
prepared = mem.prepare(cmd.query);
if (!prepared) {
return writer.emptyQuery();
}
preparedQuery = cmd.queryName;
} catch (e: any) {
return writer.error(e);
}
return writer.parseComplete();
case CommandCode.describe: {
if (!prepared) {
return writer.error("no prepared query");
}
case CommandCode.bind: {
if (!prepared) {
return writer.error("no prepared query");
}
try {
bound = prepared.bind(cmd.values);
} catch (e: any) {
return writer.error(e);
}
return writer.bindComplete();
}
case CommandCode.execute: {
if (!bound || !preparedQuery) {
return writer.error("no bound query");
}
sendResults(bound, preparedQuery);
return;
}
case CommandCode.sync:
prepared = undefined;
preparedQuery = undefined;
bound = undefined;
// writer.readyForQuery();
return;
case CommandCode.flush:
return;
case CommandCode.query: {
if (!cmd.query) {
return writer.emptyQuery();
}

sendDescribe(prepared);
// handle transactions
const qlow = cmd.query.trim().toLowerCase();
switch (qlow) {
case 'begin':
runningTx = mem.db.data.fork();
writer.commandComplete(qlow.toUpperCase());
writer.readyForQuery();
return;
}
case CommandCode.bind: {
if (!prepared) {
return writer.error("no prepared query");
}
try {
bound = prepared.bind(cmd.values);
} catch (e: any) {
return writer.error(e);
case 'commit':
if (!runningTx) {
return writer.error("no transaction to commit");
}
return writer.bindComplete();
}
case CommandCode.execute: {
if (!bound || !preparedQuery) {
return writer.error("no bound query");
}
sendResults(bound, preparedQuery);
return;
}
case CommandCode.sync:
prepared = undefined;
preparedQuery = undefined;
bound = undefined;
// writer.readyForQuery();
return;
case CommandCode.flush:
runningTx.fullCommit();
runningTx = undefined;
writer.commandComplete(qlow.toUpperCase());
writer.readyForQuery();
return;
case CommandCode.query: {
if (!cmd.query) {
return writer.emptyQuery();
}

// handle transactions
const qlow = cmd.query.trim().toLowerCase();
switch (qlow) {
case 'begin':
runningTx = mem.db.data.fork();
writer.commandComplete(qlow.toUpperCase());
writer.readyForQuery();
return;
case 'commit':
if (!runningTx) {
return writer.error("no transaction to commit");
}
runningTx.fullCommit();
runningTx = undefined;
writer.commandComplete(qlow.toUpperCase());
writer.readyForQuery();
return;
case 'rollback':
runningTx = undefined;
writer.commandComplete(qlow.toUpperCase());
writer.readyForQuery();
return;
}

// simple query flow
const prep = mem.prepare(cmd.query);
sendDescribe(prep);
const bound = prep.bind();
sendResults(bound, cmd.query);
case 'rollback':
runningTx = undefined;
writer.commandComplete(qlow.toUpperCase());
writer.readyForQuery();
return;
}
default:
return writer.error(`pg-mem does not implement PG command ${cmdName}`);
}
} catch (e: any) {
log("🔥 ", e);
writer.error(e);

// simple query flow
const prep = mem.prepare(cmd.query);
sendDescribe(prep);
const bound = prep.bind();
sendResults(bound, cmd.query);
return;
}
}));
default:
return writer.error(`pg-mem does not implement PG command ${cmdName}`);
}
} catch (e: any) {
log("🔥 ", e);
writer.error(e);
}
}

return (mem: _ISchema, queryLatency?: number) => new Connection(mem, queryLatency).socket;
});
}));
}
19 changes: 19 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,25 @@ export interface LibAdapters {

/** Creates a Postres.js `sql` tag bound to this db */
createPostgresJsTag(queryLatency?: number): any;

/** Binds a server to this instance */
bindServer(opts?: BindServerOptions): Promise<BindServerResult>;
}

export interface BindServerResult {
postgresConnectionString: string;
connectionSettings: {
host: string;
port: number;
};
close(): void;
}

export interface BindServerOptions {
/** defaults to a random port */
port?: number;
/** defaults to 'localhost' */
host?: string;
}

export interface SlonikAdapterOptions {
Expand Down
Loading

0 comments on commit 096f5cd

Please sign in to comment.