feat/incremental document sync #674
base: develop
| @OnQueueWaiting() | ||
| async OnQueueWaiting(job: Job) { | ||
| this.logger.debug(`Waiting ${job.name} document ${job.data}`); | ||
| async OnQueueWaiting(jobId: number) { |
Argument to this hook differs from others https://docs.nestjs.com/techniques/queues#event-listeners-1
4f62bd5 to c18173f
c18173f to 581f885
jrhender
left a comment
Good start to the PR but I think the issue with the jobs disappearing should be addressed
src/modules/did/did.service.ts
Outdated
| ).filter((doc) => { | ||
| const identity = doc.id.split(':')[3]; | ||
| return changedIdentities.includes(identity); | ||
| }); | ||
| didsToSynchronize.forEach(async (did) => { | ||
| this.logger.debug(`Synchronizing DID ${did.id}`); | ||
| await this.pinDocument(did.id); | ||
| }); | ||
|
|
||
| await this.latestDidSyncRepository.save({ block: topBlock }); |
Hey @JGiter, thanks for the PR. Really exciting that this code is being improved.
I think this approach is flawed, however. The issue I see is that jobs are persisted in separate storage (Redis) and then processed asynchronously, but the latestDidSync topBlock is persisted synchronously. So we could have a situation like this:
- A bunch of DID update jobs are queued and latestDidSync.block is updated
- The Redis storage fails or is wiped (it's in-memory, so this definitely could happen) -> then the system won't update the DIDs in the lost jobs because latestDidSync.block is already ahead of these events.
I'm thinking an alternative approach to get around this problem would be to:
- Mark updated DIDs as "stale":
  - Query all of the "changedIdentities" since the last check (as you've done in this PR)
  - Mark all of these DIDs as "invalid" (in the Postgres entities). I think something like this could be used to bulk update entities.
- Update latestDidSync.block
- Query all of the DID entities with invalid status and add a job to the queue for each (if it doesn't exist already)
- When done processing the job, update the DID to be "valid"
With the above approach, even if the job queue is reset/wiped, the DID entities will still have invalid status, so jobs for them can be re-added.
Also, we can change the GET DID Doc endpoint to synchronously query the RPC if the cached DID is invalid.
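To make the recovery property of the steps above concrete, here is a minimal in-memory sketch. It is not the ssi-hub implementation: `DurableStore` stands in for Postgres, `VolatileQueue` stands in for the Redis-backed job queue, and every class, function, and DID value below is a hypothetical name chosen for illustration.

```typescript
// Hypothetical stand-ins: DurableStore ~ Postgres, VolatileQueue ~ Redis-backed queue.
type SyncStatus = "stale" | "synced";

class DurableStore {
  latestSyncedBlock = 0;
  readonly status = new Map<string, SyncStatus>();
}

class VolatileQueue {
  private readonly jobs = new Set<string>();
  // A Set ignores duplicates, so re-adding an already queued DID is a no-op.
  add(did: string): void {
    this.jobs.add(did);
  }
  // Simulates the queue storage being lost.
  wipe(): void {
    this.jobs.clear();
  }
  // Returns all queued DIDs and empties the queue.
  drain(): string[] {
    const dids = Array.from(this.jobs);
    this.jobs.clear();
    return dids;
  }
}

// Steps 1 and 2: flag cached DIDs that changed, then advance the sync block.
function markStale(store: DurableStore, changedDids: string[], topBlock: number): void {
  changedDids.forEach((did) => {
    if (store.status.has(did)) store.status.set(did, "stale");
  });
  store.latestSyncedBlock = topBlock;
}

// Step 3: queue a job for every DID that is still stale in durable storage.
function enqueueStale(store: DurableStore, queue: VolatileQueue): void {
  store.status.forEach((status, did) => {
    if (status === "stale") queue.add(did);
  });
}

// Step 4: process each job and mark the DID as synced afterwards.
function processQueue(store: DurableStore, queue: VolatileQueue, refresh: (did: string) => void): void {
  queue.drain().forEach((did) => {
    refresh(did);
    store.status.set(did, "synced");
  });
}

// Usage: the queue is wiped after jobs were added, yet the stale DID is still refreshed.
const store = new DurableStore();
store.status.set("did:ethr:volta:0xabc", "synced");
store.status.set("did:ethr:volta:0xdef", "synced");
const queue = new VolatileQueue();

markStale(store, ["did:ethr:volta:0xabc"], 120);
enqueueStale(store, queue);
queue.wipe(); // simulated Redis loss
enqueueStale(store, queue); // the stale flag survived, so the job is re-added

const refreshed: string[] = [];
processQueue(store, queue, (did) => refreshed.push(did));
```

In a real database, marking the DIDs stale and advancing the block would presumably need to be committed together; the sketch does not model that.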
Thanks. This is a really good improvement.
src/modules/did/did.service.ts
Outdated
| private async pinDocument(did: string): Promise<void> { | ||
| try { | ||
| await this.didQueue.add(UPDATE_DID_DOC_JOB_NAME, did); | ||
| await this.didQueue.add(UPDATE_DID_DOC_JOB_NAME, { did }); |
@JGiter does this change relate to this comment you made in the PR description?
"Fixed using of DID synchronization job"
Does the data need to be in an object?
I cannot say why, but when passing the job data as a string, I observed multiple "error parsing JSON" errors, even though there is seemingly no JSON parsing in DidProcessor. So I decided to pass the job data as an object, the same as in PinProcessor.
| @PrimaryGeneratedColumn() | ||
| id: number; | ||
|
|
||
| @OneToOne(() => DIDDocumentEntity, (document) => document.id) |
Adding the relation only on the DidSyncStatusEntity side. The DidDocumentEntity will not be changed.
jrhender
left a comment
Some further comments but I think it is continuing to move in a good direction 👍
| @OneToOne(() => DIDDocumentEntity, (document) => document.id) | ||
| @JoinColumn() | ||
| document: string; |
It seems wrong to me that this property is string type. I think that typically in a one-to-one join column, the join property has the same type as the joined entity.
See the Profile example at the start of the TypeORM documentation: https://orkhan.gitbook.io/typeorm/docs/one-to-one-relations
| @OneToOne(() => DIDDocumentEntity, (document) => document.id) | |
| @JoinColumn() | |
| document: string; | |
| @OneToOne(() => DIDDocumentEntity, (document) => document.id) | |
| @JoinColumn() | |
| document: DIDDocumentEntity; |
Yes, I overlooked that
src/modules/did/did.service.ts
Outdated
| const updated = await this.didRepository.save(updatedEntity); | ||
| await this.didSyncStatusRepository.save({ | ||
| document: did, | ||
| status: DidSyncStatus.Synced, | ||
| }); |
I think this should be in a transaction, since an invariant should be that there is always a didSyncStatus entity if there is a did entity.
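As an illustration of that invariant, the following sketch models the two saves as an all-or-nothing unit. It uses plain in-memory maps rather than TypeORM, and `Tables`, `transaction`, and `saveDocumentWithStatus` are hypothetical names; in the service itself this would presumably go through TypeORM's own transaction support instead.

```typescript
// Hypothetical in-memory model of the two tables that must stay consistent.
interface Tables {
  documents: Map<string, string>; // DID -> serialized document
  syncStatus: Map<string, "stale" | "synced">; // DID -> sync status
}

// Runs `work` against copies of the tables and commits only if it completes.
function transaction<T>(tables: Tables, work: (draft: Tables) => T): T {
  const draft: Tables = {
    documents: new Map(tables.documents),
    syncStatus: new Map(tables.syncStatus),
  };
  const result = work(draft); // if this throws, `tables` is left untouched
  tables.documents = draft.documents;
  tables.syncStatus = draft.syncStatus;
  return result;
}

// Saves the document and its status together; the flag simulates a mid-way failure.
function saveDocumentWithStatus(
  tables: Tables,
  did: string,
  document: string,
  failAfterDocument = false,
): void {
  transaction(tables, (draft) => {
    draft.documents.set(did, document);
    if (failAfterDocument) {
      throw new Error("simulated failure between the two writes");
    }
    draft.syncStatus.set(did, "synced");
  });
}

// Usage: the second save fails half-way and leaves no document without a status.
const tables: Tables = { documents: new Map(), syncStatus: new Map() };
saveDocumentWithStatus(tables, "did:ethr:volta:0xabc", "{}");

let rolledBack = false;
try {
  saveDocumentWithStatus(tables, "did:ethr:volta:0xdef", "{}", true);
} catch (error) {
  rolledBack = true;
}
```

Without the wrapper, the failed second call would have left a document entry with no matching status entry, which is exactly the state the invariant is meant to rule out.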
src/modules/did/did.service.ts
Outdated
| const staleDIDs = ( | ||
| await this.didRepository.find({ select: ['id'] }) | ||
| ).filter((doc) => { | ||
| const identity = addressOf(doc.id); | ||
| return changedIdentities.includes(identity); | ||
| }); | ||
| await this.didSyncStatusRepository | ||
| .createQueryBuilder() | ||
| .useTransaction(true) | ||
| .update(DidSyncStatusEntity) | ||
| .set({ status: DidSyncStatus.Stale }) | ||
| .where({ document: In(staleDIDs) }) | ||
| .execute(); |
It seems to me that a better approach would be to rely on the database to do the filtering in the where clause, rather than reading all DIDs into application memory and then filtering in the application. I think that, if the number of DIDs became large enough, this this.didRepository.find({ select: ['id'] }) query would probably fail.
I suppose the challenge might be how to use the addresses returned from changedIdentities to find the didSyncStatus entities to update. I think a couple of options are:
- Store the address of the identity on the DidSyncStatusEntity
- Join to DIDDocumentEntity in the query and filter on the id property. Of course the addresses would need to be transformed to did:ethr DID ids.
Note that I think that option 2 might be slightly less complicated if we got rid of the one-to-one join entity and just added a new column to the DidDocumentEntity.
I think we only really need a single isStale boolean column (or could also keep the enum), so the impact on the performance of the database query when reading the DID Document would be negligible, I think.
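For the address-to-DID transformation mentioned in the second option, a minimal sketch could look like the following. It assumes the layout did:ethr:<chain>:<address>, which is inferred from the earlier `doc.id.split(':')[3]` snippet; the helper names, the chain name "volta", and the sample addresses are hypothetical, and `addressOfDid` is not the PR's own `addressOf`.

```typescript
// Assumed DID layout: did:ethr:<chain>:<address> (inferred, not confirmed by the PR).
const DID_ETHR_PREFIX = "did:ethr";

// Builds a did:ethr id from a chain name and an identity address.
function didOf(address: string, chainName: string): string {
  return `${DID_ETHR_PREFIX}:${chainName}:${address}`;
}

// Inverse helper: extracts the address segment and rejects unexpected layouts.
function addressOfDid(did: string): string {
  const parts = did.split(":");
  if (parts.length !== 4 || parts[0] !== "did" || parts[1] !== "ethr") {
    throw new Error(`Unexpected DID format: ${did}`);
  }
  return parts[3];
}

// Maps changed identity addresses to DID ids, dropping duplicate addresses.
function staleDidIds(changedIdentities: string[], chainName: string): string[] {
  return Array.from(new Set(changedIdentities)).map((address) => didOf(address, chainName));
}

// Usage: the resulting ids could feed an IN (...) filter on the document id.
const ids = staleDidIds(["0xAbC", "0xAbC", "0xdef"], "volta");
const roundTrip = addressOfDid(ids[0]);
```

One caveat with this kind of exact string match: if addresses are stored with different casing than the event source reports, the filter would miss rows, so some normalization may be needed.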
Thanks for the suggestions. Option 2 seems safer to me, because it excludes the case where two status entities are connected to the same document. I would also like to avoid changing the document entity. It is used everywhere in ssi-hub and I am a bit afraid of causing some unexpected changes. Besides, the isStale property is not inherently part of the document.
src/modules/did/did.service.ts
Outdated
| await this.didSyncStatusRepository | ||
| .createQueryBuilder() | ||
| .useTransaction(true) | ||
| .update(DidSyncStatusEntity) | ||
| .set({ status: DidSyncStatus.Stale }) | ||
| .where({ document: In(staleDIDs) }) | ||
| .execute(); |
Would this query work for existing cached documents (already existing in the cache I mean)? Maybe I missed it, but I don't see a migration that adds a DidSyncStatusEntity for all existing cached DID Documents.
Yes, I missed that
487f07f to c6e1b08
Co-authored-by: John Henderson <[email protected]>
With digits redactor, the logs looked like: debug [DIDService] : 2024-11-21T22:32:25.999Z - Fetched 0 DID events from interval [DIGITS, DIGITS]
Co-authored-by: John Henderson <[email protected]>
Co-authored-by: John Henderson <[email protected]>
5f9b0cd to 4177c78
Tested manually, because the test scenarios require restarting the application from a desired state, which is not supported in Nest.js.
Test scenarios:
2. Restart after a clean start. Events are fetched from the last sync block.