Skip to content

Commit 6929239

Browse files
committed
mvp works well. added historical weekly and monthly. added cli arg for day or default to yesterdays day
1 parent a6c8db0 commit 6929239

9 files changed

Lines changed: 225 additions & 69 deletions

File tree

.prettierrc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}

.vscode/settings.json

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
{
22
"editor.formatOnSave": true,
3-
"prettier.enable": false
3+
"prettier.enable": true,
4+
"prettier.configPath": ".prettierrc",
5+
"prettier.singleQuote": true,
6+
"prettier.printWidth": 100,
7+
"editor.defaultFormatter": "esbenp.prettier-vscode",
8+
"prettier.documentSelectors": [
9+
"**/*.{ts,js}"
10+
]
411
}

README.md

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# synthesize-events
22

3-
Synthesizes, processes, calculates various statistics about events that have taken place in the recent past. It reads event data from redis and then stores the synthesized data in other redis keys.
3+
Synthesizes, processes, calculates various statistics about events that have taken place in the recent past. It reads event data from redis and then stores the synthesized data in other redis keys. Every day or date is UTC timezone.
44

55
create a .env file or set env vars:
66

@@ -22,27 +22,18 @@ podman run \
2222

2323
Manually set redis values with `redis-cli` and `set <key> <value>` command
2424

25-
With node and npm installed, run
25+
With node and npm installed, to index yesterday UTC time, run
2626
`npm install` then `npm run watchIndex` (careful this will re-run on any file save)
2727

28+
Run this on a specific day with `npm run watchIndex -- --day '2024-01-30'`
29+
NOTE!: Indexing for a specific day, relies on the previous day's index. So if you want accurate "monthly active nodes" counts and indexing, then index a months worth of days first before doing a specific day.
30+
2831
# Redis keys
2932

3033
Event data read from `event::<event-id>` and `eventsByDay::<yyyy-mm-dd>` is synthesized and then the resulting data is stored in `metrics::` hashes
3134

3235
## Impact Dashboard keys
3336
The types can be found in `src/redisTypes.ts`
34-
### JSON
35-
`node::<node_id>`
36-
`service::<service_id>`
37-
`user::<user_id>`
38-
`activeUsersByDay`
39-
`activeNodesByDay`
40-
41-
### Sets
42-
`activeUsers`
43-
`activeNodes`
44-
45-
4637

4738
# Troubleshooting
4839

package-lock.json

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
"@upstash/redis": "^1.28.2",
2626
"date-fns": "^3.3.1",
2727
"dotenv": "^16.3.1",
28+
"minimist": "^1.2.8",
2829
"ndjson": "^2.0.0",
2930
"node-fetch": "^2.7.0"
3031
},
3132
"devDependencies": {
33+
"@types/minimist": "^1.2.5",
3234
"@types/ndjson": "^2.0.4",
3335
"@types/node": "^20.11.5",
3436
"@typescript-eslint/eslint-plugin": "^6.19.1",

src/activeNodesIndexing.ts

Lines changed: 111 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,83 @@
1-
import { dailyActiveNodesSet, monthlyActiveNodesSet, weeklyActiveNodesSet, type NodeJson, nodePrefix, makeAKeyByDay, dailyActiveNodesSetByDay, dailyActiveNodesByDay, weeklyActiveNodesByDay, monthlyActiveNodesByDay } from './redisTypes'
1+
import {
2+
type NodeJson, nodePrefix, makeAKeyByDay, dailyActiveNodesSetByDay,
3+
dailyActiveNodesByDay, weeklyActiveNodesByDay, monthlyActiveNodesByDay, weeklyActiveNodesSetByDay, monthlyActiveNodesSetByDay
4+
} from './redisTypes'
25
import { impactDashRedisClient, iterateSet } from './RedisClient'
3-
import { DAY_TO_INDEX_YYYY_MM_DD } from './index'
46

57
const ONE_DAY_MILLISECONDS = 24 * 60 * 60 * 1000 // 24 hours, 60 minutes per hour, 60 seconds per minute, 1000 milliseconds per second
68
const ONE_WEEK_MILLISECONDS = 7 * ONE_DAY_MILLISECONDS // 7 days in a week
79
const ONE_MONTH_MILLISECONDS = 30 * ONE_DAY_MILLISECONDS // Assuming 30 days in a month for simplicity
810

9-
const now = new Date()
10-
// todo: do yesterday
11-
const startOfDay = new Date(now.getFullYear(), now.getMonth(), now.getDate())
12-
const timestampStartOfDay = startOfDay.getTime()
11+
// Separate thresholds for when the indexing day is different from today (yesterday?)
12+
let dayToIndexYyyyMmDd: string | null = null
13+
let indexDayStartOfDay: Date | null = null
14+
let indexDayTimestampStartOfDay: number | null = null
1315

14-
const dailyActiveThresholdMs = timestampStartOfDay - ONE_DAY_MILLISECONDS
15-
const weeklyActiveThresholdMs = timestampStartOfDay - ONE_WEEK_MILLISECONDS
16-
const monthlyActiveThresholdMs = timestampStartOfDay - ONE_MONTH_MILLISECONDS
17-
console.log('Active nodes indexing thresholds (daily, weekly, monthly): ', dailyActiveThresholdMs, weeklyActiveThresholdMs, monthlyActiveThresholdMs)
16+
let dailyThresholdTimeMs: number | null = null
17+
let weeklyThresholdTimeMs: number | null = null
18+
let monthlyThresholdTimeMs: number | null = null
19+
console.log('Active nodes sets indexing threshold timestamps (daily, weekly, monthly): ',
20+
dailyThresholdTimeMs, weeklyThresholdTimeMs, monthlyThresholdTimeMs)
1821

1922
/**
20-
* Used to keep track of historically active nodes by day
23+
* Used to keep track of historically active nodes by day, week, and month
2124
*/
22-
const dailyActiveNodesByDayKey = makeAKeyByDay(dailyActiveNodesSetByDay, DAY_TO_INDEX_YYYY_MM_DD)
25+
let dailyActiveNodesByDayKey: string | null = null
26+
let weeklyActiveNodesByDayKey: string | null = null
27+
let monthlyActiveNodesByDayKey: string | null = null
28+
let dayBeforeWeeklyActiveNodesByDayKey: string | null = null
29+
let dayBeforeMonthlyActiveNodesByDayKey: string | null = null
30+
31+
export const initializeDateConstants = (dayBeforeDayToIndexYyyyMmDd: string,
32+
dayToIndexDate: Date, argDayToIndexYyyyMmDd: string): void => {
33+
dayToIndexYyyyMmDd = argDayToIndexYyyyMmDd
34+
// Separate thresholds for when the indexing day is different from today (yesterday?)
35+
indexDayStartOfDay = new Date(dayToIndexDate.getFullYear(), dayToIndexDate.getMonth(), dayToIndexDate.getDate())
36+
indexDayTimestampStartOfDay = indexDayStartOfDay.getTime()
37+
38+
dailyThresholdTimeMs = indexDayTimestampStartOfDay - ONE_DAY_MILLISECONDS
39+
weeklyThresholdTimeMs = indexDayTimestampStartOfDay - ONE_WEEK_MILLISECONDS
40+
monthlyThresholdTimeMs = indexDayTimestampStartOfDay - ONE_MONTH_MILLISECONDS
41+
console.log('Active nodes sets indexing threshold timestamps (daily, weekly, monthly): ',
42+
dailyThresholdTimeMs, weeklyThresholdTimeMs, monthlyThresholdTimeMs)
43+
44+
/**
45+
* Used to keep track of historically active nodes by day, week, and month
46+
*/
47+
dailyActiveNodesByDayKey = makeAKeyByDay(dailyActiveNodesSetByDay, dayToIndexYyyyMmDd)
48+
weeklyActiveNodesByDayKey = makeAKeyByDay(weeklyActiveNodesSetByDay, dayToIndexYyyyMmDd)
49+
monthlyActiveNodesByDayKey = makeAKeyByDay(monthlyActiveNodesSetByDay, dayToIndexYyyyMmDd)
50+
dayBeforeWeeklyActiveNodesByDayKey = makeAKeyByDay(weeklyActiveNodesSetByDay, dayBeforeDayToIndexYyyyMmDd)
51+
dayBeforeMonthlyActiveNodesByDayKey = makeAKeyByDay(monthlyActiveNodesSetByDay, dayBeforeDayToIndexYyyyMmDd)
52+
}
2353

2454
/**
25-
* This function looks a node's lastRunningTimestampMs to see if it
26-
* has been active recently. Using this and the node's other metadata,
27-
* this function indexes active nodes by specId, region, os, etc.
55+
* This function takes a node report and looks a node's lastRunningTimestampMs
56+
* to see if it has been active for the day being indexed.
57+
* modifies day/week/month...NodesByDay sets
2858
* @param nodeJson
2959
*/
3060
export const indexSingleNodeReport = async (nodeJson: NodeJson): Promise<void> => {
3161
console.log('indexSingleNodeReport nodeId: ', nodeJson.nodeId)
32-
// if node is active, save to active node sets
62+
if (dailyThresholdTimeMs === null || weeklyThresholdTimeMs === null || monthlyThresholdTimeMs === null ||
63+
dailyActiveNodesByDayKey === null || weeklyActiveNodesByDayKey === null || monthlyActiveNodesByDayKey === null) {
64+
throw new Error('A required time, date, or key is null.')
65+
}
3366
if (nodeJson.lastRunningTimestampMs == null) {
3467
return
3568
}
36-
if (nodeJson.lastRunningTimestampMs >= dailyActiveThresholdMs) {
37-
await impactDashRedisClient.addToSet(dailyActiveNodesSet, nodeJson.nodeId)
69+
70+
// console.log(`node last running: ${nodeJson.lastRunningTimestampMs}, monthly thres`)
71+
// if node is active, save to active node sets
72+
if (nodeJson.lastRunningTimestampMs >= dailyThresholdTimeMs) {
3873
await impactDashRedisClient.addToSet(dailyActiveNodesByDayKey, nodeJson.nodeId)
3974
}
40-
if (nodeJson.lastRunningTimestampMs >= weeklyActiveThresholdMs) {
41-
await impactDashRedisClient.addToSet(weeklyActiveNodesSet, nodeJson.nodeId)
75+
if (nodeJson.lastRunningTimestampMs >= weeklyThresholdTimeMs) {
76+
await impactDashRedisClient.addToSet(weeklyActiveNodesByDayKey, nodeJson.nodeId)
4277
}
43-
if (nodeJson.lastRunningTimestampMs >= monthlyActiveThresholdMs) {
44-
await impactDashRedisClient.addToSet(monthlyActiveNodesSet, nodeJson.nodeId)
78+
if (nodeJson.lastRunningTimestampMs >= monthlyThresholdTimeMs) {
79+
console.log(`Monthly active set: adding ${nodeJson.nodeId} to set ${monthlyActiveNodesByDayKey}`)
80+
await impactDashRedisClient.addToSet(monthlyActiveNodesByDayKey, nodeJson.nodeId)
4581
}
4682
}
4783

@@ -70,43 +106,54 @@ const monthlyActiveNodesIndex: activeNodesIndex = {
70106
country: {}
71107
}
72108

109+
/**
110+
* Looks at a node's lastRunningTimestampMs to see if it should be included in an active node set.
111+
* If the node is active, then the node's metadata will be indexed in the global index vars:
112+
* dailyActiveNodesIndex, weeklyActiveNodesIndex, monthlyActiveNodesIndex
113+
* @param setKey a redis set key. daily, weekly, or monthly.
114+
* @returns
115+
*/
73116
const processNode = (
74-
setKey: typeof dailyActiveNodesSet | typeof weeklyActiveNodesSet | typeof monthlyActiveNodesSet
117+
setKey: string,
118+
timePeriod: 'daily' | 'weekly' | 'monthly'
75119
): (nodeIdStrOrNum: string | number) => Promise<void> => {
76120
return async (nodeIdStrOrNum: string | number): Promise<void> => {
121+
if (dailyThresholdTimeMs === null || weeklyThresholdTimeMs === null || monthlyThresholdTimeMs === null) {
122+
throw new Error('A required time, date, or key is null.')
123+
}
77124
const nodeId = nodeIdStrOrNum as string
78125
const node: NodeJson = await impactDashRedisClient.client.json.get(`${nodePrefix}${nodeId}`)
79126

80127
// console.log(`Indexing. processNode ${nodeId}, setKey ${setKey}, node ${JSON.stringify(node)}`)
81128

82129
let indexToUpdate
83-
if (setKey === dailyActiveNodesSet) {
84-
if (node.lastRunningTimestampMs == null || node.lastRunningTimestampMs < dailyActiveThresholdMs) {
130+
if (timePeriod === 'daily') {
131+
if (node.lastRunningTimestampMs == null || node.lastRunningTimestampMs < dailyThresholdTimeMs) {
85132
console.log(`Removing ${nodeId} from daily set`)
86-
await impactDashRedisClient.removeFromSet(dailyActiveNodesSet, nodeId)
133+
await impactDashRedisClient.removeFromSet(setKey, nodeId)
87134
} else {
88135
// node was detected running that day, update indexing data
89136
indexToUpdate = dailyActiveNodesIndex
90137
}
91-
} else if (setKey === weeklyActiveNodesSet) {
92-
if (node.lastRunningTimestampMs == null || node.lastRunningTimestampMs < weeklyActiveThresholdMs) {
138+
} else if (timePeriod === 'weekly') {
139+
if (node.lastRunningTimestampMs == null || node.lastRunningTimestampMs < weeklyThresholdTimeMs) {
93140
console.log(`Removing ${nodeId} from weekly set`)
94-
await impactDashRedisClient.removeFromSet(weeklyActiveNodesSet, nodeId)
141+
await impactDashRedisClient.removeFromSet(setKey, nodeId)
95142
} else {
96143
// node was detected running that week, update indexing data
97144
indexToUpdate = weeklyActiveNodesIndex
98145
}
99-
} else if (setKey === monthlyActiveNodesSet) {
100-
if (node.lastRunningTimestampMs == null || node.lastRunningTimestampMs < monthlyActiveThresholdMs) {
101-
console.log(`Removing ${nodeId} from monthly set...${node.lastRunningTimestampMs} and ${monthlyActiveThresholdMs}`)
102-
await impactDashRedisClient.removeFromSet(monthlyActiveNodesSet, nodeId)
146+
} else if (timePeriod === 'monthly') {
147+
if (node.lastRunningTimestampMs == null || node.lastRunningTimestampMs < monthlyThresholdTimeMs) {
148+
console.warn(`Removing ${nodeId} from monthly set...${node.lastRunningTimestampMs} and ${monthlyThresholdTimeMs}`)
149+
await impactDashRedisClient.removeFromSet(setKey, nodeId)
103150
} else {
104151
// node was detected running that month, update indexing data
105152
indexToUpdate = monthlyActiveNodesIndex
106153
}
107154
}
108155

109-
// if undefined, setKey is not an expected value
156+
// if undefined, then the node was removed from the active set and should not be counted in the index
110157
if (indexToUpdate !== undefined) {
111158
console.log(`Indexing ${nodeId} ${JSON.stringify(node)} to ${JSON.stringify(indexToUpdate)} set`)
112159
indexToUpdate.count++
@@ -126,8 +173,24 @@ const processNode = (
126173
}
127174
}
128175

176+
export const copyPreviousWeekMonthActiveNodes = async (): Promise<void> => {
177+
if (dailyThresholdTimeMs === null || weeklyThresholdTimeMs === null || monthlyThresholdTimeMs === null ||
178+
dailyActiveNodesByDayKey === null || weeklyActiveNodesByDayKey === null || monthlyActiveNodesByDayKey === null ||
179+
dayToIndexYyyyMmDd === null || dayBeforeWeeklyActiveNodesByDayKey === null || dayBeforeMonthlyActiveNodesByDayKey === null) {
180+
throw new Error('A required time, date, or key is null.')
181+
}
182+
// First copy previous day's non-daily active node sets to the index day's sets before updating
183+
// day doesn't need copied, because it is ONLY created from the daily event reports that are processed in the next step
184+
console.log(`Redis copying set ${dayBeforeWeeklyActiveNodesByDayKey} to ${weeklyActiveNodesByDayKey}`)
185+
const copyResult = await impactDashRedisClient.client.copy(dayBeforeWeeklyActiveNodesByDayKey, weeklyActiveNodesByDayKey, { replace: true })
186+
console.log('Copy result, ', copyResult)
187+
console.log(`Redis copying set ${dayBeforeMonthlyActiveNodesByDayKey} to ${monthlyActiveNodesByDayKey}`)
188+
const copyResult2 = await impactDashRedisClient.client.copy(dayBeforeMonthlyActiveNodesByDayKey, monthlyActiveNodesByDayKey, { replace: true })
189+
console.log('Copy result2, ', copyResult2)
190+
}
191+
129192
/**
130-
* // for each node
193+
* // for each node in a event report
131194
// if not active "enough", remove from set
132195
// else
133196
// bump count for set
@@ -138,15 +201,19 @@ const processNode = (
138201
// store count of each region, type, etc. byDay
139202
*/
140203
export const indexActiveNodeSets = async (): Promise<void> => {
141-
// clean active sets - iterate daily, weekly, monthly sets
142-
await iterateSet(impactDashRedisClient, monthlyActiveNodesSet, processNode(monthlyActiveNodesSet))
143-
await iterateSet(impactDashRedisClient, weeklyActiveNodesSet, processNode(weeklyActiveNodesSet))
144-
await iterateSet(impactDashRedisClient, dailyActiveNodesSet, processNode(dailyActiveNodesSet))
204+
if (dailyThresholdTimeMs === null || weeklyThresholdTimeMs === null || monthlyThresholdTimeMs === null ||
205+
dailyActiveNodesByDayKey === null || weeklyActiveNodesByDayKey === null || monthlyActiveNodesByDayKey === null ||
206+
dayToIndexYyyyMmDd === null || dayBeforeWeeklyActiveNodesByDayKey === null || dayBeforeMonthlyActiveNodesByDayKey === null) {
207+
throw new Error('A required time, date, or key is null.')
208+
}
209+
210+
// clean and index active sets - iterate daily, weekly, monthly sets and count active by type, region, etc.
211+
await iterateSet(impactDashRedisClient, dailyActiveNodesByDayKey, processNode(dailyActiveNodesByDayKey, 'daily'))
212+
await iterateSet(impactDashRedisClient, weeklyActiveNodesByDayKey, processNode(weeklyActiveNodesByDayKey, 'weekly'))
213+
await iterateSet(impactDashRedisClient, monthlyActiveNodesByDayKey, processNode(monthlyActiveNodesByDayKey, 'monthly'))
145214

146215
// save indexed data
147-
await impactDashRedisClient.client.json.set(makeAKeyByDay(dailyActiveNodesByDay, DAY_TO_INDEX_YYYY_MM_DD), '$', dailyActiveNodesIndex)
148-
await impactDashRedisClient.client.json.set(makeAKeyByDay(weeklyActiveNodesByDay, DAY_TO_INDEX_YYYY_MM_DD), '$', weeklyActiveNodesIndex)
149-
await impactDashRedisClient.client.json.set(makeAKeyByDay(monthlyActiveNodesByDay, DAY_TO_INDEX_YYYY_MM_DD), '$', monthlyActiveNodesIndex)
216+
await impactDashRedisClient.client.json.set(makeAKeyByDay(dailyActiveNodesByDay, dayToIndexYyyyMmDd), '$', dailyActiveNodesIndex)
217+
await impactDashRedisClient.client.json.set(makeAKeyByDay(weeklyActiveNodesByDay, dayToIndexYyyyMmDd), '$', weeklyActiveNodesIndex)
218+
await impactDashRedisClient.client.json.set(makeAKeyByDay(monthlyActiveNodesByDay, dayToIndexYyyyMmDd), '$', monthlyActiveNodesIndex)
150219
}
151-
152-
// todo: retro fill in active nodes

0 commit comments

Comments
 (0)