WIP: async read function by default #368

Open · wants to merge 1 commit into base: v3
samples/Encryption.md (2 additions, 2 deletions)
@@ -30,7 +30,7 @@ const flow = new Flow({
uploadMethod: 'POST',
fileParameterName: 'file',
// Asynchronous function called before each chunk upload request
asyncReadFileFn: async function(flowObj, startByte, endByte, fileType, chunk) {
readFileFn: async function(flowObj, startByte, endByte, fileType, chunk) {
// Load file chunk in memory
const plaintextbytes = await readFileChunk(flowObj.file, startByte, endByte);
// Encrypt chunk
@@ -109,7 +109,7 @@ class StreamEncryptor {
var encryptor = new StreamEncryptor(gpgKeys);
new Flow({
// ...
asyncReadFileFn: encryptor.read.bind(encryptor),
readFileFn: encryptor.read.bind(encryptor),
initFileFn: encryptor.init.bind(encryptor),
forceChunkSize: true,
});
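Note: the sample above calls a `readFileChunk` helper that sits outside this hunk. A minimal sketch of what such a helper could look like, assuming a plain Blob.slice()/arrayBuffer() approach (the helper itself is not part of this PR):

```js
// Hypothetical helper assumed by the encryption sample: read the byte range
// [startByte, endByte) of the underlying File into a Uint8Array so it can be
// encrypted before the chunk is uploaded.
async function readFileChunk(file, startByte, endByte) {
  const blob = file.slice(startByte, endByte);
  return new Uint8Array(await blob.arrayBuffer());
}
```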
src/FlowChunk.js (15 additions, 55 deletions)
@@ -260,30 +260,7 @@ export default class FlowChunk {
}

/**
* Finish preprocess state
* @function
*/
async preprocessFinished() {
// Re-compute the endByte after the preprocess function to allow an
// implementer of preprocess to set the fileObj size
this.endByte = this.computeEndByte();

this.preprocessState = 2;
await this.send();
}

/**
* Finish read state
* @function
*/
readFinished(payload) {
this.readState = 2;
this.payload = payload;
this.send();
}

/**
* asyncReadFileFn() helper provides the ability of asynchronous read()
* readFileFn() helper provides the ability of asynchronous read()
* Eg: When reading from a ReadableStream.getReader().
*
* But:
@@ -322,15 +299,17 @@

this.readState = 1;
await this.readStreamGuard();
var data, asyncRead = this.flowObj.opts.asyncReadFileFn;

data = await asyncRead(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
let data = await this.flowObj.opts.readFileFn(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
this.readStreamState.resolve();

// Equivalent to readFinished()
// Equivalent to former readFinished()
this.readState = 2;

if (data) {
if (typeof data === 'string') { // In case a regular string is returned
data = new Blob([data], {type: 'application/octet-stream'});
}

this.readBytes = data.size || data.size === 0 ? data.size : -1;
}

@@ -374,36 +353,17 @@
*/
async send() {
var preprocess = this.flowObj.opts.preprocess;
var read = this.flowObj.opts.readFileFn;
var asyncRead = this.flowObj.opts.asyncReadFileFn;

if (typeof preprocess === 'function') {
switch (this.preprocessState) {
case 0:
this.preprocessState = 1;
preprocess(this);
return;
case 1:
return;
}
}

if (asyncRead) {
await this.readStreamChunk();
return;
}

switch (this.readState) {
case 0:
this.readState = 1;
const data = read(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
this.readFinished(data);
return;
case 1:
return;
this.preprocessState = 1;
await preprocess(this);
// Finish preprocess state
// Re-compute the endByte after the preprocess function to allow an
// implementer of preprocess to set the fileObj size
this.endByte = this.computeEndByte();
this.preprocessState = 2;
}

this.xhrSend(this.payload);
await this.readStreamChunk();
}

/**
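Note: one behavioral detail added above is that when readFileFn resolves to a plain string, readStreamChunk() now wraps it in an application/octet-stream Blob before measuring readBytes. A minimal sketch of an option object relying on that (the target URL, the uppercasing transform, and the import path are placeholder assumptions, not part of this PR):

```js
// Assumed entry point; Flow may also be available as a UMD global.
import Flow from '@flowjs/flow.js';

const flow = new Flow({
  target: '/upload', // hypothetical endpoint
  // readFileFn may be async and may return a string; FlowChunk wraps string
  // results in a Blob({type: 'application/octet-stream'}) itself.
  readFileFn: async function (flowObj, startByte, endByte, fileType, chunk) {
    const text = await flowObj.file.slice(startByte, endByte).text();
    return text.toUpperCase(); // returning a plain string is accepted
  },
});
```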
test/asyncSpec.js (5 additions, 5 deletions)
@@ -104,7 +104,7 @@ describe('upload stream', function() {
xhr_server.restore();
});

it('synchronous initFileFn and asyncReadFileFn', function (done) {
it('synchronous initFileFn and async readFileFn', function (done) {
// No File.stream() support : No test
// No support for skipping() test from Jasmine (https://github.com/jasmine/jasmine/issues/1709)
if (typeof Blob === 'undefined' || typeof Blob.prototype.stream !== 'function') {
@@ -145,7 +145,7 @@ describe('upload stream', function() {
flow.opts.simultaneousUploads = simultaneousUploads;
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);

(async function() {
await flow.addFile(sample_file);
await flow.upload();
@@ -208,7 +208,7 @@ describe('upload stream', function() {

var streamer = new Streamer(1);
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);

flow.opts.chunkSize = 1;
flow.opts.maxChunkRetries = 3;
@@ -242,7 +242,7 @@ describe('upload stream', function() {
xhr_server.respondWith([200, { "Content-Type": "text/plain" }, 'ok']);
var streamer = new Streamer(1);
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);

flow.opts.chunkSize = 1;
flow.opts.maxChunkRetries = 3;
@@ -283,7 +283,7 @@ describe('upload stream', function() {
xhr_server.configure({autoRespond: false, respondImmediately: false});
var streamer = new Streamer(1);
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);

flow.opts.chunkSize = 1;
flow.opts.maxChunkRetries = 3;
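Note: the specs above bind init/read from a Streamer helper defined elsewhere in the suite. A rough sketch of what it might look like, given how the tests wire it into initFileFn/readFileFn (the constructor argument and the internal reader handling are assumptions):

```js
// Hypothetical stand-in for the Streamer test helper: it pulls data from the
// file's ReadableStream and serves it through the readFileFn signature
// (flowObj, startByte, endByte, fileType, chunk).
class Streamer {
  constructor(chunkSize) {
    this.chunkSize = chunkSize;
  }

  async init(flowObj) {
    // File.prototype.stream() returns a ReadableStream of the file contents.
    this.reader = flowObj.file.stream().getReader();
  }

  async read(flowObj, startByte, endByte, fileType, chunk) {
    const {value, done} = await this.reader.read();
    // Each call yields one chunk; a falsy return signals nothing left to send.
    return done ? null : new Blob([value], {type: fileType});
  }
}
```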
test/uploadSpec.js (1 addition, 3 deletions)
@@ -400,7 +400,6 @@ describe('upload file', function() {
expect(xhr.requests.length).toBe(0);
expect(preprocess).toHaveBeenCalledWith(file.chunks[0]);
expect(file.chunks[0].preprocessState).toBe(1);
file.chunks[0].preprocessFinished();
expect(xhr.requests.length).toBe(1);
xhr.requests[0].respond(200, [], "response");
expect(success).toHaveBeenCalledWith(asCustomEvent(file, "response", file.chunks[0]));
@@ -416,7 +415,7 @@
var file = flow.files[0];
var secondFile = flow.files[1];
await flow.upload();
expect(xhr.requests.length).toBe(0);
expect(xhr.requests.length).toBe(1);
expect(preprocess).toHaveBeenCalledWith(file.chunks[0]);
expect(preprocess).not.toHaveBeenCalledWith(secondFile.chunks[0]);

@@ -439,7 +438,6 @@
await flow.upload();
for(var i=0; i<file.chunks.length; i++) {
expect(preprocess).toHaveBeenCalledWith(file.chunks[i]);
file.chunks[i].preprocessFinished();
await file.pause();
await file.resume();
xhr.requests[xhr.requests.length-1].respond(200, [], "response");
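Note: with preprocessFinished() gone, send() awaits the preprocess callback and recomputes endByte itself, so the tests no longer drive the state machine by hand. A preprocess hook under the new flow could look like this (the delayed work is a placeholder assumption):

```js
// Under the refactored send(), preprocess is awaited directly and may be
// async; there is no preprocessFinished() callback to invoke anymore.
flow.opts.preprocess = async function (chunk) {
  // Placeholder async work; send() recomputes the chunk's endByte after this
  // resolves, so preprocess may still adjust the fileObj size.
  await new Promise((resolve) => setTimeout(resolve, 10));
};
```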