Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"rules": {
"@typescript-eslint/explicit-function-return-type": "warn",
"@typescript-eslint/strict-boolean-expressions": "warn",
"@typescript-eslint/no-non-null-assertion": "off",
"simple-import-sort/sort": "error"
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"cleaners": "^0.3.12",
"compression": "^1.7.4",
"cors": "^2.8.5",
"edge-server-tools": "^0.2.11",
"edge-server-tools": "^0.2.13",
"express": "^4.17.1",
"firebase-admin": "^8.12.1",
"morgan": "^1.10.0",
Expand Down
64 changes: 64 additions & 0 deletions src/NotificationManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import io from '@pm2/io'
import admin from 'firebase-admin'

import { ApiKey } from './models'

import BatchResponse = admin.messaging.BatchResponse

const successCounter = io.counter({
id: 'notifications:success:total',
name: 'Total Successful Notifications'
})
const failureCounter = io.counter({
id: 'notifications:failure:total',
name: 'Total Failed Notifications'
})

export const createNotificationManager = async (
apiKey: ApiKey | string
): Promise<admin.app.App> => {
if (typeof apiKey === 'string') apiKey = await ApiKey.fetch(apiKey)

const name = `app:${apiKey.appId}`
let app: admin.app.App
try {
app = admin.app(name)
} catch (err) {
app = admin.initializeApp(
{
credential: admin.credential.cert(apiKey.adminsdk)
},
name
)
}
return app
}

export const sendNotification = async (
app: admin.app.App,
title: string,
body: string,
tokens: string[],
data = {}
): Promise<BatchResponse> => {
const message: admin.messaging.MulticastMessage = {
notification: {
title,
body
},
data,
tokens
}

try {
const response = await app.messaging().sendMulticast(message)

successCounter.inc(response.successCount)
failureCounter.inc(response.failureCount)

return response
} catch (err) {
console.error(JSON.stringify(err, null, 2))
throw err
}
}
14 changes: 0 additions & 14 deletions src/api/index.ts

This file was deleted.

40 changes: 24 additions & 16 deletions src/api/router.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { asArray, asObject, asString, asUnknown } from 'cleaners'
import { asArray, asObject, asString } from 'cleaners'
import {
type HttpRequest,
type HttpResponse,
Expand All @@ -10,10 +10,13 @@ import {
jsonResponse,
statusCodes,
statusResponse
} from '../types/response-types'
} from '../types/http/response-types'
import { asAction } from '../types/task/Action'
import { asActionEffect } from '../types/task/ActionEffect'
import {
DbDoc,
asTaskDoc,
logger,
TaskDoc,
wrappedDeleteFromDb,
wrappedGetFromDb,
wrappedSaveToDb
Expand Down Expand Up @@ -51,31 +54,34 @@ const getTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
}

// Construct a body and returns it as an HttpResponse.
// The body should have triggers, action, and taskId.
// The body should have actionEffects, action, userId, _id and taskId.
const createTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
try {
const asBody = asObject({
taskId: asString,
triggers: asArray(asUnknown),
action: asUnknown
actionEffects: asArray(asActionEffect),
action: asAction
})

const queryObject = getQueryParamObject(
['taskId', 'triggers', 'action'],
['taskId', 'actionEffects', 'action'],
request.path
)
const triggersAsString = queryObject.triggers
const triggersAsArray = convertStringToArray(triggersAsString)
queryObject.triggers = triggersAsArray ?? []
const { taskId, triggers, action } = asBody(queryObject)
const actionEffectsAsString = queryObject.actionEffects
const actionEffectsAsArray = convertStringToArray(actionEffectsAsString)
queryObject.actionEffects = actionEffectsAsArray ?? []
const { taskId, actionEffects, action } = asBody(queryObject)
const cleanedAction = asAction(action)

const doc: DbDoc = {
taskId,
const doc: TaskDoc = asTaskDoc({
taskId: taskId,
userId: request.headers.userId,
triggers,
action,
actionEffects: actionEffects.map(actionEffect =>
asActionEffect(actionEffect)
),
cleanedAction,
_id: `${request.headers.userId}:${taskId}` // To help with partitioning
}
})

await wrappedSaveToDb([doc])
return statusResponse(statusCodes.SUCCESS, 'Successfully created the task')
Expand All @@ -85,6 +91,8 @@ const createTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
}
}

// Remove tasks from the database. If the taskIds array is empty, it
// will delete all tasks under the userId.
const deleteTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
try {
const asQuery = asObject({
Expand Down
17 changes: 0 additions & 17 deletions src/config.ts

This file was deleted.

37 changes: 17 additions & 20 deletions src/couchSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from 'edge-server-tools'
import { ServerScope } from 'nano'

import { tasksListening, tasksPublishing } from './database/views/couch-tasks'
import { serverConfig } from './serverConfig'

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -48,21 +49,18 @@ export const settingsSetup: DatabaseSetup = {

const apiKeysSetup: DatabaseSetup = { name: 'db_api_keys' }

const thresholdsSetup: DatabaseSetup = { name: 'db_currency_thresholds' }

const devicesSetup: DatabaseSetup = { name: 'db_devices' }

const usersSetup: DatabaseSetup = {
name: 'db_user_settings'
// documents: {
// '_design/filter': makeJsDesign('by-currency', ?),
// '_design/map': makeJsDesign('currency-codes', ?)
// }
}

const defaultsSetup: DatabaseSetup = {
name: 'defaults'
// syncedDocuments: ['thresholds']
const tasksSetup: DatabaseSetup = {
name: 'db_tasks',
// Turn on partition by userId for performance and security reasons.
// https://docs.couchdb.org/en/3.2.2/partitioned-dbs/index.html
options: {
partitioned: true
},
// Set up the views
documents: {
'_design/tasks_listening': tasksListening,
'_design/tasks_publishing': tasksPublishing
}
}

// ---------------------------------------------------------------------------
Expand All @@ -79,13 +77,12 @@ export async function setupDatabases(
replicatorSetup: syncedReplicators,
disableWatching
}

// @ts-expect-error
await setupDatabase(connection, settingsSetup, options)
await Promise.all([
// @ts-expect-error
setupDatabase(connection, apiKeysSetup, options),
setupDatabase(connection, thresholdsSetup, options),
setupDatabase(connection, devicesSetup, options),
setupDatabase(connection, usersSetup, options),
setupDatabase(connection, defaultsSetup, options)
// @ts-expect-error
setupDatabase(connection, tasksSetup, options)
])
}
81 changes: 81 additions & 0 deletions src/database/views/couch-tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Configures couchDB views that are used to model message queues.
* Associated helper functions are also provided.
*
* Publishers listen to these views to perform actions on update. One
* way of doing this is to use the {@link viewToStream} function from
* `edge-server-tools`.
*
* A key advantage of using views is that documents are programmatically
* indexed and serverd to views based on certain conditions, thereby
* elimitating the need to build seqarate listeners that subscribe to db
* documents and perform actions on update.
*
* Views can be named as a string, just like a normal database. They can
* be called by using `db.view(name, params)` method. The response will
* be of type `nano.DocumentViewResponse<T>` where `T` is the shape of the
* documents defined elsewhere. This type has a `rows` property that is
* consistent with many other getter methods in nano.
*/

// Certain import lines have lintings disabled because they are
// referenced only by documentation comments.
import {
JsDesignDocument,
makeJsDesign,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
viewToStream
} from 'edge-server-tools'

// eslint-disable-next-line @typescript-eslint/no-unused-vars
import { ActionEffect } from '../../types/task/ActionEffect'
import { Task } from '../../types/task/Task'
import { dbTasks, logger, packChange, TaskDoc } from '../../utils/dbUtils'

/**
* A view that indexes to all tasks that contain at least one incomplete
* {@link ActionEffect}.
*
* @remarks
* This view is not intended to be subscribed by any publishers. Think
* of this as a staging area for ongoing tasks.
*/
export const tasksListening: JsDesignDocument = makeJsDesign(
'tasks_listening',
() => ({
filter: function (taskDoc: TaskDoc) {
return taskDoc.doc.actionEffects.some(e => e.completed === false)
}
})
)

/**
* A view that indexes to all tasks with all {@link ActionEffect}
* completed.
*/
export const tasksPublishing: JsDesignDocument = makeJsDesign(
'tasks_publishing',
() => ({
filter: function (taskDoc: TaskDoc) {
return taskDoc.doc.actionEffects.every(Boolean)
}
})
)

/**
* Updates the a task document in the `db_tasks` database. The function
* receives a {@link Task} object and updates the relavent document based on
* the content of this task.
* @param {Task} updatedTask - The task that has its `action.inProgress`
* flag updated.
*/
export const updateInProgress = async (
updatedTask: Task,
id: string
): Promise<void> => {
try {
await dbTasks.insert(packChange(updatedTask, id))
} catch (e) {
logger(`Failed to make ${updatedTask.taskId}'s action as inprogress: `, e)
}
}
33 changes: 33 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import nano from 'nano'
import { makeExpressRoute } from 'serverlet/express'

import { pushNotificationRouterV2 } from './api/router'
import { setupDatabases } from './couchSetup'
import { createServer } from './server'
import { serverConfig } from './serverConfig'

async function main(): Promise<void> {
// Set up databases:
const connection = nano(serverConfig.couchUri)
await setupDatabases(connection)

// Create server
const server = createServer(
makeExpressRoute(pushNotificationRouterV2),
serverConfig
)

// Start Server
server.listen(server.get('httpPort'), server.get('httpHost'), () => {
console.log(
`Express server listening on port ${JSON.stringify(
server.get('httpPort')
)}`
)
})
}

main().catch(error => {
console.error(error)
process.exit(1)
})
2 changes: 2 additions & 0 deletions src/models/CurrencyThreshold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ export class CurrencyThreshold extends Base implements ICurrencyThreshold {
price: number
): Promise<CurrencyThreshold> {
const threshold = this.thresholds[hours] ?? {
custom: undefined,
lastUpdated: 0,
price: 0
}
threshold.lastUpdated = timestamp
threshold.price = price
this.thresholds[hours] = threshold

return (await this.save()) as CurrencyThreshold
}
}
Loading