Skip to content

Commit

Permalink
fix resume upload: skip the first region obtained from query
Browse files Browse the repository at this point in the history
  • Loading branch information
lihsai0 committed Sep 8, 2023
1 parent c745ede commit cd4d041
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 45 deletions.
44 changes: 41 additions & 3 deletions qiniu/storage/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,12 @@ ChangeRegionRetryPolicy.prototype.prepareRetry = function (context, ret) {
* @param {string} [options.resumeRecordFilePath]
* @param {RegionsProvider} options.regionsProvider
* @param {'v1' | 'v2' | string} options.uploadApiVersion
* @param {EndpointsProvider} [options.preferredEndpointsProvider]
*/
function UploadState (options) {
this.retryPolicies = options.retryPolicies || [];
this.regionsProvider = options.regionsProvider;
this.preferredEndpointsProvider = options.preferredEndpointsProvider;
this.context = {
serviceName: SERVICE_NAME.UP,
uploadApiVersion: options.uploadApiVersion,
Expand Down Expand Up @@ -411,9 +413,42 @@ UploadState.prototype.init = function () {
return loopRegions();
});
};
return this.regionsProvider.getRegions()
let preferredEndpoints;
return Promise.resolve()
.then(() => {
if (this.preferredEndpointsProvider) {
return this.preferredEndpointsProvider.getEndpoints();
}
return [];
})
.then(endpoints => {
preferredEndpoints = endpoints;
return this.regionsProvider.getRegions();
})
.then(regions => {
[this.context.region, ...this.context.alternativeRegions] = regions;
regions = regions.slice();
// find preferred region by preferred endpoints
let preferredRegionIndex = -1;
if (preferredEndpoints.length) {
preferredRegionIndex = regions.findIndex(r =>
r.services[this.context.serviceName].some(e =>
preferredEndpoints.map(pe => pe.host).includes(e.host)
)
);
}
// preferred endpoints is not a region, then make all regions alternative
if (preferredEndpoints.length && preferredRegionIndex < 0) {
[this.context.endpoint, ...this.context.alternativeEndpoints] = preferredEndpoints;
this.context.alternativeRegions = regions;
return Promise.resolve();
}
// preferred endpoints is a region, then reorder the regions
if (preferredEndpoints.length && preferredRegionIndex > 0) {
[this.context.region] = regions.splice(preferredRegionIndex, 1);
this.context.alternativeRegions = regions;
} else {
[this.context.region, ...this.context.alternativeRegions] = regions;
}
// check region available
if (!this.context.region) {
return Promise.reject(new Error('There isn\'t available region'));
Expand Down Expand Up @@ -477,6 +512,7 @@ UploadState.prototype.prepareRetry = function (ret) {
* @param {RetryPolicy[]} [options.retryPolicies]
* @param {'v1' | 'v2' | string} [options.uploadApiVersion]
* @param {string} [options.resumeRecordFilePath]
* @param {EndpointsProvider} [options.preferredEndpointsProvider]
* @returns {Promise<RetryRet>}
*/
function doWorkWithRetry (options) {
Expand All @@ -488,12 +524,14 @@ function doWorkWithRetry (options) {
const retryPolicies = options.retryPolicies || [];
const uploadApiVersion = options.uploadApiVersion;
const resumeRecordFilePath = options.resumeRecordFilePath;
const preferredEndpointsProvider = options.preferredEndpointsProvider;

const uploadState = new UploadState({
retryPolicies,
regionsProvider,
uploadApiVersion,
resumeRecordFilePath
resumeRecordFilePath,
preferredEndpointsProvider
});

// the workFn helper used for recursive calling to retry
Expand Down
112 changes: 71 additions & 41 deletions qiniu/storage/resume.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const {
ChangeEndpointRetryPolicy,
ChangeRegionRetryPolicy
} = require('./internal');
const { StaticEndpointsProvider } = require('../httpc/endpointsProvider');
const { Endpoint } = require('../httpc/endpoint');

exports.ResumeUploader = ResumeUploader;
exports.PutExtra = PutExtra;
Expand Down Expand Up @@ -130,23 +132,31 @@ ResumeUploader.prototype.putStream = function (
accessKey: util.getAKFromUptoken(uploadToken)
})
.then(regionsProvider => {
const resumeInfo = getResumeRecordInfo(putExtra.resumeRecordFile);
let preferredEndpointsProvider;
if (resumeInfo && Array.isArray(resumeInfo.upDomains)) {
preferredEndpointsProvider = new StaticEndpointsProvider(
resumeInfo.upDomains.map(d => new Endpoint(d, { defaultScheme: preferScheme }))
);
}
return doWorkWithRetry({
workFn: sendPutReq,

callbackFunc,
regionsProvider,
// use resume upDomain firstly
preferredEndpointsProvider: preferredEndpointsProvider,
// stream not support retry
retryPolicies: []
});
});

function sendPutReq (endpoint) {
const endpointValue = endpoint.getValue({
scheme: preferScheme
});
endpoint = Object.create(endpoint);
endpoint.defaultScheme = preferScheme;
return new Promise(resolve => {
putReq(
endpointValue,
endpoint,
uploadToken,
key,
rsStream,
Expand All @@ -158,7 +168,26 @@ ResumeUploader.prototype.putStream = function (
};

/**
 * Read and parse a resume-record file left by a previous upload attempt.
 *
 * @param {string} resumeRecordFilePath - path of the resume record file; may be falsy
 * @returns {undefined | Object.<string, any>} the parsed record, or `undefined` when
 *     the path is falsy, the file does not exist, or its content is not valid JSON
 */
function getResumeRecordInfo (resumeRecordFilePath) {
    let result;
    if (resumeRecordFilePath) {
        try {
            const resumeRecords = fs.readFileSync(resumeRecordFilePath).toString();
            result = JSON.parse(resumeRecords);
        } catch (e) {
            // a missing record file is the normal first-upload case; log anything else
            if (e.code !== 'ENOENT') {
                console.error(e);
            }
        }
    }
    return result;
}

/**
* @param {Endpoint} upEndpoint
* @param {string} uploadToken
* @param {string | null} key
* @param {ReadableStream} rsStream
Expand All @@ -167,7 +196,7 @@ ResumeUploader.prototype.putStream = function (
* @param {reqCallback} callbackFunc
*/
function putReq (
upDomain,
upEndpoint,
uploadToken,
key,
rsStream,
Expand All @@ -182,17 +211,8 @@ function putReq (
}));

// get resume record info
let blkputRets = null;
const blkputRets = getResumeRecordInfo(putExtra.resumeRecordFile);
const totalBlockNum = Math.ceil(rsStreamLen / putExtra.partSize);
// read resumeRecordFile
if (putExtra.resumeRecordFile) {
try {
const resumeRecords = fs.readFileSync(putExtra.resumeRecordFile).toString();
blkputRets = JSON.parse(resumeRecords);
} catch (e) {
console.error(e);
}
}

// select upload version
/**
Expand All @@ -217,7 +237,7 @@ function putReq (
totalBlockNum
},
{
upDomain,
upEndpoint,
uploadToken,
key,
putExtra
Expand All @@ -237,7 +257,7 @@ function putReq (

/**
* @typedef SourceOptions
* @property { Object[] | null } blkputRets
* @property { Object.<string, any> | undefined } blkputRets
* @property { ReadableStream } rsStream
* @property { BlockStream } blkStream
* @property { number } rsStreamLen
Expand All @@ -247,7 +267,7 @@ function putReq (
/**
* @typedef UploadOptions
* @property { string | null } key
* @property { string } upDomain
* @property { Endpoint } upEndpoint
* @property { string } uploadToken
* @property { PutExtra } putExtra
*/
Expand All @@ -267,30 +287,32 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) {
} = sourceOptions;
let blkputRets = sourceOptions.blkputRets;
const {
upEndpoint,
key,
uploadToken,
putExtra
} = uploadOptions;

// use resume upDomain firstly
const upDomain = (blkputRets && blkputRets.upDomain) || uploadOptions.upDomain;

// initial state
const finishedCtxList = [];
const finishedBlkPutRets = {
upDomain: upDomain,
upDomains: [],
parts: []
};

// backward compatibility with ≤ 7.9.0
if (Array.isArray(blkputRets)) {
blkputRets = {
upDomain: upDomain,
upDomains: [],
parts: []
};
}
if (blkputRets && Array.isArray(blkputRets.upDomains)) {
finishedBlkPutRets.upDomains = blkputRets.upDomains;
}
finishedBlkPutRets.upDomains.push(upEndpoint.host);

// upload parts
const upDomains = upEndpoint.getValue();
let readLen = 0;
let curBlock = 0;
let isSent = false;
Expand Down Expand Up @@ -319,7 +341,7 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) {
if (needUploadBlk) {
blkStream.pause();
mkblkReq(
upDomain,
upDomains,
uploadToken,
chunk,
function (
Expand All @@ -346,7 +368,7 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) {
}
blkStream.resume();
if (finishedCtxList.length === totalBlockNum) {
mkfileReq(upDomain, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc);
mkfileReq(upDomains, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc);
isSent = true;
}
}
Expand All @@ -356,7 +378,7 @@ function putReqV1 (sourceOptions, uploadOptions, callbackFunc) {

blkStream.on('end', function () {
if (!isSent && rsStreamLen === 0) {
mkfileReq(upDomain, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc);
mkfileReq(upDomains, uploadToken, rsStreamLen, finishedCtxList, key, putExtra, callbackFunc);
}
destroy(rsStream);
});
Expand All @@ -377,35 +399,35 @@ function putReqV2 (sourceOptions, uploadOptions, callbackFunc) {
rsStream
} = sourceOptions;
const {
upEndpoint,
uploadToken,
key,
putExtra
} = uploadOptions;

let upDomain = uploadOptions.upDomain;

// try resume upload blocks
let finishedBlock = 0;
const finishedEtags = {
upDomain,
upDomains: [],
etags: [],
uploadId: '',
expiredAt: 0
};
if (blkputRets !== null && blkputRets.upDomain) {
if (blkputRets && Array.isArray(blkputRets.upDomains)) {
// check etag expired or not
const expiredAt = blkputRets.expiredAt;
const timeNow = Date.now() / 1000;
if (expiredAt > timeNow && blkputRets.uploadId !== '') {
upDomain = blkputRets.upDomain;
finishedEtags.upDomain = blkputRets.upDomain;
if (expiredAt > timeNow && blkputRets.uploadId) {
finishedEtags.upDomains = blkputRets.upDomains;
finishedEtags.etags = blkputRets.etags;
finishedEtags.uploadId = blkputRets.uploadId;
finishedEtags.expiredAt = blkputRets.expiredAt;
finishedBlock = finishedEtags.etags.length;
}
}
finishedEtags.upDomains.push(upEndpoint.host);

const upDomain = upEndpoint.getValue();
const bucket = util.getBucketFromUptoken(uploadToken);
const encodedObjectName = key ? util.urlsafeBase64Encode(key) : '~';
if (finishedEtags.uploadId) {
Expand Down Expand Up @@ -734,28 +756,36 @@ ResumeUploader.prototype.putFile = function (
accessKey: util.getAKFromUptoken(uploadToken)
})
.then(regionsProvider => {
const resumeInfo = getResumeRecordInfo(putExtra.resumeRecordFile);
let preferredEndpointsProvider;
if (resumeInfo && Array.isArray(resumeInfo.upDomains)) {
preferredEndpointsProvider = new StaticEndpointsProvider(
resumeInfo.upDomains.map(d => new Endpoint(d, { defaultScheme: preferScheme }))
);
}
return doWorkWithRetry({
workFn: sendPutReq,

callbackFunc,
regionsProvider,
retryPolicies: this.retryPolicies,
uploadApiVersion: putExtra.version,
resumeRecordFilePath: putExtra.resumeRecordFile
// use resume upDomain firstly
preferredEndpointsProvider: preferredEndpointsProvider,
resumeRecordFilePath: putExtra.resumeRecordFile,
retryPolicies: this.retryPolicies
});
});

function sendPutReq (endpoint) {
endpoint = Object.create(endpoint);
endpoint.defaultScheme = preferScheme;
const rsStream = fs.createReadStream(localFile, {
highWaterMark: conf.BLOCK_SIZE
});
const rsStreamLen = fs.statSync(localFile).size;
const endpointValue = endpoint.getValue({
scheme: preferScheme
});
return new Promise((resolve) => {
putReq(
endpointValue,
endpoint,
uploadToken,
key,
rsStream,
Expand Down
2 changes: 1 addition & 1 deletion test/resume_up.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ describe('test resume up', function () {
if (len === total) {
var content = fs.readFileSync(tmpfile);
var data = JSON.parse(content);
data.upDomain.should.not.empty();
data.upDomains.should.not.empty();
data.parts.forEach(function (item) {
item.should.have.keys('ctx', 'expired_at', 'crc32');
});
Expand Down

0 comments on commit cd4d041

Please sign in to comment.