-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdhtNode.js
140 lines (110 loc) · 4.17 KB
/
dhtNode.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// dhtNode.js
const net = require('net');

const murmurhash3 = require('./murmurhash3');
const kademlia = require('./kademlia');
// Largest JSON message (request or reply) accepted from a peer, in bytes.
const MAX_MESSAGE_BYTES = 1024 * 1024;
// Idle time after which a peer connection is abandoned, in milliseconds.
const REQUEST_TIMEOUT_MS = 5000;
// Interval between calls to kademlia.updateRoutingTable, in milliseconds.
const ROUTING_REFRESH_MS = 5000;

/**
 * A node of a small DHT that stores vectors under string/number keys.
 *
 * Wire protocol: each TCP connection carries exactly one JSON object request
 * (`{ action: 'lookup' | 'insert' | 'join', ... }`) and one JSON object reply
 * (`{ result }` on success, `{ error }` on failure). Messages have no length
 * prefix or delimiter, so the receiver treats a message as complete as soon as
 * the bytes received so far parse as JSON.
 */
class VectorDHTNode {
  /**
   * @param {*} nodeId - Identifier of this node; used as a routing-table key by peers.
   * @param {string} ip - Address the server binds to and advertises when joining.
   * @param {number} port - Port the server binds to and advertises when joining.
   */
  constructor(nodeId, ip, port) {
    this.nodeId = nodeId;
    this.ip = ip;
    this.port = port;
    // nodeId -> address object handed to net.createConnection ({ host, port }).
    this.routingTable = {};
    // key -> stored vector.
    this.vectorDb = {};
    this.dimension = 3;
    // Set by startServer() so callers can close the listener.
    this.server = null;
  }

  /**
   * Serve one request: read a JSON message from `reader`, apply it, and write
   * the JSON reply to `writer`, ending it afterwards.
   *
   * Never rejects for protocol problems; malformed or unknown requests are
   * answered with `{ error: string }`.
   *
   * @param {import('stream').Readable} reader - Stream the request arrives on.
   * @param {import('stream').Writable} writer - Stream the reply is written to.
   * @returns {Promise<void>}
   */
  async handleRequest(reader, writer) {
    let response;
    try {
      const message = await this._readJsonMessage(reader);
      response = this._handleMessage(message);
    } catch (err) {
      response = { error: err.message };
    }
    // The peer may already be gone (e.g. the read failed because it hung up).
    if (!writer.destroyed && !writer.writableEnded) {
      writer.end(JSON.stringify(response));
    }
  }

  /**
   * Start listening on `this.ip:this.port` and begin the periodic routing-table
   * refresh. Resolves once the server is listening; rejects if binding fails.
   * The server is stored on `this.server`; closing it also stops the refresh.
   *
   * @returns {Promise<void>}
   */
  async startServer() {
    const server = net.createServer((socket) => {
      // Without a listener a socket 'error' would crash the process.
      socket.on('error', (err) => {
        console.error(`Node ${this.nodeId} connection error: ${err.message}`);
      });
      socket.setTimeout(REQUEST_TIMEOUT_MS, () => socket.destroy());
      this.handleRequest(socket, socket).catch((err) => {
        console.error(`Node ${this.nodeId} failed to handle request: ${err.message}`);
        socket.destroy();
      });
    });

    await new Promise((resolve, reject) => {
      server.once('error', reject);
      server.listen(this.port, this.ip, () => {
        server.removeListener('error', reject);
        resolve();
      });
    });
    console.log(`Node ${this.nodeId} is listening on ${this.ip}:${this.port}`);
    server.on('error', (err) => {
      console.error(`Node ${this.nodeId} server error: ${err.message}`);
    });

    // Periodically update the routing table for load balancing.
    const refreshTimer = setInterval(() => {
      // Promise.resolve().then(...) captures both a synchronous throw and a
      // rejected promise from updateRoutingTable.
      Promise.resolve()
        .then(() => kademlia.updateRoutingTable(this))
        .catch((err) => {
          console.error(`Node ${this.nodeId} routing refresh failed: ${err.message}`);
        });
    }, ROUTING_REFRESH_MS);
    server.on('close', () => clearInterval(refreshTimer));

    this.server = server;
  }

  /**
   * Announce this node to `existingNode` with a 'join' request and wait for
   * its acknowledgement. Does not modify this node's own routing table.
   *
   * @param {{ ip: string, port: number }} existingNode - Peer to contact.
   * @returns {Promise<void>}
   * @throws {Error} If the peer is unreachable or answers with an error.
   */
  async joinNetwork(existingNode) {
    const address = { host: existingNode.ip, port: existingNode.port };
    const request = {
      action: 'join',
      nodeId: this.nodeId,
      ip: this.ip,
      port: this.port,
    };
    this._unwrapReply(await this._sendRequest(address, request));
  }

  /**
   * Hash `data` with murmurhash3 and coerce the result to an unsigned 32-bit integer.
   *
   * @param {*} data - Value passed through to `murmurhash3.hash`.
   * @returns {number}
   */
  murmurhash3_32(data) {
    return murmurhash3.hash(data) >>> 0; // Unsigned 32-bit integer
  }

  /**
   * @param {*} data - Value to hash.
   * @returns {string} Decimal string form of the unsigned 32-bit hash of `data`.
   */
  generateKey(data) {
    return this.murmurhash3_32(data).toString();
  }

  /**
   * Fetch the vector stored under `key` from the node closest to the key's hash.
   *
   * @param {string|number} key
   * @returns {Promise<*>} The stored vector, or `null` if the peer has none.
   * @throws {Error} If the routing table is empty, the peer is unreachable,
   *   or the peer answers with an error.
   */
  async lookup(key) {
    const { address } = this._resolveOwner(key);
    const reply = await this._sendRequest(address, { action: 'lookup', key });
    return this._unwrapReply(reply);
  }

  /**
   * Store `vector` under `key` on the node closest to the key's hash, then run
   * the persistence/replication hook.
   *
   * @param {string|number} key
   * @param {*} vector - JSON-serialisable value to store.
   * @returns {Promise<*>} The peer's result (`'OK'` on success).
   * @throws {Error} If the routing table is empty, the peer is unreachable,
   *   or the peer answers with an error.
   */
  async insert(key, vector) {
    const { nodeId, address } = this._resolveOwner(key);
    const reply = await this._sendRequest(address, { action: 'insert', key, value: vector });
    const result = this._unwrapReply(reply);
    // Handle data persistence and replication
    await this.handleDataPersistence(key, vector, nodeId);
    return result;
  }

  /**
   * Return the routing-table node ID with the smallest XOR distance to
   * `targetKey`; on ties the ID that appears first in the table wins.
   *
   * @param {string} targetKey
   * @returns {string|undefined} `undefined` when the routing table is empty.
   */
  findNode(targetKey) {
    let closestId;
    let closestDistance;
    // Single pass: each distance is computed once and nothing is sorted.
    for (const candidateId of Object.keys(this.routingTable)) {
      const distance = kademlia.calculateXORDistance(targetKey, candidateId);
      if (closestId === undefined || distance < closestDistance) {
        closestId = candidateId;
        closestDistance = distance;
      }
    }
    return closestId;
  }

  /**
   * Hook for reacting to a failed node. Currently a no-op.
   *
   * @param {*} failedNode
   * @returns {Promise<void>}
   */
  async handleNodeFailure(failedNode) {
    // TODO: Implement logic to handle the failure of a node. This could
    // involve redistributing the data hosted by the failed node to other
    // healthy nodes.
  }

  /**
   * Hook called by insert() after the owning node acknowledged the write.
   * Currently a no-op.
   *
   * @param {string|number} key
   * @param {*} vector
   * @param {string} nodeId - Routing-table ID of the node that stored the value.
   * @returns {Promise<void>}
   */
  async handleDataPersistence(key, vector, nodeId) {
    // TODO: Implement logic for data persistence and replication. This could
    // involve replicating the data to other nodes and ensuring data
    // consistency. Also handle the case when a node fails after successful
    // data insertion.
  }

  /**
   * Apply one decoded request to local state and build the reply object.
   *
   * @param {*} message - Decoded request.
   * @returns {{ result: * } | { error: string }}
   */
  _handleMessage(message) {
    if (message === null || typeof message !== 'object') {
      return { error: 'request must be a JSON object' };
    }
    switch (message.action) {
      case 'lookup': {
        if (!this._isUsableKey(message.key)) {
          return { error: 'invalid key' };
        }
        // Own-property check so keys like "toString" do not resolve to
        // members inherited from Object.prototype.
        const isStored = Object.prototype.hasOwnProperty.call(this.vectorDb, message.key);
        const stored = isStored ? this.vectorDb[message.key] : undefined;
        return { result: stored === undefined ? null : stored };
      }
      case 'insert': {
        if (!this._isUsableKey(message.key)) {
          return { error: 'invalid key' };
        }
        if (message.value === undefined) {
          return { error: 'insert requires a value' };
        }
        this.vectorDb[message.key] = message.value;
        return { result: 'OK' };
      }
      case 'join': {
        if (!this._isUsableKey(message.nodeId)) {
          return { error: 'invalid nodeId' };
        }
        if (message.ip == null || message.port == null) {
          return { error: 'join requires ip and port' };
        }
        // Stored in the shape lookup()/insert() pass to net.createConnection.
        this.routingTable[message.nodeId] = { host: message.ip, port: message.port };
        return { result: 'OK' };
      }
      default:
        return { error: `unknown action: ${String(message.action)}` };
    }
  }

  /**
   * Whether `key` may be used as a property name on the plain-object stores.
   * "__proto__" is refused because assigning it would replace the store's
   * prototype instead of adding an entry.
   *
   * @param {*} key
   * @returns {boolean}
   */
  _isUsableKey(key) {
    const isScalar = typeof key === 'string' || typeof key === 'number';
    return isScalar && key !== '__proto__';
  }

  /**
   * Find the routing-table entry responsible for `key`.
   *
   * @param {string|number} key
   * @returns {{ nodeId: string, address: object }}
   * @throws {Error} If the routing table is empty.
   */
  _resolveOwner(key) {
    const nodeId = this.findNode(this.generateKey(key));
    if (nodeId === undefined) {
      throw new Error('routing table is empty; no node to contact');
    }
    return { nodeId, address: this.routingTable[nodeId] };
  }

  /**
   * Return `reply.result`, or throw if the peer reported an error.
   *
   * @param {*} reply - Decoded reply from a peer.
   * @returns {*}
   * @throws {Error} If the reply is not an object or carries an `error` field.
   */
  _unwrapReply(reply) {
    if (reply === null || typeof reply !== 'object') {
      throw new Error('peer sent a malformed reply');
    }
    if (reply.error !== undefined) {
      throw new Error(`peer reported an error: ${reply.error}`);
    }
    return reply.result;
  }

  /**
   * Open a connection to `address`, send `request` as JSON and resolve with
   * the decoded reply.
   *
   * @param {object} address - Options accepted by net.createConnection.
   * @param {object} request - JSON-serialisable request.
   * @returns {Promise<*>} The decoded reply.
   * @throws {Error} On connection failure, timeout, or an incomplete reply.
   */
  async _sendRequest(address, request) {
    const socket = net.createConnection(address);
    // Failures before the reply arrives are reported through the read promise;
    // this listener only keeps later socket errors from crashing the process.
    socket.on('error', () => {});
    socket.setTimeout(REQUEST_TIMEOUT_MS, () => {
      socket.destroy(new Error(`request timed out after ${REQUEST_TIMEOUT_MS} ms`));
    });

    // Attach the reader before writing so no event can be missed.
    const pendingReply = this._readJsonMessage(socket);
    socket.write(JSON.stringify(request));
    try {
      const reply = await pendingReply;
      socket.end();
      return reply;
    } catch (err) {
      socket.destroy();
      throw err;
    }
  }

  /**
   * Collect chunks from `stream` until they form one complete JSON value.
   *
   * A strict prefix of a JSON object is never valid JSON itself, so for the
   * object messages this protocol uses, "parses successfully" is a safe
   * end-of-message test even when the message is split across chunks.
   *
   * @param {import('stream').Readable} stream
   * @returns {Promise<*>} The decoded JSON value.
   * @throws {Error} If the stream errors, closes early, or exceeds MAX_MESSAGE_BYTES.
   */
  _readJsonMessage(stream) {
    return new Promise((resolve, reject) => {
      const chunks = [];
      let receivedBytes = 0;

      const cleanup = () => {
        stream.removeListener('data', onData);
        stream.removeListener('end', onClosed);
        stream.removeListener('close', onClosed);
        stream.removeListener('error', onError);
      };
      const fail = (err) => {
        cleanup();
        reject(err);
      };
      const onData = (chunk) => {
        const piece = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk));
        receivedBytes += piece.length;
        if (receivedBytes > MAX_MESSAGE_BYTES) {
          fail(new Error(`message exceeds ${MAX_MESSAGE_BYTES} bytes`));
          return;
        }
        chunks.push(piece);
        let decoded;
        try {
          decoded = JSON.parse(Buffer.concat(chunks).toString('utf8'));
        } catch (err) {
          // Not an error yet: the message is still incomplete. If the stream
          // ends in this state, onClosed reports it.
          return;
        }
        cleanup();
        resolve(decoded);
      };
      const onClosed = () => {
        fail(new Error('connection closed before a complete JSON message arrived'));
      };
      const onError = (err) => {
        fail(err);
      };

      stream.on('data', onData);
      stream.on('end', onClosed);
      stream.on('close', onClosed);
      stream.on('error', onError);
    });
  }
}
// CommonJS export: the class itself is the module's only export.
module.exports = VectorDHTNode;