-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.ts
316 lines (290 loc) · 13.6 KB
/
app.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import * as WebSocket from 'ws'
import {getRedis, getRedisSubscriber} from './redis'
import * as IORedis from 'ioredis'
import {
ActionBus,
AliasedAction,
AlterAliasedActionAction,
AlterButtonAction,
AlterGlyphAction,
AlterRegexAction,
AlterScreenAction,
AlterTranslationAction,
AuthDiscordEndHandshakeAction,
AuthMojangEndHandshakeAction,
AuthReestablishAuthedConnectionAction,
Button,
DeleteAliasedActionAction,
DeleteButtonAction,
DeleteGlyphAction,
DeleteRegexAction,
DeleteScreenAction,
DeleteTranslationAction,
Glyph,
InitializeClientAction,
LinkDiscordAction,
MigrateKeybindsAction,
RegularExpression,
RemoveAliasedActionAction,
RemoveButtonAction,
RemoveGlyphAction,
RemoveRegexAction,
RemoveScreenAction,
RemoveTranslationAction,
Resolver,
Screen,
ServerJoinedAction,
ServerLeftAction,
SetAliasedActionAction,
SetButtonAction,
SetClientSettingsAction,
SetCurrentUserCountAction,
SetGlyphForUserAction,
SetRegexAction,
SetScreenAction,
SetTranslationAction,
Translation
} from '@quickplaymod/quickplay-actions-js'
import StateAggregator from './StateAggregator'
import SessionContext from './SessionContext'
import MigrateKeybindsSubscriber from './subscribers/MigrateKeybindsSubscriber'
import InitializeClientSubscriber from './subscribers/InitializeClientSubscriber'
import AuthEndHandshakeSubscriber from './subscribers/AuthEndHandshakeSubscriber'
import AuthReestablishAuthedConnectionSubscriber from './subscribers/AuthReestablishAuthedConnectionSubscriber'
import AlterScreenSubscriber from './subscribers/AlterScreenSubscriber'
import DeleteScreenSubscriber from './subscribers/DeleteScreenSubscriber'
import AlterButtonSubscriber from './subscribers/AlterButtonSubscriber'
import DeleteButtonSubscriber from './subscribers/DeleteButtonSubscriber'
import DeleteAliasedActionSubscriber from './subscribers/DeleteAliasedActionSubscriber'
import AlterAliasedActionSubscriber from './subscribers/AlterAliasedActionSubscriber'
import AlterTranslationSubscriber from './subscribers/AlterTranslationSubscriber'
import DeleteTranslationSubscriber from './subscribers/DeleteTranslationSubscriber'
import SetClientSettingsSubscriber from './subscribers/SetClientSettingsSubscriber'
import ServerJoinedSubscriber from './subscribers/ServerJoinedSubscriber'
import ServerLeftSubscriber from './subscribers/ServerLeftSubscriber'
import mysqlPool from './mysqlPool'
import {RowDataPacket} from 'mysql2'
import AddUserCountHistoryAction
from '@quickplaymod/quickplay-actions-js/dist/actions/clientbound/AddUserCountHistoryAction'
import AlterGlyphSubscriber from './subscribers/AlterGlyphSubscriber'
import DeleteGlyphSubscriber from './subscribers/DeleteGlyphSubscriber'
import AlterRegexSubscriber from './subscribers/AlterRegexSubscriber'
import DeleteRegexSubscriber from './subscribers/DeleteRegexSubscriber'
import LinkDiscordSubscriber from './subscribers/LinkDiscordSubscriber'
let redis : IORedis.Redis
let redisSub : IORedis.Redis
let actionBus : ActionBus
(async () => {
redis = await getRedis()
redisSub = await getRedisSubscriber()
await begin()
})()
/**
* Delete connection count data points in the database which are older than 24 hours.
*/
async function deleteOldConnectionDatapoints(): Promise<void> {
await mysqlPool.query('DELETE FROM connection_chart_datapoints WHERE ' +
'`timestamp` < NOW() - INTERVAL 1 DAY')
}
/**
* Begin the websocket server.
*/
async function begin() {
// Populate redis
console.log('Beginning population.')
await StateAggregator.populate()
console.log('Population complete. Initializing on port 80.')
// Create websocket server
const ws = new WebSocket.Server({ port: 80 })
// Timestamp of when this socket last sent out the current user count to all connected admin clients
const userCountLastSentToAdminsAt = 0
// How often user count should be sent to admins when it changes, in milliseconds
const userCountUpdateFrequency = 500
// All incoming messages from Redis subscriptions are handled here
redisSub.on('message', async (channel, message) => {
if(channel == 'list-change') {
const splitMsg = message.split(',')
const id = splitMsg[0]
const key = splitMsg[1]
let buf
if (id == AlterAliasedActionAction.id) {
const aa = await AliasedAction.deserialize(await redis.hget('aliasedActions', key))
buf = new SetAliasedActionAction(aa).build()
} else if (id == AlterButtonAction.id) {
const button = await Button.deserialize(await redis.hget('buttons', key))
buf = new SetButtonAction(button).build()
} else if (id == AlterScreenAction.id) {
const scr = await Screen.deserialize(await redis.hget('screens', key))
buf = new SetScreenAction(scr).build()
} else if (id == AlterTranslationAction.id) {
const translation = new Translation(key)
translation.lang = splitMsg[2]
translation.value = await redis.hget('lang:' + translation.lang, key)
buf = new SetTranslationAction(translation).build()
} else if (id == AlterRegexAction.id) {
buf = new SetRegexAction(new RegularExpression(key, await redis.hget('regexes', key))).build()
} else if (id == DeleteAliasedActionAction.id) {
buf = new RemoveAliasedActionAction(key).build()
} else if (id == DeleteButtonAction.id) {
buf = new RemoveButtonAction(key).build()
} else if (id == DeleteScreenAction.id) {
buf = new RemoveScreenAction(key).build()
} else if (id == DeleteRegexAction.id) {
buf = new RemoveRegexAction(key).build()
} else if (id == DeleteTranslationAction.id) {
const translation = new Translation(key)
translation.lang = splitMsg[2]
buf = new RemoveTranslationAction(translation).build()
}
ws.clients.forEach((conn) => {
if (conn.readyState !== WebSocket.OPEN) {
return
}
conn.send(buf)
})
} else if(channel == 'glyph-removals') {
const glyphUpdateBuffer = new RemoveGlyphAction(message).build()
ws.clients.forEach((conn) => {
if(conn.readyState !== WebSocket.OPEN) {
return
}
conn.send(glyphUpdateBuffer)
})
} else if(channel == 'glyph-updates') {
const newGlyph: Glyph = JSON.parse(message)
let glyphPath = newGlyph.path
if(glyphPath && !glyphPath.startsWith('http')) {
glyphPath = process.env.GLYPH_PROXY + newGlyph.path
}
const glyphUpdateBuffer = new SetGlyphForUserAction(newGlyph.uuid, glyphPath, newGlyph.height,
newGlyph.yOffset, newGlyph.displayInGames).build()
ws.clients.forEach((conn) => {
if(conn.readyState !== WebSocket.OPEN) {
return
}
conn.send(glyphUpdateBuffer)
})
} else if(channel == 'conn-notif') {
const now = Date.now()
if(now - userCountLastSentToAdminsAt < userCountUpdateFrequency) {
return
}
const newUserCountAction = new SetCurrentUserCountAction(parseInt(message))
for (const conn of ws.clients) {
if(conn.readyState !== WebSocket.OPEN) {
continue
}
if(! await (conn as unknown as {ctx: SessionContext}).ctx.getIsAdmin()) {
continue
}
conn.send(newUserCountAction.build())
}
}
})
// Create new action bus and add all subscriptions
actionBus = new ActionBus()
actionBus.subscribe(MigrateKeybindsAction, new MigrateKeybindsSubscriber())
const endAuthSub = new AuthEndHandshakeSubscriber()
actionBus.subscribe(AuthMojangEndHandshakeAction, endAuthSub)
actionBus.subscribe(AuthDiscordEndHandshakeAction, endAuthSub)
actionBus.subscribe(InitializeClientAction, new InitializeClientSubscriber())
actionBus.subscribe(AuthReestablishAuthedConnectionAction, new AuthReestablishAuthedConnectionSubscriber())
actionBus.subscribe(AlterScreenAction, new AlterScreenSubscriber())
actionBus.subscribe(DeleteScreenAction, new DeleteScreenSubscriber())
actionBus.subscribe(AlterButtonAction, new AlterButtonSubscriber())
actionBus.subscribe(DeleteButtonAction, new DeleteButtonSubscriber())
actionBus.subscribe(AlterAliasedActionAction, new AlterAliasedActionSubscriber())
actionBus.subscribe(DeleteAliasedActionAction, new DeleteAliasedActionSubscriber())
actionBus.subscribe(AlterTranslationAction, new AlterTranslationSubscriber())
actionBus.subscribe(DeleteTranslationAction, new DeleteTranslationSubscriber())
actionBus.subscribe(SetClientSettingsAction, new SetClientSettingsSubscriber())
actionBus.subscribe(ServerJoinedAction, new ServerJoinedSubscriber())
actionBus.subscribe(ServerLeftAction, new ServerLeftSubscriber())
actionBus.subscribe(AlterGlyphAction, new AlterGlyphSubscriber())
actionBus.subscribe(DeleteGlyphAction, new DeleteGlyphSubscriber())
actionBus.subscribe(AlterRegexAction, new AlterRegexSubscriber())
actionBus.subscribe(DeleteRegexAction, new DeleteRegexSubscriber())
actionBus.subscribe(LinkDiscordAction, new LinkDiscordSubscriber())
// Delete all data points when the server initially starts, and then every 24 hours.
setInterval(deleteOldConnectionDatapoints, 86400000)
await deleteOldConnectionDatapoints()
// Once a minute, try to add the current connection count to the database if it hasn't been added
// already within the past 5 minutes. This is the easiest solution to multiple instances of this script
// running, however it's not the most efficient.
setInterval(async () => {
const sqlConn = await mysqlPool.getConnection()
try {
// We lock the table so another instance of this program doesn't read or alter
// the table while we're working on it.
await sqlConn.query('LOCK TABLE connection_chart_datapoints WRITE')
const [resultsFromLastFive] = <RowDataPacket[]> await sqlConn.query('SELECT timestamp FROM \
connection_chart_datapoints WHERE timestamp > NOW() - INTERVAL 5 MINUTE')
if(resultsFromLastFive.length <= 0) {
const connections = await redis.get('connections')
await sqlConn.query('INSERT INTO connection_chart_datapoints (connection_count) VALUES (?)', [connections])
}
// Send client the connection data from last hour (i.e. last two points). There can't be multiple points
// in the graph for the same timestamp, so this is not an issue.
const [resultsFromLastTen] = <RowDataPacket[]> await sqlConn.query('SELECT `timestamp`, connection_count FROM \
connection_chart_datapoints WHERE `timestamp` > NOW() - INTERVAL 10 MINUTE')
for (const conn of ws.clients) {
if(await (conn as unknown as {ctx: SessionContext}).ctx.getIsAdmin()) {
for(let i = 0; i < resultsFromLastTen.length; i++) {
(conn as unknown as {ctx: SessionContext})
.ctx.sendAction(new AddUserCountHistoryAction(new Date(resultsFromLastTen[i].timestamp),
resultsFromLastTen[i].connection_count, false))
}
}
}
} catch(e) {
console.error(e)
}
await sqlConn.query('UNLOCK TABLES')
sqlConn.release()
}, 60000)
ws.on('connection', async function connection(conn) {
const ctx = new SessionContext(conn)
ctx.lastPong = new Date().getTime();
// Adding a reference to the connection ctx is the best way to do this.
(conn as unknown as {ctx: SessionContext}).ctx = ctx
conn.on('pong', () => {
ctx.lastPong = new Date().getTime()
})
conn.on('message', function incoming(message) {
if(!(message instanceof Buffer)) {
return
}
const action = Resolver.from(message)
actionBus.publish(action, ctx)
})
conn.on('close', async () => {
const connCount = await redis.decr('connections')
await redis.publish('conn-notif', connCount.toString())
if(ctx.authedResetTimeout != null) {
clearTimeout(ctx.authedResetTimeout)
ctx.authedResetTimeout = null
}
})
conn.on('error', async function (err) {
console.warn(err)
conn.terminate()
})
const connCount = await redis.incr('connections')
await redis.publish('conn-notif', connCount.toString())
})
// Ping clients every 30 seconds; Terminate clients which haven't responded in 60 seconds
const pingInterval = 30000
setInterval(() => {
const now = new Date().getTime()
ws.clients.forEach((conn) => {
if((conn as unknown as {ctx: SessionContext}).ctx.lastPong < now - pingInterval * 2) {
conn.terminate()
return
}
conn.ping()
})
}, pingInterval)
ws.on('error', async function (err) {
console.warn(err)
})
}