-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathserver.js
80 lines (69 loc) · 2.34 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
const net = require('net');
const { EventEmitter } = require('events');
const { executeSELECTQuery, executeINSERTQuery, executeDELETEQuery } = require('./queryExecuter');
/**
 * FIFO queue that executes queries strictly one at a time.
 *
 * `addQuery` only enqueues and emits `'newQuery'`; it does not start
 * processing by itself. A listener for `'newQuery'` (or any other caller)
 * is expected to invoke `processQueue()`.
 */
class QueryQueue extends EventEmitter {
  constructor() {
    super();
    // Pending entries of shape { query, callback, queryId }, oldest first.
    this.queue = [];
    // True while a query is in flight; prevents concurrent execution.
    this.isProcessing = false;
  }

  /**
   * Appends a query to the queue and emits `'newQuery'`.
   *
   * @param {string} queryId - Identifier handed back to `callback` unchanged.
   * @param {string} query - Query text to execute.
   * @param {(error: Error|null, queryId: string, result?: *) => void} callback
   *   Called exactly once: `(null, queryId, result)` on success or
   *   `(error, queryId)` on failure.
   */
  addQuery(queryId, query, callback) {
    this.queue.push({ query, callback, queryId });
    this.emit('newQuery');
  }

  /**
   * Executes the oldest queued query, then continues with the next one.
   * Does nothing when a query is already running or the queue is empty.
   */
  processQueue() {
    if (this.isProcessing || this.queue.length === 0) {
      return;
    }
    this.isProcessing = true;
    const { query, callback, queryId } = this.queue.shift();

    this.execute(query)
      // Two-argument then(): an exception thrown by the success callback must
      // NOT be routed into the failure callback, otherwise `callback` would be
      // invoked twice for the same query.
      .then(
        (result) => callback(null, queryId, result),
        (error) => callback(error, queryId),
      )
      // A throwing callback must neither stall the queue nor surface as an
      // unhandled rejection; report it and carry on.
      .catch((callbackError) => {
        console.error(`Callback for query ${queryId} threw:`, callbackError);
      })
      .finally(() => {
        this.isProcessing = false;
        this.processQueue();
      });
  }

  /**
   * Dispatches a query to the matching executor based on its leading keyword
   * (case-insensitive): `select`, `insert into` or `delete from`.
   *
   * @param {string} query - Query text.
   * @returns {Promise<*>} Whatever the selected executor resolves to.
   * @throws {Error} 'Unsupported command' when `query` is not a string or does
   *   not start with a supported keyword (surfaces as a rejected promise).
   */
  async execute(query) {
    // Guard against malformed requests that carry no query text at all.
    if (typeof query !== 'string') {
      throw new Error('Unsupported command');
    }
    const normalized = query.toLowerCase();
    if (normalized.startsWith('select')) {
      return executeSELECTQuery(query);
    }
    if (normalized.startsWith('insert into')) {
      return executeINSERTQuery(query);
    }
    if (normalized.startsWith('delete from')) {
      return executeDELETEQuery(query);
    }
    throw new Error('Unsupported command');
  }
}
// Single queue instance for the whole process; every 'newQuery'
// notification triggers an attempt to process the queue.
const queryQueue = new QueryQueue();
queryQueue.on('newQuery', () => {
  queryQueue.processQueue();
});
const server = net.createServer();
// Only one client may be connected at a time.
let activeConnection = false;

/**
 * Parses a raw request of the form "<queryId>:<query>".
 *
 * The text is split at the FIRST colon only, so colons inside the query
 * itself are preserved (String.prototype.split with a limit would silently
 * drop everything after the second colon).
 *
 * @param {Buffer|string} rawData - Bytes received from the socket.
 * @returns {{queryId: string, query: string}} `query` is '' when the request
 *   contains no colon; leading whitespace after the colon is removed.
 */
function parseRequest(rawData) {
  const text = rawData.toString().trim();
  const separatorIndex = text.indexOf(':');
  if (separatorIndex === -1) {
    return { queryId: text, query: '' };
  }
  return {
    queryId: text.slice(0, separatorIndex),
    query: text.slice(separatorIndex + 1).trimStart(),
  };
}

server.on('connection', (socket) => {
  // Without an 'error' listener a reset connection raises an unhandled
  // 'error' event, which would take the whole server process down.
  socket.on('error', (error) => {
    console.error(`Socket error: ${error.message}`);
  });

  if (activeConnection) {
    socket.end('Another connection is already active.');
    return;
  }
  activeConnection = true;
  socket.write('Connected\n');

  // NOTE(review): each 'data' chunk is treated as exactly one request. TCP
  // does not guarantee that; clients that pipeline or send very large
  // queries would need explicit message framing.
  socket.on('data', (data) => {
    const { queryId, query } = parseRequest(data);
    queryQueue.addQuery(queryId, query, (error, completedId, result) => {
      // The client may have disconnected while the query was queued/running.
      if (socket.destroyed) {
        return;
      }
      const payload = error ? `Error: ${error.message}` : JSON.stringify(result);
      socket.write(`${completedId}<|>${payload}\n`);
    });
  });

  socket.on('close', () => {
    // Free the slot so the next client can connect.
    activeConnection = false;
  });
});
// Start accepting TCP connections on port 5432 and log once the server is bound.
const onListening = () => {
  console.log('Server listening on port 5432');
};
server.listen(5432, onListening);