-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.ts
118 lines (103 loc) · 3.61 KB
/
pubsub.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import { SQLiteCloudConnection } from './connection'
import SQLiteCloudTlsConnection from './connection-tls'
import { PubSubCallback } from './types'
/**
 * Kinds of entity a Pub/Sub subscription can target.
 * Each member's value is the same string as its name.
 */
export enum PUBSUB_ENTITY_TYPE {
  /** A database table. */
  TABLE = 'TABLE',
  /** A named message channel. */
  CHANNEL = 'CHANNEL',
}
/**
 * Pub/Sub class to receive changes on database tables or to send messages to channels.
 *
 * Commands (LISTEN, UNLISTEN, NOTIFY, channel management) are issued on the connection
 * passed to the constructor. A second connection, created from the same configuration,
 * is used to authorise subscriptions and to hand incoming results to listener callbacks.
 */
export class PubSub {
  /**
   * @param connection Connection on which Pub/Sub commands are issued. Its configuration
   *   (`connection.getConfig()`) is reused to create the separate Pub/Sub connection.
   */
  constructor(connection: SQLiteCloudConnection) {
    this.connection = connection
    this.connectionPubSub = new SQLiteCloudTlsConnection(connection.getConfig())
  }

  /** Connection on which Pub/Sub commands are sent. */
  private readonly connection: SQLiteCloudConnection
  /** Separate connection whose `sendCommands` callback feeds the listener callbacks. */
  private readonly connectionPubSub: SQLiteCloudConnection

  /**
   * Keyword placed in front of the entity name in LISTEN/UNLISTEN commands:
   * `'TABLE '` for a table, an empty string for any other entity type (channels).
   */
  private static entityKeyword(entityType: string): string {
    return entityType === PUBSUB_ENTITY_TYPE.TABLE ? 'TABLE ' : ''
  }

  /**
   * Listen for a table or channel and start to receive messages to the provided callback.
   * @param entityType One of TABLE or CHANNEL, given either as a `PUBSUB_ENTITY_TYPE` member
   *   or as the equivalent string literal (member names and values are identical)
   * @param entityName Name of the table or the channel
   * @param callback Callback to be called when a message is received; it is invoked as
   *   `(error, results, data)` with `this` bound to this PubSub instance
   * @param data Extra data to be passed to the callback
   * @returns Promise resolved with the first result delivered on the Pub/Sub connection,
   *   or rejected with the error if the LISTEN command or the Pub/Sub connection fails
   */
  public async listen(
    entityType: PUBSUB_ENTITY_TYPE | keyof typeof PUBSUB_ENTITY_TYPE,
    entityName: string,
    callback: PubSubCallback,
    data?: any
  ): Promise<any> {
    const entity = PubSub.entityKeyword(entityType)
    // The reply to LISTEN is itself a command: it is forwarded to the Pub/Sub connection
    // to authorise it (presumably a server-issued auth command; confirm against server docs).
    const authCommand: string = await this.connection.sql(`LISTEN ${entity}${entityName};`)

    return new Promise((resolve, reject) => {
      // NOTE(review): sendCommands appears to invoke this handler once per incoming result;
      // only the first resolve/reject settles the promise, later calls are no-ops.
      this.connectionPubSub.sendCommands(authCommand, (error, results) => {
        if (error) {
          callback.call(this, error, null, data)
          reject(error)
          return
        }

        // skip results from pubSub auth command
        if (results !== 'OK') {
          callback.call(this, null, results, data)
        }
        resolve(results)
      })
    })
  }

  /**
   * Stop receive messages from a table or channel.
   * @param entityType One of TABLE or CHANNEL
   * @param entityName Name of the table or the channel
   * @returns Promise settled with the outcome of the UNLISTEN command
   */
  public async unlisten(entityType: string, entityName: string): Promise<any> {
    const entity = PubSub.entityKeyword(entityType)
    return this.connection.sql(`UNLISTEN ${entity}?;`, entityName)
  }

  /**
   * Create a channel to send messages to.
   * @param name Channel name
   * @param failIfExists Raise an error if the channel already exists; when false the
   *   command is sent with `IF NOT EXISTS`
   * @returns Promise settled with the outcome of the CREATE CHANNEL command
   */
  public async createChannel(name: string, failIfExists: boolean = true): Promise<any> {
    const ifNotExists = failIfExists ? '' : ' IF NOT EXISTS'
    return this.connection.sql(`CREATE CHANNEL ?${ifNotExists};`, name)
  }

  /**
   * Deletes a Pub/Sub channel.
   * @param name Channel name
   * @returns Promise settled with the outcome of the REMOVE CHANNEL command
   */
  public async removeChannel(name: string): Promise<any> {
    return this.connection.sql(`REMOVE CHANNEL ?;`, name)
  }

  /**
   * Send a message to the channel.
   * @param channelName Name of the channel to notify
   * @param message Message to deliver
   * @returns Promise settled with the outcome of the NOTIFY command
   */
  public notifyChannel(channelName: string, message: string): Promise<any> {
    // Tagged-template call: the two values are handed to the `sql` tag as template values
    // rather than being concatenated into the command text here.
    return this.connection.sql`NOTIFY ${channelName} ${message};`
  }

  /**
   * Ask the server to close the connection to the database and
   * to keep only open the Pub/Sub connection.
   * Only interaction with Pub/Sub commands will be allowed.
   * @returns Promise resolved with the server reply after the command connection has been
   *   closed locally, or rejected with the error reported for the command
   */
  public setPubSubOnly(): Promise<any> {
    return new Promise((resolve, reject) => {
      this.connection.sendCommands('PUBSUB ONLY;', (error, results) => {
        if (error) {
          reject(error)
          return
        }
        this.connection.close()
        resolve(results)
      })
    })
  }

  /** True if Pub/Sub connection is open. */
  public connected(): boolean {
    return this.connectionPubSub.connected
  }

  /** Close Pub/Sub connection. */
  public close(): void {
    this.connectionPubSub.close()
  }
}