-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathround-and-tasking-service.test.js
272 lines (228 loc) · 9.49 KB
/
round-and-tasking-service.test.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import assert from 'assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { createPgPool } from '../lib/pool.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { DATABASE_URL, poolConfig } from '../lib/config.js'
import { RoundService } from '../lib/round-service.js'
import { TaskingService } from '../lib/tasking-service.js'
import { createApp } from '../lib/app.js'
import { withRound, withSubnetTasks } from './test-helpers.js'
/**
 * Timing options passed to every RoundService instance created in this file.
 * Both values are expressed in milliseconds, as their names indicate.
 */
const DEFAULT_CONFIG = {
  // Also reused as the duration of rounds seeded through withRound().
  roundDurationMs: 1_000,
  // The "stop" test waits longer than this before checking that no new round appeared.
  checkRoundExpirationIntervalMs: 200
}
describe('round and tasking service', () => {
/** @type {import('pg').Pool} */
let pgPool
/** @type {import('fastify').FastifyInstance} */
let app
/** @type {string} */
let baseUrl
// One-time setup: open the test's pg pool, apply migrations, then boot the app.
before(async () => {
  pgPool = await createPgPool(DATABASE_URL)
  await migrateWithPgClient(pgPool)

  // App logs are verbose only when DEBUG is exactly '*' or contains 'test'.
  const debugEnv = process.env.DEBUG
  const wantsDebugLogs = debugEnv === '*' || debugEnv?.includes('test')

  app = createApp({
    databaseUrl: DATABASE_URL,
    dbPoolConfig: poolConfig,
    logger: { level: wantsDebugLogs ? 'debug' : 'error' }
  })

  // The value listen() resolves to is used as the URL prefix by the API tests.
  baseUrl = await app.listen()
})
// One-time teardown: close the app first, then end the pool opened in before().
after(async () => {
  await app.close()
  await pgPool.end()
})
// Reset the database state before each test: empty both tables and restart
// their id sequences at 1.
beforeEach(async () => {
  const resetStatements = [
    'DELETE FROM checker_rounds',
    'ALTER SEQUENCE checker_rounds_id_seq RESTART WITH 1',
    'DELETE FROM checker_subnet_tasks',
    'ALTER SEQUENCE checker_subnet_tasks_id_seq RESTART WITH 1'
  ]

  // Executed one at a time, in the order listed above.
  for (const sql of resetStatements) {
    await pgPool.query(sql)
  }
})
describe('RoundService', () => {
describe('rounds', () => {
// Starting the service on an empty rounds table must leave exactly one active
// round whose end_time is still in the future.
it('should create a new round if no active round exists', async () => {
  const roundService = new RoundService(pgPool, new TaskingService(pgPool), DEFAULT_CONFIG)

  await roundService.start()
  roundService.stop()

  const activeResult = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
  const [activeRound] = activeResult.rows

  assert.strictEqual(activeResult.rows.length, 1)
  assert.ok(new Date(activeRound.end_time) > new Date())
})
// Seeds one active round, starts and stops the service, then verifies that the
// seeded round was resumed rather than replaced.
it('should resume an active round if one exists', async () => {
  const existingRound = await withRound({
    pgPool,
    roundDurationMs: DEFAULT_CONFIG.roundDurationMs,
    active: true
  })
  const taskingService = new TaskingService(pgPool)
  const roundService = new RoundService(pgPool, taskingService, DEFAULT_CONFIG)
  await roundService.start()
  roundService.stop()

  // Query every round, not only active ones: "one active round" alone would
  // also hold if the seeded round had been deactivated and a new one created.
  const { rows: rounds } = await pgPool.query('SELECT * FROM checker_rounds')
  assert.strictEqual(rounds.length, 1)
  assert.strictEqual(rounds[0].active, true)
  // Loose equality, as in the API tests of this file, so a differing id
  // representation (string vs number) does not cause a false failure.
  assert.equal(rounds[0].id, existingRound.id)
})
// After stop(), letting more than one check interval elapse must not add rounds.
it('should stop the round service and prevent further round checks', async () => {
  const roundService = new RoundService(pgPool, new TaskingService(pgPool), DEFAULT_CONFIG)

  await roundService.start()
  roundService.stop()

  const activeBeforeWait = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
  assert.strictEqual(activeBeforeWait.rows.length, 1)

  // Wait for the check interval to pass and ensure no new rounds are created
  const waitMs = DEFAULT_CONFIG.checkRoundExpirationIntervalMs + 1000
  await new Promise(resolve => setTimeout(resolve, waitMs))

  const allAfterWait = await pgPool.query('SELECT * FROM checker_rounds')
  assert.strictEqual(allAfterWait.rows.length, 1)
})
})
describe('round transitions', () => {
// Seeds a 1-second active round, keeps the service running for 2 seconds, and
// expects two rounds in total with exactly one of them active.
it('should deactivate the old round and create a new one when the current round ends', async () => {
  await withRound({ pgPool, roundDurationMs: 1000, active: true })

  const roundService = new RoundService(pgPool, new TaskingService(pgPool), DEFAULT_CONFIG)
  await roundService.start()

  // Wait for the current round to end
  await new Promise(resolve => { setTimeout(resolve, 2000) })
  roundService.stop()

  const activeResult = await pgPool.query('SELECT * FROM checker_rounds WHERE active = true')
  const allResult = await pgPool.query('SELECT * FROM checker_rounds')

  assert.strictEqual(activeResult.rows.length, 1)
  assert.strictEqual(allResult.rows.length, 2)
})
})
})
describe('TaskingService', () => {
describe('registerTaskSampler', () => {
// Registers the same sampler twice for one subnet: once directly and once
// inside assert.doesNotThrow.
it('should register a task sampler for a subnet', () => {
  const taskingService = new TaskingService(pgPool)
  const emptySampler = async () => []
  const register = () => taskingService.registerTaskSampler('subnet1', emptySampler)

  register()
  assert.doesNotThrow(register)
})
// Passing null as the sampler must be rejected with the documented message.
it('should throw an error if samplerFn is not a function', () => {
  const taskingService = new TaskingService(pgPool)
  const expectedMessage = /Task sampler for subnet subnet1 must be a function/

  // @ts-ignore
  const registerWithNull = () => taskingService.registerTaskSampler('subnet1', null)

  assert.throws(registerWithNull, expectedMessage)
})
})
describe('task generation', () => {
// Three samplers are registered: subnet1 and subnet2 return three tasks in
// total, subnet3 always throws. The three tasks must still be stored.
it('should generate tasks for all registered subnets that dont throw errors', async () => {
  const taskingService = new TaskingService(pgPool)
  taskingService.registerTaskSampler('subnet1', async () => [
    { payloadId: 'task1', nodeId: 'node1' },
    { payloadId: 'task2', nodeId: 'node2' }
  ])
  taskingService.registerTaskSampler('subnet2', async () => [
    { payloadId: 'task3', nodeId: 'node3' }
  ])
  taskingService.registerTaskSampler('subnet3', async () => {
    throw new Error('Error sampling tasks')
  })
  const round = await withRound({
    pgPool,
    roundDurationMs: 1000, // 1 second duration
    active: true
  })
  await taskingService.generateTasksForRound(round.id)
  const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [round.id])

  // Array#sort() without a comparator compares elements by their string form,
  // which is "[object Object]" for every payload, so it never reorders them.
  // Sort by payloadId so the assertion does not depend on the order in which
  // the database returns the rows (the query has no ORDER BY).
  const taskPayloads = tasks
    .map(task => task.task_definition)
    .sort((a, b) => a.payloadId.localeCompare(b.payloadId))

  assert.deepStrictEqual(taskPayloads, [
    { payloadId: 'task1', nodeId: 'node1' },
    { payloadId: 'task2', nodeId: 'node2' },
    { payloadId: 'task3', nodeId: 'node3' }
  ])
})
// With no sampler registered, generating tasks for a round must store nothing.
it('should not generate tasks if no samplers are registered', async () => {
  const taskingService = new TaskingService(pgPool)
  const round = await withRound({
    pgPool,
    roundDurationMs: 1000, // 1 second duration
    active: true
  })

  // Await the generation: without it the SELECT below could run before the
  // call finishes, and a rejection would surface outside this test.
  await taskingService.generateTasksForRound(round.id)

  const { rows: tasks } = await pgPool.query('SELECT * FROM checker_subnet_tasks WHERE round_id = $1', [round.id])
  assert.strictEqual(tasks.length, 0)
})
})
})
describe('round API routes', () => {
describe('GET /rounds/current', () => {
// Seeds an active round with one 'walrus' task and expects the endpoint to
// return that round together with the task.
it('should return the current active round with tasks', async () => {
  const round = await withRound({ pgPool, roundDurationMs: 1000, active: true })
  await withSubnetTasks(pgPool, round.id, 'walrus', { key: 'value' })
  const expectedTasks = [{ subnet: 'walrus', task_definition: { key: 'value' } }]

  /** @type {any} */
  const response = await fetch(`${baseUrl}/rounds/current`)
  assert.strictEqual(response.status, 200)

  const payload = await response.json()
  assert.equal(payload.id, round.id)
  assert.strictEqual(payload.active, true)
  assert.strictEqual(payload.tasks.length, 1)
  assert.deepStrictEqual(payload.tasks, expectedTasks)
})
// The endpoint must answer 404 both before any round is inserted by this test
// and when the only round inserted is inactive.
it('should return 404 if no current active', async () => {
  const currentRoundUrl = `${baseUrl}/rounds/current`

  /** @type {any} */
  const responseBeforeInsert = await fetch(currentRoundUrl)
  assert.strictEqual(responseBeforeInsert.status, 404)

  // insert inactive round
  await withRound({ pgPool, roundDurationMs: 1000, active: false })

  /** @type {any} */
  const responseAfterInsert = await fetch(currentRoundUrl)
  assert.strictEqual(responseAfterInsert.status, 404)
})
})
describe('GET /rounds/:roundId', () => {
// Seeds an inactive round with one 'arweave' task and fetches it by id.
it('should return the round with the specified ID and its tasks', async () => {
  const round = await withRound({ pgPool, roundDurationMs: 1000, active: false })
  await withSubnetTasks(pgPool, round.id, 'arweave', { key: 'value' })
  const expectedTasks = [{ subnet: 'arweave', task_definition: { key: 'value' } }]

  /** @type {any} */
  const response = await fetch(`${baseUrl}/rounds/${round.id}`)
  assert.strictEqual(response.status, 200)

  const payload = await response.json()
  assert.equal(payload.id, round.id)
  assert.strictEqual(payload.active, false)
  assert.strictEqual(payload.tasks.length, 1)
  assert.deepStrictEqual(payload.tasks, expectedTasks)
})
// A non-numeric :roundId path segment must be answered with 400.
it('should return 400 if roundId is not a number', async () => {
  const { status } = await fetch(`${baseUrl}/rounds/invalid`)
  assert.strictEqual(status, 400)
})
// Requests round id 999999, which no test in this file inserts.
it('should return 404 if round is not found', async () => {
  const missingRoundUrl = `${baseUrl}/rounds/999999`

  /** @type {any} */
  const response = await fetch(missingRoundUrl)
  assert.strictEqual(response.status, 404)
})
})
})
})