Conversation

@jayantsing-db
Collaborator

Description

Implement a streaming chunk provider that fetches results without depending on the total chunk count. Key components:

  • StreamingChunkProvider: Memory-bounded parallel downloads with proactive link prefetching
  • ChunkLinkFetcher interface with SeaChunkLinkFetcher for SEA API
  • StreamingChunkDownloadTask: Simplified download task with retry logic
  • Support for initial links from ResultData to avoid extra fetch calls

Enable via URL parameter: EnableStreamingChunkProvider=1
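For example, appended to the JDBC connection URL (the host and httpPath values below are placeholders):

jdbc:databricks://<host>:443/default;transportMode=http;httpPath=<http-path>;EnableStreamingChunkProvider=1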

Next: Implement ThriftChunkLinkFetcher to unify SEA and Thrift code paths

Testing

Manual testing

Collaborator

@tejassp-db tejassp-db left a comment

[Big PR] Review is not completed yet.

Comment on lines +81 to +83
private final ReentrantLock prefetchLock = new ReentrantLock();
private final Condition consumerAdvanced = prefetchLock.newCondition();
private final Condition chunkCreated = prefetchLock.newCondition();
Collaborator

Can these be replaced with a BlockingQueue<ArrowResultChunk>?
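For reference, a minimal sketch of what that could look like, assuming chunks are handed from the download tasks to a single consumer (the field and method names here are illustrative, not the PR's actual API):

// Bounded queue: put() blocks the producers once the memory budget is reached,
// take() blocks the consumer until the next chunk is ready.
private final BlockingQueue<ArrowResultChunk> readyChunks =
    new LinkedBlockingQueue<>(maxChunksInMemory);

// Download task (producer side):
readyChunks.put(downloadedChunk);

// Result iterator (consumer side):
ArrowResultChunk next = readyChunks.take();

One caveat: with parallel downloads, chunks can complete out of index order, so a plain queue would still need the producer to hand chunks over in order (or the consumer to reorder them), which may be what the two conditions are coordinating today.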

* @return The refreshed ExternalLink with a new expiration time
* @throws DatabricksSQLException if the refetch operation fails
*/
ExternalLink refetchLink(long chunkIndex, long rowOffset) throws DatabricksSQLException;
Collaborator

Does the server endpoint API return multiple links? If so aren't we making unnecessary endpoint API calls?

Collaborator

@gopalldb gopalldb Dec 5, 2025

Yes, we do get multiple links, and subsequent links likely also need to be refreshed if they have not yet been downloaded. Due to parallelism, it is possible that some later links were downloaded earlier.

Collaborator Author

> Does the server endpoint API return multiple links? If so aren't we making unnecessary endpoint API calls?

Yes, I need to update this: when we refresh the link for a chunk index, we should also refresh the links for all the chunks returned in that single RPC.

Note that with this consumer-driven approach to signalling link fetches, it would be rare to hit a link-expiry issue. I will also introduce a param to configure the link fetch window, which can be adjusted based on the expected time the main thread spends consuming the chunks.

Collaborator

Then the code should accommodate fetching multiple links with one call and reduce the number of calls. This API should change.

Collaborator Author

Yes, the return type will change to a collection, along with the corresponding changes.
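For example, the interface method could end up looking roughly like this (the name and javadoc wording are only a sketch):

/**
 * Refreshes the link for {@code chunkIndex} and returns every link the server
 * includes in the same response, so callers can also refresh links for later
 * chunks without extra RPCs.
 *
 * @throws DatabricksSQLException if the refetch operation fails
 */
Collection<ExternalLink> refetchLinks(long chunkIndex, long rowOffset)
    throws DatabricksSQLException;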

if (endOfStreamReached) {
return highestKnownChunkIndex + 1;
}
return -1; // Unknown
Collaborator

This needs to be documented in the API.

Collaborator Author

Agreed
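For instance, the accessor's javadoc (the method name below is hypothetical) could spell the contract out:

/**
 * Returns the total number of chunks in the result once the end of the stream
 * has been reached, or -1 while the total is still unknown. Callers must treat
 * -1 as "not yet known" rather than as an error.
 */
long getTotalChunkCount();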

LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex());

} catch (IOException | DatabricksSQLException e) {
retries++;
Collaborator

Does the httpClient also retry internally? Then this accounting is incorrect.

Collaborator Author

Need to check this. I have not updated this code in the refactor.

Collaborator Author

There are some ongoing efforts to unify the HTTP retries in a separate branch: https://github.com/databricks/databricks-jdbc/tree/retry-unification. These are yet to be merged.

This is the current behaviour:

What HTTP Client Does NOT Retry

| Error Type | HTTP Client Retries? |
| --- | --- |
| 400 Bad Request | No |
| 401 Unauthorized | No |
| 403 Forbidden | No |
| 404 Not Found | No |
| 408 Request Timeout | No |
| 500 Internal Server Error | No |
| 502 Bad Gateway | No |
| 504 Gateway Timeout | No |
| Network errors (IOException) | No |
| Connection timeout | No |
| SSL/TLS errors | No |
| DNS resolution failures | No |
| 503 without Retry-After header | No |
| 429 without Retry-After header | No |

Given that chunk downloads from cloud storage (S3, Azure Blob, GCS):

  • Typically return 403 for expired links (not retried by HTTP client)
  • May return 500/502/504 for transient errors (not retried by HTTP client)
  • Rarely return 503/429 with a Retry-After header (this is specific to the SQL gateway)

For most practical scenarios, the HTTP client will NOT retry chunk downloads, which is why these task-level retries were introduced. The retry-unification work (https://github.com/databricks/databricks-jdbc/tree/retry-unification) should address this later.

Collaborator Author

Ideally, post that PR, the task will only manually handle refetching an expired link once, and the rest of the scenarios (IOException, error status codes, retry delays) will be handled by the HTTP client.
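So the task body could, in principle, shrink to something like the sketch below, where only link expiry is handled manually (isLinkExpired and refreshLink are hypothetical helpers, not existing driver APIs):

try {
  downloadChunk(chunk); // transient errors and backoff handled inside the HTTP client
} catch (IOException | DatabricksSQLException e) {
  if (isLinkExpired(e)) {   // hypothetical check, e.g. a 403 from cloud storage
    refreshLink(chunk);     // hypothetical helper: fetch a fresh link once
    downloadChunk(chunk);   // single manual retry, no Thread.sleep needed here
  } else {
    throw e;
  }
}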

"Retry {} for chunk {}: {}", retries, chunk.getChunkIndex(), e.getMessage());
chunk.setStatus(ChunkStatus.DOWNLOAD_RETRY);
try {
Thread.sleep(RETRY_DELAY_MS);
Collaborator

If the httpClient retries with a delay, then this accounting is incorrect. Also, why are we adding a delay to the retry here?

}

List<ExternalLink> linkList =
externalLinks instanceof List
Collaborator

Why convert to an ArrayList explicitly? Can't you call stream() on the Collection directly?
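For reference, the stream can come straight off the Collection; a sketch (the getChunkIndex accessor and the map variable are assumptions):

// Collection already exposes stream(); no intermediate ArrayList copy is needed
// when the links are only iterated or transformed.
Map<Long, ExternalLink> linksByChunkIndex =
    externalLinks.stream()
        .collect(Collectors.toMap(ExternalLink::getChunkIndex, link -> link));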

int lastIndex = resultLinks.size() - 1;
boolean hasMoreRows = resultsResp.hasMoreRows;

for (int i = 0; i < resultLinks.size(); i++) {
Collaborator

Instead of i, use a readable name.

}

List<ExternalLink> linkList =
links instanceof List ? (List<ExternalLink>) links : new ArrayList<>(links);
Collaborator

Why this conversion to a list? It is already a Collection.

long nextRowOffset = rowOffset;
long nextFetchIndex = chunkIndex;

for (int i = 0; i < resultLinks.size(); i++) {
Collaborator

Use a more readable iterator name.
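For instance, something along these lines (illustrative only; the loop body stays as it is today):

for (int linkPosition = 0; linkPosition < resultLinks.size(); linkPosition++) {
  ExternalLink resultLink = resultLinks.get(linkPosition);
  // ... existing body, reading resultLink / linkPosition instead of i
}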

}

// Exact match not found - this indicates a server bug
throw new DatabricksSQLException(
Collaborator

can we log this?

Collaborator

@tejassp-db tejassp-db Dec 9, 2025

@gopalldb Throwing an exception is part of the API contract, and exceptions are expected to be handled at the place they are caught. Logging an exception is one way of handling it at the catch site.

Logging and throwing can create multiple/duplicate log entries for the same error if the place where the exception is caught also logs the error. Is there a specific reason to log and throw?
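A generic illustration of that split (not the driver's actual call sites; resolveLink and errorCode are placeholders):

// Throw site: all context goes into the exception message, no LOGGER call here.
throw new DatabricksSQLException(
    "Exact link match not found for chunk " + chunkIndex, errorCode); // errorCode: placeholder

// Single catch site, wherever the failure is actually handled:
try {
  resolveLink(chunkIndex); // hypothetical caller of the code above
} catch (DatabricksSQLException e) {
  LOGGER.error("Chunk link resolution failed: {}", e.getMessage()); // logged exactly once
}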

}

// Exact match not found - this indicates a server bug
throw new DatabricksSQLException(
Collaborator

add logging

// No links returned - check if we should retry
if (!result.hasMore()) {
// No more data and no links - this is unexpected for a refetch
throw new DatabricksSQLException(
Collaborator

here also

maxRetries);
}

throw new DatabricksSQLException(
Collaborator

here also

}

if (chunk == null) {
throw new DatabricksSQLException(
Collaborator

add logging


if (chunk == null) {
throw new DatabricksSQLException(
"Chunk " + currentChunkIndex + " not found after waiting",
Collaborator

nit: can use String.format for concatenating strings

Collaborator

String.format throws an IllegalFormatException on any formatting error, including incorrect format specifiers. It should be used when very specific formatting is required.

Generally it is safer to concatenate with the + operator, since it goes through String.valueOf() on each operand and so handles null safely as well.
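A small illustration of both points:

String chunkName = null;
// '+' concatenation goes through String.valueOf, so a null operand prints as "null":
String ok = "Chunk " + chunkName + " not found after waiting"; // "Chunk null not found after waiting"

// String.format fails at runtime when a specifier and its argument disagree:
String bad = String.format("Chunk %d not found after waiting", "5"); // throws IllegalFormatConversionException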

e,
DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
} catch (ExecutionException e) {
throw new DatabricksSQLException(
Collaborator

add logging for all exceptions

@jayantsing-db jayantsing-db deleted the jayantsing-db/chunk-download branch December 9, 2025 08:28
@jayantsing-db jayantsing-db restored the jayantsing-db/chunk-download branch December 9, 2025 08:29
@jayantsing-db jayantsing-db reopened this Dec 9, 2025
@jayantsing-db
Collaborator Author

Created a test PR for this: #1125
