From cd4d041ef2c078351c74168b676a761abd25e9c4 Mon Sep 17 00:00:00 2001 From: lihsai0 Date: Thu, 7 Sep 2023 18:47:46 +0800 Subject: [PATCH] fix resume upload skip the first region obtained from query --- qiniu/storage/internal.js | 44 ++++++++++++++- qiniu/storage/resume.js | 112 ++++++++++++++++++++++++-------------- test/resume_up.test.js | 2 +- 3 files changed, 113 insertions(+), 45 deletions(-) diff --git a/qiniu/storage/internal.js b/qiniu/storage/internal.js index 6d3662d..dbc9820 100644 --- a/qiniu/storage/internal.js +++ b/qiniu/storage/internal.js @@ -369,10 +369,12 @@ ChangeRegionRetryPolicy.prototype.prepareRetry = function (context, ret) { * @param {string} [options.resumeRecordFilePath] * @param {RegionsProvider} options.regionsProvider * @param {'v1' | 'v2' | string} options.uploadApiVersion + * @param {EndpointsProvider} [options.preferredEndpointsProvider] */ function UploadState (options) { this.retryPolicies = options.retryPolicies || []; this.regionsProvider = options.regionsProvider; + this.preferredEndpointsProvider = options.preferredEndpointsProvider; this.context = { serviceName: SERVICE_NAME.UP, uploadApiVersion: options.uploadApiVersion, @@ -411,9 +413,42 @@ UploadState.prototype.init = function () { return loopRegions(); }); }; - return this.regionsProvider.getRegions() + let preferredEndpoints; + return Promise.resolve() + .then(() => { + if (this.preferredEndpointsProvider) { + return this.preferredEndpointsProvider.getEndpoints(); + } + return []; + }) + .then(endpoints => { + preferredEndpoints = endpoints; + return this.regionsProvider.getRegions(); + }) .then(regions => { - [this.context.region, ...this.context.alternativeRegions] = regions; + regions = regions.slice(); + // find preferred region by preferred endpoints + let preferredRegionIndex = -1; + if (preferredEndpoints.length) { + preferredRegionIndex = regions.findIndex(r => + r.services[this.context.serviceName].some(e => + preferredEndpoints.map(pe => pe.host).includes(e.host) + ) + ); + } + // preferred endpoints is not a region, then make all regions alternative + if (preferredEndpoints.length && preferredRegionIndex < 0) { + [this.context.endpoint, ...this.context.alternativeEndpoints] = preferredEndpoints; + this.context.alternativeRegions = regions; + return Promise.resolve(); + } + // preferred endpoints is a region, then reorder the regions + if (preferredEndpoints.length && preferredRegionIndex > 0) { + [this.context.region] = regions.splice(preferredRegionIndex, 1); + this.context.alternativeRegions = regions; + } else { + [this.context.region, ...this.context.alternativeRegions] = regions; + } // check region available if (!this.context.region) { return Promise.reject(new Error('There isn\'t available region')); @@ -477,6 +512,7 @@ UploadState.prototype.prepareRetry = function (ret) { * @param {RetryPolicy[]} [options.retryPolicies] * @param {'v1' | 'v2' | string} [options.uploadApiVersion] * @param {string} [options.resumeRecordFilePath] + * @param {EndpointsProvider} [options.preferredEndpointsProvider] * @returns {Promise} */ function doWorkWithRetry (options) { @@ -488,12 +524,14 @@ function doWorkWithRetry (options) { const retryPolicies = options.retryPolicies || []; const uploadApiVersion = options.uploadApiVersion; const resumeRecordFilePath = options.resumeRecordFilePath; + const preferredEndpointsProvider = options.preferredEndpointsProvider; const uploadState = new UploadState({ retryPolicies, regionsProvider, uploadApiVersion, - resumeRecordFilePath + resumeRecordFilePath, + preferredEndpointsProvider }); // the workFn helper used for recursive calling to retry diff --git a/qiniu/storage/resume.js b/qiniu/storage/resume.js index 43ab9db..0f1d032 100644 --- a/qiniu/storage/resume.js +++ b/qiniu/storage/resume.js @@ -17,6 +17,8 @@ const { ChangeEndpointRetryPolicy, ChangeRegionRetryPolicy } = require('./internal'); +const { StaticEndpointsProvider } = require('../httpc/endpointsProvider'); +const { Endpoint } = require('../httpc/endpoint'); exports.ResumeUploader = ResumeUploader; exports.PutExtra = PutExtra; @@ -130,23 +132,31 @@ ResumeUploader.prototype.putStream = function ( accessKey: util.getAKFromUptoken(uploadToken) }) .then(regionsProvider => { + const resumeInfo = getResumeRecordInfo(putExtra.resumeRecordFile); + let preferredEndpointsProvider; + if (resumeInfo && Array.isArray(resumeInfo.upDomains)) { + preferredEndpointsProvider = new StaticEndpointsProvider( + resumeInfo.upDomains.map(d => new Endpoint(d, { defaultScheme: preferScheme })) + ); + } return doWorkWithRetry({ workFn: sendPutReq, callbackFunc, regionsProvider, + // use resume upDomain firstly + preferredEndpointsProvider: preferredEndpointsProvider, // stream not support retry retryPolicies: [] }); }); function sendPutReq (endpoint) { - const endpointValue = endpoint.getValue({ - scheme: preferScheme - }); + endpoint = Object.create(endpoint); + endpoint.defaultScheme = preferScheme; return new Promise(resolve => { putReq( - endpointValue, + endpoint, uploadToken, key, rsStream, @@ -158,7 +168,26 @@ ResumeUploader.prototype.putStream = function ( }; /** - * @param {string} upDomain + * @param {string} resumeRecordFilePath + * @returns {undefined | Object.} + */ +function getResumeRecordInfo (resumeRecordFilePath) { + // get resume record info + let result; + // read resumeRecordFile + if (resumeRecordFilePath) { + try { + const resumeRecords = fs.readFileSync(resumeRecordFilePath).toString(); + result = JSON.parse(resumeRecords); + } catch (e) { + e.code !== 'ENOENT' && console.error(e); + } + } + return result; +} + +/** + * @param {Endpoint} upEndpoint * @param {string} uploadToken * @param {string | null} key * @param {ReadableStream} rsStream @@ -167,7 +196,7 @@ ResumeUploader.prototype.putStream = function ( * @param {reqCallback} callbackFunc */ function putReq ( - upDomain, + upEndpoint, uploadToken, key, rsStream, @@ -182,17 +211,8 @@ function putReq ( })); // get resume record info - let blkputRets = null; + const blkputRets = getResumeRecordInfo(putExtra.resumeRecordFile); const totalBlockNum = Math.ceil(rsStreamLen / putExtra.partSize); - // read resumeRecordFile - if (putExtra.resumeRecordFile) { - try { - const resumeRecords = fs.readFileSync(putExtra.resumeRecordFile).toString(); - blkputRets = JSON.parse(resumeRecords); - } catch (e) { - console.error(e); - } - } // select upload version /** @@ -217,7 +237,7 @@ function putReq ( totalBlockNum }, { - upDomain, + upEndpoint, uploadToken, key, putExtra @@ -237,7 +257,7 @@ function putReq ( /** * @typedef SourceOptions - * @property { Object[] | null } blkputRets + * @property { Object. | undefined } blkputRets * @property { ReadableStream } rsStream * @property { BlockStream } blkStream * @property { number } rsStreamLen @@ -247,7 +267,7 @@ function putReq ( /** * @typedef UploadOptions * @property { string | null } key - * @property { string } upDomain + * @property { Endpoint } upEndpoint * @property { string } uploadToken * @property { PutExtra } putExtra */ @@ -267,30 +287,32 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) { } = sourceOptions; let blkputRets = sourceOptions.blkputRets; const { + upEndpoint, key, uploadToken, putExtra } = uploadOptions; - // use resume upDomain firstly - const upDomain = (blkputRets && blkputRets.upDomain) || uploadOptions.upDomain; - // initial state const finishedCtxList = []; const finishedBlkPutRets = { - upDomain: upDomain, + upDomains: [], parts: [] }; - // backward compatibility with ≤ 7.9.0 if (Array.isArray(blkputRets)) { blkputRets = { - upDomain: upDomain, + upDomains: [], parts: [] }; } + if (blkputRets && Array.isArray(blkputRets.upDomains)) { + finishedBlkPutRets.upDomains = blkputRets.upDomains; + } + finishedBlkPutRets.upDomains.push(upEndpoint.host); // upload parts + const upDomains = upEndpoint.getValue(); let readLen = 0; let curBlock = 0; let isSent = false; @@ -319,7 +341,7 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) { if (needUploadBlk) { blkStream.pause(); mkblkReq( - upDomain, + upDomains, uploadToken, chunk, function ( @@ -346,7 +368,7 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) { } blkStream.resume(); if (finishedCtxList.length === totalBlockNum) { - mkfileReq(upDomain, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc); + mkfileReq(upDomains, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc); isSent = true; } } @@ -356,7 +378,7 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) { blkStream.on('end', function () { if (!isSent && rsStreamLen === 0) { - mkfileReq(upDomain, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc); + mkfileReq(upDomains, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc); } destroy(rsStream); }); @@ -377,35 +399,35 @@ function putReqV2 (sourceOptions, uploadOptions, callbackFunc) { rsStream } = sourceOptions; const { + upEndpoint, uploadToken, key, putExtra } = uploadOptions; - let upDomain = uploadOptions.upDomain; - // try resume upload blocks let finishedBlock = 0; const finishedEtags = { - upDomain, + upDomains: [], etags: [], uploadId: '', expiredAt: 0 }; - if (blkputRets !== null && blkputRets.upDomain) { + if (blkputRets && Array.isArray(blkputRets.upDomains)) { // check etag expired or not const expiredAt = blkputRets.expiredAt; const timeNow = Date.now() / 1000; - if (expiredAt > timeNow && blkputRets.uploadId !== '') { - upDomain = blkputRets.upDomain; - finishedEtags.upDomain = blkputRets.upDomain; + if (expiredAt > timeNow && blkputRets.uploadId) { + finishedEtags.upDomains = blkputRets.upDomains; finishedEtags.etags = blkputRets.etags; finishedEtags.uploadId = blkputRets.uploadId; finishedEtags.expiredAt = blkputRets.expiredAt; finishedBlock = finishedEtags.etags.length; } } + finishedEtags.upDomains.push(upEndpoint.host); + const upDomain = upEndpoint.getValue(); const bucket = util.getBucketFromUptoken(uploadToken); const encodedObjectName = key ? util.urlsafeBase64Encode(key) : '~'; if (finishedEtags.uploadId) { @@ -734,28 +756,36 @@ ResumeUploader.prototype.putFile = function ( accessKey: util.getAKFromUptoken(uploadToken) }) .then(regionsProvider => { + const resumeInfo = getResumeRecordInfo(putExtra.resumeRecordFile); + let preferredEndpointsProvider; + if (resumeInfo && Array.isArray(resumeInfo.upDomains)) { + preferredEndpointsProvider = new StaticEndpointsProvider( + resumeInfo.upDomains.map(d => new Endpoint(d, { defaultScheme: preferScheme })) + ); + } return doWorkWithRetry({ workFn: sendPutReq, callbackFunc, regionsProvider, - retryPolicies: this.retryPolicies, uploadApiVersion: putExtra.version, - resumeRecordFilePath: putExtra.resumeRecordFile + // use resume upDomain firstly + preferredEndpointsProvider: preferredEndpointsProvider, + resumeRecordFilePath: putExtra.resumeRecordFile, + retryPolicies: this.retryPolicies }); }); function sendPutReq (endpoint) { + endpoint = Object.create(endpoint); + endpoint.defaultScheme = preferScheme; const rsStream = fs.createReadStream(localFile, { highWaterMark: conf.BLOCK_SIZE }); const rsStreamLen = fs.statSync(localFile).size; - const endpointValue = endpoint.getValue({ - scheme: preferScheme - }); return new Promise((resolve) => { putReq( - endpointValue, + endpoint, uploadToken, key, rsStream, diff --git a/test/resume_up.test.js b/test/resume_up.test.js index ff30ec9..a0ec4a9 100644 --- a/test/resume_up.test.js +++ b/test/resume_up.test.js @@ -373,7 +373,7 @@ describe('test resume up', function () { if (len === total) { var content = fs.readFileSync(tmpfile); var data = JSON.parse(content); - data.upDomain.should.not.empty(); + data.upDomains.should.not.empty(); data.parts.forEach(function (item) { item.should.have.keys('ctx', 'expired_at', 'crc32'); });