import {
  addPartialChecksums,
  bson,
  BucketChecksum,
  ChecksumCache,
  ChecksumMap,
  FetchPartialBucketChecksum,
  InternalOpId,
  isPartialChecksum,
  PartialChecksum,
  PartialChecksumMap,
  PartialOrFullChecksum
} from '@powersync/service-core';
import * as lib_mongo from '@powersync/lib-service-mongodb';
import { logger } from '@powersync/lib-services-framework';
import { PowerSyncMongo } from './db.js';

/**
 * Checksum query implementation.
 */
export class MongoChecksums {
  private cache = new ChecksumCache({
    fetchChecksums: (batch) => {
      return this.getChecksumsInternal(batch);
    }
  });

  constructor(
    private db: PowerSyncMongo,
    private group_id: number
  ) {}

  /**
   * Calculate checksums, utilizing the cache.
   */
  async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise<ChecksumMap> {
    return this.cache.getChecksumMap(checkpoint, buckets);
  }
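
  // Example usage (hypothetical values; `db` is an existing PowerSyncMongo instance):
  //   const checksums = new MongoChecksums(db, 1);
  //   const map = await checksums.getChecksums(1000n, ['global[]']);
  //   map.get('global[]'); // a BucketChecksum with `checksum` and `count` for the bucket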

  clearCache() {
    this.cache.clear();
  }

  /**
   * Calculate (partial) checksums from bucket_state and the data collection.
   *
   * Results are not cached.
   */
  private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
    if (batch.length == 0) {
      return new Map();
    }

    const preFilters: any[] = [];
    for (let request of batch) {
      if (request.start == null) {
        preFilters.push({
          _id: {
            g: this.group_id,
            b: request.bucket
          },
          'compacted_state.op_id': { $exists: true, $lte: request.end }
        });
      }
    }

    const preStates = new Map<string, { opId: InternalOpId; checksum: BucketChecksum }>();

    if (preFilters.length > 0) {
      // For un-cached bucket checksums, attempt to use the compacted state first.
      const states = await this.db.bucket_state
        .find({
          $or: preFilters
        })
        .toArray();
      for (let state of states) {
        const compactedState = state.compacted_state!;
        preStates.set(state._id.b, {
          opId: compactedState.op_id,
          checksum: {
            bucket: state._id.b,
            checksum: Number(compactedState.checksum),
            count: compactedState.count
          }
        });
      }
    }

    // Where a compacted state was found, only query the data collection for ops
    // after the compacted op_id; the compacted checksum covers the rest.
    const mappedRequests = batch.map((request) => {
      let start = request.start;
      if (start == null) {
        const preState = preStates.get(request.bucket);
        if (preState != null) {
          start = preState.opId;
        }
      }
      return {
        ...request,
        start
      };
    });

    const queriedChecksums = await this.queryPartialChecksums(mappedRequests);

    return new Map<string, PartialOrFullChecksum>(
      batch.map((request) => {
        const bucket = request.bucket;
        // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available
        const preState = preStates.get(bucket);
        // Could be null if we got no data
        const partialChecksum = queriedChecksums.get(bucket);
        const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null);

        return [bucket, merged];
      })
    );
  }

  /**
   * Calculate (partial) checksums from the data collection directly.
   */
  async queryPartialChecksums(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
    try {
      return await this.queryPartialChecksumsInternal(batch);
    } catch (e) {
      if (e.codeName == 'MaxTimeMSExpired') {
        logger.warn(`Checksum query timed out; falling back to slower version`, e);
        // Timeout - try the slower but more robust version
        return await this.queryPartialChecksumsFallback(batch);
      }
      throw lib_mongo.mapQueryError(e, 'while reading checksums');
    }
  }

  private async queryPartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
    const filters: any[] = [];
    for (let request of batch) {
      // bucket_data uses a compound _id of {g: group, b: bucket, o: op_id}, so a range over
      // the full _id selects ops within a single bucket, after `start` and up to `end`.
      filters.push({
        _id: {
          $gt: {
            g: this.group_id,
            b: request.bucket,
            o: request.start ?? new bson.MinKey()
          },
          $lte: {
            g: this.group_id,
            b: request.bucket,
            o: request.end
          }
        }
      });
    }

    const aggregate = await this.db.bucket_data
      .aggregate(
        [
          {
            $match: {
              $or: filters
            }
          },
          CHECKSUM_QUERY_GROUP_STAGE
        ],
        { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
      )
      // Don't map the error here - we want to keep timeout errors as-is
      .toArray();

    const partialChecksums = new Map<string, PartialOrFullChecksum>(
      aggregate.map((doc) => {
        const bucket = doc._id;
        return [bucket, checksumFromAggregate(doc)];
      })
    );

    return new Map<string, PartialOrFullChecksum>(
      batch.map((request) => {
        const bucket = request.bucket;
        // Could be null if we got no data
        let partialChecksum = partialChecksums.get(bucket);
        if (partialChecksum == null) {
          partialChecksum = {
            bucket,
            partialCount: 0,
            partialChecksum: 0
          };
        }
        // A request without a start covers the entire bucket, so the result is a full
        // checksum rather than a partial one to be added to a previous value.
        if (request.start == null && isPartialChecksum(partialChecksum)) {
          partialChecksum = {
            bucket,
            count: partialChecksum.partialCount,
            checksum: partialChecksum.partialChecksum
          };
        }

        return [bucket, partialChecksum];
      })
    );
  }

  /**
   * Checksums for large buckets can run over the query timeout.
   * To avoid this, we query in batches.
   * This version can handle larger amounts of data, but is slower, especially for many buckets.
   */
  async queryPartialChecksumsFallback(batch: FetchPartialBucketChecksum[]): Promise<PartialChecksumMap> {
    const partialChecksums = new Map<string, PartialOrFullChecksum>();
    for (let request of batch) {
      const checksum = await this.slowChecksum(request);
      partialChecksums.set(request.bucket, checksum);
    }

    return partialChecksums;
  }

  private async slowChecksum(request: FetchPartialBucketChecksum): Promise<PartialOrFullChecksum> {
    const batchLimit = 50_000;

    // Start from the request's lower bound if present, so that partial requests do not
    // re-count ops already covered by a cached or compacted checksum.
    let lowerBound = request.start ?? 0n;
    const bucket = request.bucket;

    let runningChecksum: PartialOrFullChecksum = {
      bucket,
      partialCount: 0,
      partialChecksum: 0
    };
    if (request.start == null) {
      runningChecksum = {
        bucket,
        count: 0,
        checksum: 0
      };
    }

    while (true) {
      const filter = {
        _id: {
          $gt: {
            g: this.group_id,
            b: bucket,
            o: lowerBound
          },
          $lte: {
            g: this.group_id,
            b: bucket,
            o: request.end
          }
        }
      };
      const docs = await this.db.bucket_data
        .aggregate(
          [
            {
              $match: filter
            },
            // sort and limit _before_ grouping
            { $sort: { _id: 1 } },
            { $limit: batchLimit },
            CHECKSUM_QUERY_GROUP_STAGE
          ],
          { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
        )
        .toArray();
      const doc = docs[0];
      if (doc == null) {
        return runningChecksum;
      }
      const partial = checksumFromAggregate(doc);
      runningChecksum = addPartialChecksums(bucket, runningChecksum, partial);
      const isFinal = doc.count != batchLimit;
      if (isFinal) {
        break;
      } else {
        lowerBound = doc.last_op;
      }
    }
    return runningChecksum;
  }
}

const CHECKSUM_QUERY_GROUP_STAGE = {
  $group: {
    _id: '$_id.b',
    // Historically, checksum may be stored as 'int' or 'double'.
    // More recently, this should be a 'long'.
    // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
    checksum_total: { $sum: { $toLong: '$checksum' } },
    count: { $sum: 1 },
    has_clear_op: {
      $max: {
        $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0]
      }
    },
    last_op: { $max: '$_id.o' }
  }
};
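
// Each document produced by the group stage above has the shape
// { _id: <bucket name>, checksum_total: <long>, count: <number>, has_clear_op: 0 | 1, last_op: <op id> },
// which checksumFromAggregate() below converts into a partial or full checksum.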

/**
 * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum.
 */
function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum {
  const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff;
  const bucket = doc._id;

  if (doc.has_clear_op == 1) {
    return {
      // full checksum - replaces any previous one
      bucket,
      checksum: partialChecksum,
      count: doc.count
    } satisfies BucketChecksum;
  } else {
    return {
      // partial checksum - is added to a previous one
      bucket,
      partialCount: doc.count,
      partialChecksum
    } satisfies PartialChecksum;
  }
}
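
// Illustrative example of how the pieces above combine (values are made up):
// a compacted checksum of { checksum: 100, count: 10 } at op 50, merged with a queried
// partial of { partialChecksum: 23, partialCount: 2 } over ops (50, 60], is expected to
// yield { checksum: 123, count: 12 } for the bucket at op 60. Checksums are summed with
// 32-bit wrap-around (hence the 0xffffffff masking above), and a CLEAR op produces a
// full checksum that replaces any previous value.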