feat: cas scaling #1189

Draft: wants to merge 13 commits into develop
2 changes: 2 additions & 0 deletions config/default.json
@@ -76,6 +76,8 @@
"type": "sqs",
"awsRegion": "us-east-1",
"sqsQueueUrl": "",
"s3BucketName": "myS3Bucket",
"s3Endpoint": "",
"maxTimeToHoldMessageSec": 21600,
"waitTimeForMessageSec": 0
}
2 changes: 2 additions & 0 deletions config/env/dev.json
@@ -71,6 +71,8 @@
"type": "sqs",
"awsRegion": "@@AWS_REGION",
"sqsQueueUrl": "@@SQS_QUEUE_URL",
"s3BucketName": "@@S3_BUCKET_NAME",
"s3Endpoint": "@@S3_ENDPOINT",
"maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC",
"waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC"
}
2 changes: 2 additions & 0 deletions config/env/prod.json
@@ -71,6 +71,8 @@
"type": "sqs",
"awsRegion": "@@AWS_REGION",
"sqsQueueUrl": "@@SQS_QUEUE_URL",
"s3BucketName": "@@S3_BUCKET_NAME",
"s3Endpoint": "@@S3_ENDPOINT",
"maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC",
"waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC"
}
1 change: 1 addition & 0 deletions config/env/test.json
@@ -57,6 +57,7 @@
"type": "sqs",
"awsRegion": "us-east-1",
"sqsQueueUrl": "",
"s3BucketName": "ceramic-tnet-cas",
"maxTimeToHoldMessageSec": 10800,
"waitTimeForMessageSec": 10
}
11 changes: 6 additions & 5 deletions src/repositories/anchor-repository.ts
@@ -6,6 +6,7 @@ import { parseCountResult } from './parse-count-result.util.js'
import { decode } from 'codeco'

const TABLE_NAME = 'anchor'
const chunkSize = 10000

export class AnchorRepository implements IAnchorRepository {
static inject = ['dbConnection'] as const
@@ -32,11 +33,10 @@ export class AnchorRepository implements IAnchorRepository {
* @returns A promise that resolves to the number of anchors created
*/
async createAnchors(anchors: Array<FreshAnchor>): Promise<number> {
-   const result: any = await this.table
-     .insert(anchors.map((anchor) => FreshAnchor.encode(anchor)))
-     .onConflict('requestId')
-     .ignore()
-   return parseCountResult(result.rowCount)
+   // Insert in chunks of `chunkSize` rows. Errors are logged and swallowed, so
+   // the returned count falls back to zero if the insert fails.
+   const result = await this.connection
+     .batchInsert(TABLE_NAME, anchors.map((anchor) => FreshAnchor.encode(anchor)), chunkSize)
+     .catch(function (error) {
+       console.error(error)
+     }) ?? []
+   return parseCountResult(result.length)
}

/**
@@ -55,6 +55,7 @@
async findByRequestId(requestId: string): Promise<StoredAnchor | null> {
const row = await this.table.where({ requestId: requestId }).first()
if (!row) return null
console.log("IM HERE with ", row, requestId)

[Review comment] Should we take this log out, @JulissaDantes? Might flood the logs at high throughput.

[Reply] You are completely right. Thank you for pointing this out!


return decode(StoredAnchor, row)
}
}
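
A note on the new createAnchors above: knex's batchInsert does not appear to support the onConflict('requestId').ignore() chain the old insert used, so a duplicate requestId now fails the insert instead of being silently skipped. If duplicate tolerance still matters, a manual chunk loop can preserve it. A sketch under that assumption (this helper is illustrative, not part of the PR):

  async createAnchorsChunked(anchors: Array<FreshAnchor>): Promise<number> {
    let inserted = 0
    for (let i = 0; i < anchors.length; i += chunkSize) {
      const chunk = anchors.slice(i, i + chunkSize)
      // Same conflict semantics as the pre-PR code, applied per chunk:
      // rows whose requestId already exists are skipped, not fatal.
      const result: any = await this.table
        .insert(chunk.map((anchor) => FreshAnchor.encode(anchor)))
        .onConflict('requestId')
        .ignore()
      inserted += parseCountResult(result.rowCount)
    }
    return inserted
  }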
11 changes: 9 additions & 2 deletions src/repositories/request-repository.ts
@@ -34,6 +34,7 @@ export const FAILURE_RETRY_INTERVAL = 1000 * 60 * 60 * 6 // 6H
const REPEATED_READ_SERIALIZATION_ERROR = '40001'

const TABLE_NAME = 'request'
const chunkSize = 10000

/**
* Records statistics about the set of requests
@@ -169,15 +170,21 @@
*/
  async updateRequests(fields: RequestUpdateFields, requests: Request[]): Promise<number> {
    const updatedAt = new Date()
-   const ids = requests.map((r) => r.id)
-   const result = await this.table
+   let result = 0
+
+   // Process ids in chunks of `chunkSize` so a very large batch does not
+   // produce an unbounded WHERE id IN (...) clause.
+   for (let i = 0; i < requests.length; i += chunkSize) {
+     const chunk = requests.slice(i, i + chunkSize)
+     const ids = chunk.map((r) => r.id)
+
+     result += await this.table
      .update({
        message: fields.message,
        status: fields.status,
        pinned: fields.pinned,
        updatedAt: date.encode(updatedAt),
      })
      .whereIn('id', ids)
+   }

requests.map((request) => {
logEvent.db({
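
For scale, a hypothetical worked example of the chunking arithmetic (the call below and RequestStatus.COMPLETED are illustrative, not taken from this diff): with chunkSize = 10000, updating 25,000 requests issues three UPDATE ... WHERE id IN (...) statements of 10k, 10k, and 5k ids, keeping each statement's parameter list bounded.

  const updated = await requestRepository.updateRequests(
    { status: RequestStatus.COMPLETED, message: 'Anchored', pinned: true },
    requests // e.g. 25,000 rows -> three chunked UPDATE statements
  )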
25 changes: 15 additions & 10 deletions src/services/anchor-service.ts
@@ -314,17 +314,20 @@
logger.debug('Creating IPFS anchor proof')
const ipfsProofCid = this._createIPFSProof(merkleTree.car, tx, merkleTree.root.data.cid)

-     // Create anchor records on IPFS
+     // Create anchor records
logger.debug('Creating anchor commits')
const anchors = await this._createAnchorCommits(ipfsProofCid, merkleTree)

-     try {
-       await this.ipfsService.importCAR(merkleTree.car)
-     } catch (e) {
-       Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_FAILURE_IPFS, 1)
-       const message = `Can not store Merkle CAR to IPFS. Batch failed: ${e}`
-       logger.err(message)
-       throw e
-     }
+     // Do not store the Merkle CAR file in IPFS by default
+     if (process.env['CAS_USE_IPFS_STORAGE']) {
+       try {
+         await this.ipfsService.importCAR(merkleTree.car)
+       } catch (e) {
+         Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_FAILURE_IPFS, 1)
+         const message = `Cannot store Merkle CAR to IPFS. Batch failed: ${e}`
+         logger.err(message)
+         throw e
+       }
+     }

try {
@@ -502,8 +505,10 @@
}

try {
-       await this.ipfsService.storeRecord(ipfsAnchorCommit)
-
+       // Do not store in IPFS by default
+       if (process.env['CAS_USE_IPFS_STORAGE']) {
+         await this.ipfsService.storeRecord(ipfsAnchorCommit)
+       }
// Do not publish to pubsub by default
if (process.env['CAS_PUBSUB_PUBLISH']) {
// TODO: Remove this case entirely after js-ceramic no longer supports pubsub
Expand Down
68 changes: 67 additions & 1 deletion src/services/queue/sqs-queue-service.ts
@@ -6,6 +6,9 @@ import {
ChangeMessageVisibilityCommand,
SendMessageCommand,
} from '@aws-sdk/client-sqs'
import AWSSDK from 'aws-sdk'
import LevelUp from 'levelup'
import S3LevelDOWN from 's3leveldown'
import { IpfsPubSubPublishQMessage, QueueMessageData } from '../../models/queue-message.js'
import {
IQueueConsumerService,
@@ -19,6 +22,7 @@ import { AbortOptions } from '@ceramicnetwork/common'

const DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S = 21600
const DEFAULT_WAIT_TIME_FOR_MESSAGE_S = 10
const BATCH_STORE_PATH = '/cas/anchor/batch'
/**
 * SQS queue message received by consumers.
 * Once the message has been processed, you can either "ack" it (remove it from the queue) or "nack" it (put it back on the queue).
@@ -60,6 +64,28 @@ export class SqsQueueMessage<TValue extends QueueMessageData> implements IQueueM
}
}

// This wrapper around SqsQueueMessage is used to handle the case where the list of batch request IDs is empty and must
// be fetched from S3. The underlying SqsQueueMessage remains the same (and is what is used for n/acking the message),
// but the data is updated to include the batch request IDs.
export class BatchQueueMessage implements IQueueMessage<AnchorBatchQMessage> {
readonly data: AnchorBatchQMessage

constructor(
private readonly anchorBatchMessage: IQueueMessage<AnchorBatchQMessage>,
batchJson: any
) {
this.data = decode(AnchorBatchQMessage, batchJson)
}

async ack(): Promise<void> {
await this.anchorBatchMessage.ack()
}

async nack(): Promise<void> {
await this.anchorBatchMessage.nack()
}
}
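
Usage-wise (values illustrative, not from this diff): the wrapper is constructed from the already-received SQS message plus the JSON fetched from S3, and it delegates ack/nack so queue bookkeeping still happens on the original message. If decode rejects the JSON, the constructor throws and the message is never acked, so SQS redelivers it after the visibility timeout.

  const batchJson = JSON.parse(await s3store.get(message.data.bid))
  const fullMessage = new BatchQueueMessage(message, batchJson)
  await fullMessage.ack() // acks the wrapped SQS message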

/**
* Consumer and Producer for Sqs Queues
*/
@@ -149,10 +175,50 @@
 * AnchorBatchSqsQueueService is used to consume and publish anchor batch messages. These batches are anchored by anchor workers.
*/
export class AnchorBatchSqsQueueService extends SqsQueueService<AnchorBatchQMessage> {
-  constructor(config: Config) {
+  constructor(
+    config: Config,
+    private s3StorePath = config.queue.s3BucketName + BATCH_STORE_PATH,
+    private s3Endpoint = config.queue.s3Endpoint ? config.queue.s3Endpoint : undefined,
+    private _s3store?: LevelUp.LevelUp
+  ) {
const queueUrl = config.queue.sqsQueueUrl + 'batch'
super(config, queueUrl, AnchorBatchQMessage)
}

/**
* `new LevelUp` attempts to open a database, which leads to a request to AWS.
* Let's make initialization lazy.
*/
get s3store(): LevelUp.LevelUp {
if (!this._s3store) {
const levelDown = this.s3Endpoint
? new S3LevelDOWN(
this.s3StorePath,
new AWSSDK.S3({
endpoint: this.s3Endpoint,
s3ForcePathStyle: true,
})
)
: new S3LevelDOWN(this.s3StorePath)

this._s3store = new LevelUp(levelDown)
}
return this._s3store
}
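
  // Illustration (not part of this diff): because the store is created lazily,
  // constructing the service performs no S3 I/O; only the first `s3store`
  // access opens the LevelUp database and issues AWS requests, e.g.:
  //
  //   const service = new AnchorBatchSqsQueueService(config)
  //   await service.s3store.get('some-batch-id') // first S3 call happens here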

override async receiveMessage(abortOptions?: AbortOptions): Promise<IQueueMessage<AnchorBatchQMessage> | undefined> {
const anchorBatchMessage: IQueueMessage<AnchorBatchQMessage> | undefined = await super.receiveMessage(abortOptions)
// If the list of batch request IDs is empty, we need to fetch the full batch from S3.
if (anchorBatchMessage && anchorBatchMessage.data.rids.length === 0) {
try {
const batchJson = await this.s3store.get(anchorBatchMessage.data.bid)
return new BatchQueueMessage(anchorBatchMessage, JSON.parse(batchJson))
} catch (err: any) {
throw Error(`Error retrieving batch ${anchorBatchMessage.data.bid} from S3: ${err.message}`)
}
}
return anchorBatchMessage
}
}
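
This diff shows only the consumer side of the S3 handoff. The implied producer convention (assumed here, not shown in the PR) is to store the full batch in S3 under its batch ID and enqueue a stub whose empty rids list signals the consumer to hydrate from S3. A minimal sketch under that assumption (the helper and its parameters are illustrative):

  import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs'

  async function publishLargeBatch(
    sqsClient: SQSClient,
    queueUrl: string,
    s3store: LevelUp.LevelUp,
    batch: { bid: string; rids: string[] }
  ): Promise<void> {
    // Offload the full batch body to S3 under its batch ID...
    await s3store.put(batch.bid, JSON.stringify(batch))
    // ...then enqueue a stub; the empty rids list tells receiveMessage above
    // to fetch the body from S3.
    await sqsClient.send(
      new SendMessageCommand({
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify({ bid: batch.bid, rids: [] }),
      })
    )
  }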
