Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve fetch performance of DAP4 #2765

Merged
merged 7 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This file contains a high-level description of this package's evolution. Release

## 4.9.3 - TBD


* Improve the speed and data quantity for DAP4 queries. See [Github #2765](https://github.com/Unidata/netcdf-c/pull/2765).
* Remove the use of execinfo to programmatically dump the stack; it never worked. See [Github #2789](https://github.com/Unidata/netcdf-c/pull/2789).
* Update the internal copy of tinyxml2 to latest code. See [Github #2771](https://github.com/Unidata/netcdf-c/pull/2771).
* Mitigate the problem of remote/nczarr-related test interference. See [Github #2755](https://github.com/Unidata/netcdf-c/pull/2755).
Expand Down
18 changes: 12 additions & 6 deletions dap4_test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ typedef int TDMR;
static NCbytes* input = NULL;
static NCbytes* output = NULL;
static NCD4meta* metadata = NULL;
static NCD4response* resp = NULL;
static char* infile = NULL;
static char* outfile = NULL;
static int ncid = 0;
Expand Down Expand Up @@ -85,16 +86,21 @@ setup(int tdmr, int argc, char** argv)
if(translatenc4)
controller->controls.translation = NCD4_TRANSNC4;
NCD4_applyclientfragmentcontrols(controller);
if((metadata=NCD4_newmeta(controller))==NULL)
fail(NC_ENOMEM);
metadata->mode = mode;
NCD4_attachraw(metadata, ncbyteslength(input),ncbytescontents(input));

if((ret=NCD4_dechunk(metadata))) /* ok for mode == DMR or mode == DAP */
if((ret=NCD4_newMeta(controller,&metadata)))
fail(ret);

if((ret=NCD4_newResponse(controller,&resp)))
fail(ret);
resp->raw.size = ncbyteslength(input);
resp->raw.memory = ncbytescontents(input);
resp->mode = mode;

if((ret=NCD4_dechunk(resp))) /* ok for mode == DMR or mode == DAP */
fail(ret);
#ifdef DEBUG
{
int swap = (metadata->serial.hostbigendian != metadata->serial.remotebigendian);
int swap = (controller->platform.hostlittleendian != resp->remotelittleendian);
void* d = metadata->serial.dap;
size_t sz = metadata->serial.dapsize;
fprintf(stderr,"====================\n");
Expand Down
2 changes: 1 addition & 1 deletion dap4_test/test_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ main(int argc, char** argv)
fprintf(stderr,"t_dmrmeta %s -> %s\n",infile,outfile);
#endif

if((ret = NCD4_parse(metadata))) goto done;
if((ret = NCD4_parse(metadata,resp,0))) goto done;
if((ret = NCD4_metabuild(metadata,ncid))) goto done;

done:
Expand Down
2 changes: 1 addition & 1 deletion dap4_test/test_parse.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ main(int argc, char** argv)

setup(TDMR_PARSE,argc,argv);

if((ret = NCD4_parse(metadata))) goto done;
if((ret = NCD4_parse(metadata,resp,0))) goto done;
ret = NCD4_print(metadata,output);
ncbytesnull(output);
if(ret == NC_NOERR) {
Expand Down
26 changes: 14 additions & 12 deletions include/ncuri.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ typedef struct NCURI {
char* path; /*!< path */
char* query; /*!< query */
char* fragment; /*!< fragment */
char** fraglist; /* envv style list of decomposed fragment*/
char** querylist; /* envv style list of decomposed query*/
#if 0
char* projection; /*!< without leading '?'*/
char* selection; /*!< with leading '&'*/
#endif
void* fraglist; /* some representation of the decomposed fragment string */
void* querylist; /* some representation of the decomposed query string */
} NCURI;

#if 0
Expand Down Expand Up @@ -90,6 +86,18 @@ EXTERNL int ncurisetfragmentkey(NCURI* duri,const char* key, const char* value);
/* append a specific &key=...& in uri fragment */
EXTERNL int ncuriappendfragmentkey(NCURI* duri,const char* key, const char* value);

/* Replace a specific &key=...& in uri query */
EXTERNL int ncurisetquerykey(NCURI* duri,const char* key, const char* value);

/* append a specific &key=...& in uri query */
EXTERNL int ncuriappendquerykey(NCURI* duri,const char* key, const char* value);

/* Get the actual list of queryies */
EXTERNL void* ncuriqueryparams(NCURI* uri);
/* Get the actual list of frags */
EXTERNL void* ncurifragmentparams(NCURI* uri);


/* Construct a complete NC URI; caller frees returned string */
EXTERNL char* ncuribuild(NCURI*,const char* prefix, const char* suffix, int flags);

Expand All @@ -105,12 +113,6 @@ EXTERNL const char* ncurifragmentlookup(NCURI*, const char* param);
*/
EXTERNL const char* ncuriquerylookup(NCURI*, const char* param);

/* Obtain the complete list of fragment pairs in envv format */
EXTERNL const char** ncurifragmentparams(NCURI*);

/* Obtain the complete list of query pairs in envv format */
EXTERNL const char** ncuriqueryparams(NCURI*);

/* URL Encode/Decode */
EXTERNL char* ncuridecode(const char* s);
/* Partial decode */
Expand Down
135 changes: 65 additions & 70 deletions libdap4/d4chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,133 +22,128 @@ and whether it has checksums.
*/

/* Forward */
static int processerrchunk(NCD4meta* metadata, void* errchunk, unsigned int count);
static int processerrchunk(NCD4response*, void* errchunk, unsigned int count);

/**************************************************/

void
NCD4_resetSerial(NCD4serial* serial, size_t rawsize, void* rawdata)
{
nullfree(serial->errdata);
nullfree(serial->dmr);
nullfree(serial->dap);
nullfree(serial->rawdata);
/* clear all fields */
memset(serial,0,sizeof(NCD4serial));
/* Reset fields */
serial->hostlittleendian = NCD4_isLittleEndian();
serial->rawsize = rawsize;
serial->rawdata = rawdata;
}

int
NCD4_dechunk(NCD4meta* metadata)
NCD4_dechunk(NCD4response* resp)
{
unsigned char *praw, *phdr, *pdap;
unsigned char *praw, *pdmr, *phdr, *pdap, *pappend, *pchunk;
NCD4HDR hdr;
int firstchunk;

#ifdef D4DUMPRAW
NCD4_tagdump(metadata->serial.rawsize,metadata->serial.rawdata,0,"RAW");
NCD4_tagdump(resp->serial.raw.size,resp->serial.raw.data,0,"RAW");
#endif

/* Access the returned raw data */
praw = metadata->serial.rawdata;
praw = (unsigned char*)resp->raw.memory;

if(metadata->mode == NCD4_DSR) {
if(resp->mode == NCD4_DSR) {
return THROW(NC_EDMR);
} else if(metadata->mode == NCD4_DMR) {
} else if(resp->mode == NCD4_DMR) {
/* Verify the mode; assume that the <?xml...?> is optional */
if(memcmp(praw,"<?xml",strlen("<?xml"))==0
|| memcmp(praw,"<Dataset",strlen("<Dataset"))==0) {
size_t len = 0;
/* setup as dmr only */
/* Avoid strdup since rawdata might contain nul chars */
len = metadata->serial.rawsize;
if((metadata->serial.dmr = malloc(len+1)) == NULL)
len = resp->raw.size;
if((resp->serial.dmr = malloc(len+1)) == NULL)
return THROW(NC_ENOMEM);
memcpy(metadata->serial.dmr,praw,len);
metadata->serial.dmr[len] = '\0';
memcpy(resp->serial.dmr,praw,len);
resp->serial.dmr[len] = '\0';
/* Suppress nuls */
(void)NCD4_elidenuls(metadata->serial.dmr,len);
(void)NCD4_elidenuls(resp->serial.dmr,len);
return THROW(NC_NOERR);
}
} else if(metadata->mode != NCD4_DAP)
} else if(resp->mode != NCD4_DAP)
return THROW(NC_EDAP);

/* We must be processing a DAP mode packet */
praw = (metadata->serial.dap = metadata->serial.rawdata);
metadata->serial.rawdata = NULL;
praw = resp->raw.memory;

/* If the raw data looks like xml, then we almost certainly have an error */
if(memcmp(praw,"<?xml",strlen("<?xml"))==0
|| memcmp(praw,"<!doctype",strlen("<!doctype"))==0) {
/* Set up to report the error */
int stat = NCD4_seterrormessage(metadata, metadata->serial.rawsize, metadata->serial.rawdata);
int stat = NCD4_seterrormessage(resp, resp->raw.size, resp->raw.memory);
return THROW(stat); /* slight lie */
}

/* Get the DMR chunk header*/
phdr = NCD4_getheader(praw,&hdr,metadata->serial.hostlittleendian);
/* Get the first header to get dmr content and endian flags*/
pdmr = NCD4_getheader(praw,&hdr,resp->controller->platform.hostlittleendian);
if(hdr.count == 0)
return THROW(NC_EDMR);
if(hdr.flags & NCD4_ERR_CHUNK) {
return processerrchunk(metadata, (void*)phdr, hdr.count);
}
if(hdr.flags & NCD4_ERR_CHUNK)
return processerrchunk(resp, (void*)pdmr, hdr.count);
resp->remotelittleendian = ((hdr.flags & NCD4_LITTLE_ENDIAN_CHUNK) ? 1 : 0);

metadata->serial.remotelittleendian = ((hdr.flags & NCD4_LITTLE_ENDIAN_CHUNK) ? 1 : 0);
/* Again, avoid strxxx operations on dmr */
if((metadata->serial.dmr = malloc(hdr.count+1)) == NULL)
/* avoid strxxx operations on dmr */
if((resp->serial.dmr = malloc(hdr.count+1)) == NULL)
return THROW(NC_ENOMEM);
memcpy(metadata->serial.dmr,phdr,hdr.count);
metadata->serial.dmr[hdr.count-1] = '\0';
memcpy(resp->serial.dmr,pdmr,hdr.count);
resp->serial.dmr[hdr.count-1] = '\0';
/* Suppress nuls */
(void)NCD4_elidenuls(metadata->serial.dmr,hdr.count);
(void)NCD4_elidenuls(resp->serial.dmr,hdr.count);

/* See if there is any data after the DMR */
if(hdr.flags & NCD4_LAST_CHUNK)
return THROW(NC_ENODATA);

/* Read and concat together the data chunks */
phdr = phdr + hdr.count; /* point to data chunk header */
phdr = pdmr + hdr.count; /* point to data chunk header */
/* Do a sanity check in case the server has shorted us with no data */
if((hdr.count + CHUNKHDRSIZE) >= metadata->serial.rawsize) {
if((hdr.count + CHUNKHDRSIZE) >= resp->raw.size) {
/* Server only sent the DMR part */
metadata->serial.dapsize = 0;
resp->serial.dapsize = 0;
return THROW(NC_EDATADDS);
}
pdap = metadata->serial.dap;
for(;;) {
phdr = NCD4_getheader(phdr,&hdr,metadata->serial.hostlittleendian);
if(hdr.flags & NCD4_ERR_CHUNK) {
return processerrchunk(metadata, (void*)phdr, hdr.count);
/* walk all the data chunks */
/* invariants:
praw -- beginning of the raw response
pdmr -- beginning of the dmr in the raw data
pdap -- beginning of the dechunked dap data
phdr -- pointer to the hdr of the current chunk
pchunk -- pointer to the data part of the current chunk
pappend -- where to append next chunk to the growing dechunked data
*/
for(firstchunk=1;;firstchunk=0) {
pchunk = NCD4_getheader(phdr,&hdr,resp->controller->platform.hostlittleendian); /* Process first data chunk header */
if(firstchunk) {
pdap = phdr; /* remember start point of the dechunked data */
pappend = phdr; /* start appending here */
}
if(hdr.flags & NCD4_ERR_CHUNK)
return processerrchunk(resp, (void*)pchunk, hdr.count);
/* data chunk; possibly last; possibly empty */
if(hdr.count > 0) {
d4memmove(pdap,phdr,hdr.count); /* will overwrite the header */
phdr += hdr.count;
pdap += hdr.count;
}
if(hdr.count > 0)
d4memmove(pappend,pchunk,hdr.count); /* overwrite the header; this the heart of dechunking */
pappend += hdr.count; /* next append point */
phdr = pchunk + hdr.count; /* point to header of next chunk */
if(hdr.flags & NCD4_LAST_CHUNK) break;
}
metadata->serial.dapsize = (size_t)DELTA(pdap,metadata->serial.dap);
resp->serial.dap = pdap; /* start of dechunked data */
resp->serial.dapsize = (size_t)DELTA(pappend,pdap);

#ifdef D4DUMPDMR
fprintf(stderr,"%s\n",metadata->serial.dmr);
fprintf(stderr,"%s\n",resp->serial.dmr);
fflush(stderr);
#endif
#ifdef D4DUMPDAP
NCD4_tagdump(metadata->serial.dapsize,metadata->serial.dap,0,"DAP");
NCD4_tagdump(resp->serial.dapsize,resp->serial.dap,0,"DAP");
#endif
return THROW(NC_NOERR);
}

static int
processerrchunk(NCD4meta* metadata, void* errchunk, unsigned int count)
processerrchunk(NCD4response* resp, void* errchunk, unsigned int count)
{
metadata->serial.errdata = (char*)d4alloc(count+1);
if(metadata->serial.errdata == NULL)
resp->serial.errdata = (char*)d4alloc(count+1);
if(resp->serial.errdata == NULL)
return THROW(NC_ENOMEM);
memcpy(metadata->serial.errdata,errchunk,count);
metadata->serial.errdata[count] = '\0';
memcpy(resp->serial.errdata,errchunk,count);
resp->serial.errdata[count] = '\0';
return THROW(NC_ENODATA); /* slight lie */
}

Expand All @@ -157,26 +152,26 @@ Given a raw response, attempt to infer the mode: DMR, DAP, DSR.
Since DSR is not standardizes, it becomes the default.
*/
int
NCD4_infermode(NCD4meta* meta)
NCD4_infermode(NCD4response* resp)
{
d4size_t size = meta->serial.rawsize;
char* raw = meta->serial.rawdata;
d4size_t size = resp->raw.size;
char* raw = resp->raw.memory;

if(size < 16)
return THROW(NC_EDAP); /* must have at least this to hold a hdr + partial dmr*/
if(memcmp(raw,"<?xml",strlen("<?xml"))==0
|| memcmp(raw,"<Dataset",strlen("<Dataset"))==0) {
meta->mode = NCD4_DMR;
resp->mode = NCD4_DMR;
goto done;
}
raw += 4; /* Pretend we have a DAP hdr */
if(memcmp(raw,"<?xml",strlen("<?xml"))==0
|| memcmp(raw,"<Dataset",strlen("<Dataset"))==0) {
meta->mode = NCD4_DAP;
resp->mode = NCD4_DAP;
goto done;
}
/* Default to DSR */
meta->mode = NCD4_DSR;
resp->mode = NCD4_DSR;

done:
return NC_NOERR;
Expand Down
4 changes: 2 additions & 2 deletions libdap4/d4curlfunctions.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ NCD4_get_rcproperties(NCD4INFO* state)
ncerror err = NC_NOERR;
char* option = NULL;
#ifdef HAVE_CURLOPT_BUFFERSIZE
option = NC_rclookup(D4BUFFERSIZE,state->uri->uri,NULL);
option = NC_rclookup(D4BUFFERSIZE,state->dmruri->uri,NULL);
if(option != NULL && strlen(option) != 0) {
long bufsize;
if(strcasecmp(option,"max")==0)
Expand All @@ -351,7 +351,7 @@ NCD4_get_rcproperties(NCD4INFO* state)
}
#endif
#ifdef HAVE_CURLOPT_KEEPALIVE
option = NC_rclookup(D4KEEPALIVE,state->uri->uri,NULL);
option = NC_rclookup(D4KEEPALIVE,state->dmruri->uri,NULL);
if(option != NULL && strlen(option) != 0) {
/* The keepalive value is of the form 0 or n/m,
where n is the idle time and m is the interval time;
Expand Down
Loading
Loading