Changes from all commits
@@ -1185,4 +1185,14 @@ public boolean getDisableOauthRefreshToken() {
public boolean isTokenFederationEnabled() {
return getParameter(DatabricksJdbcUrlParams.ENABLE_TOKEN_FEDERATION, "1").equals("1");
}

@Override
public boolean isStreamingChunkProviderEnabled() {
return getParameter(DatabricksJdbcUrlParams.ENABLE_STREAMING_CHUNK_PROVIDER).equals("1");
}

@Override
public int getLinkPrefetchWindow() {
return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.LINK_PREFETCH_WINDOW));
}
}
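The two new getters above expose the streaming-provider switches added to DatabricksJdbcUrlParams. Below is a minimal opt-in sketch; the URL-parameter spellings `EnableStreamingChunkProvider` and `LinkPrefetchWindow` are assumptions inferred from the enum constants in this diff (ENABLE_STREAMING_CHUNK_PROVIDER, LINK_PREFETCH_WINDOW), not confirmed names, and the host/httpPath values are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class StreamingProviderOptInSketch {
  public static void main(String[] args) throws SQLException {
    Properties props = new Properties();
    props.setProperty("EnableStreamingChunkProvider", "1"); // assumed spelling of ENABLE_STREAMING_CHUNK_PROVIDER
    props.setProperty("LinkPrefetchWindow", "16");          // assumed spelling of LINK_PREFETCH_WINDOW
    String url =
        "jdbc:databricks://example.cloud.databricks.com:443/default"
            + ";transportMode=http;httpPath=/sql/1.0/warehouses/abc123"; // placeholder endpoint
    try (Connection conn = DriverManager.getConnection(url, props)) {
      // Queries run as usual; the chunk-provider choice is made inside the driver
      // based on isStreamingChunkProviderEnabled() and getLinkPrefetchWindow().
    }
  }
}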
@@ -116,6 +116,15 @@ public Long getChunkIndex() {
return chunkIndex;
}

/**
* Returns the starting row offset for this chunk.
*
* @return the row offset
*/
public long getRowOffset() {
return rowOffset;
}

/**
* Checks if the chunk link is invalid or expired.
*
@@ -158,7 +158,7 @@ private void logDownloadMetrics(
double speedMBps = (contentLength / 1024.0 / 1024.0) / (downloadTimeMs / 1000.0);
String baseUrl = url.split("\\?")[0];

LOGGER.info(
LOGGER.debug(
String.format(
"CloudFetch download: %.4f MB/s, %d bytes in %dms from %s",
speedMBps, contentLength, downloadTimeMs, baseUrl));
@@ -197,6 +197,23 @@ public Builder withChunkInfo(BaseChunkInfo baseChunkInfo) {
return this;
}

/**
* Sets chunk metadata directly without requiring a BaseChunkInfo object. Useful for streaming
* chunk creation where metadata comes from ExternalLink.
*
* @param chunkIndex The index of this chunk
* @param rowCount The number of rows in this chunk
* @param rowOffset The starting row offset for this chunk
* @return this builder
*/
public Builder withChunkMetadata(long chunkIndex, long rowCount, long rowOffset) {
this.chunkIndex = chunkIndex;
this.numRows = rowCount;
this.rowOffset = rowOffset;
this.status = status == null ? ChunkStatus.PENDING : status;
return this;
}

public Builder withInputStream(InputStream stream, long rowCount) {
this.numRows = rowCount;
this.inputStream = stream;
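For orientation, a sketch of how the new withChunkMetadata overload might be used when a chunk is created straight from an ExternalLink. The enclosing chunk class is not named in this hunk, so `ArrowResultChunk`, its `builder()` factory, and `build()` are assumptions about the surrounding API; the ExternalLink getters are the ones used elsewhere in this PR.

// Sketch only — ArrowResultChunk, builder(), and build() are assumed, not shown in this diff.
static ArrowResultChunk chunkFromLink(ExternalLink link) {
  return ArrowResultChunk.builder()
      .withChunkMetadata(link.getChunkIndex(), link.getRowCount(), link.getRowOffset())
      .build();
}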
211 changes: 195 additions & 16 deletions src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java
@@ -1,9 +1,11 @@
package com.databricks.jdbc.api.impl.arrow;

import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createExternalLink;
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.getColumnInfoFromTColumnDesc;

import com.databricks.jdbc.api.impl.ComplexDataTypeParser;
import com.databricks.jdbc.api.impl.IExecutionResult;
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.common.CompressionCodec;
@@ -16,12 +18,16 @@
import com.databricks.jdbc.model.client.thrift.generated.TColumnDesc;
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
import com.databricks.jdbc.model.client.thrift.generated.TGetResultSetMetadataResp;
import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
import com.databricks.jdbc.model.core.ChunkLinkFetchResult;
import com.databricks.jdbc.model.core.ColumnInfo;
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
import com.databricks.jdbc.model.core.ExternalLink;
import com.databricks.jdbc.model.core.ResultData;
import com.databricks.jdbc.model.core.ResultManifest;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Result container for Arrow-based query results. */
@@ -69,20 +75,71 @@ public ArrowStreamResult(
"Creating ArrowStreamResult with remote links for statementId: {}",
statementId.toSQLExecStatementId());
this.chunkProvider =
new RemoteChunkProvider(
statementId,
resultManifest,
resultData,
session,
httpClient,
session.getConnectionContext().getCloudFetchThreadPoolSize());
createRemoteChunkProvider(statementId, resultManifest, resultData, session, httpClient);
}
this.columnInfos =
resultManifest.getSchema().getColumnCount() == 0
? new ArrayList<>()
: new ArrayList<>(resultManifest.getSchema().getColumns());
}

/**
* Creates the appropriate remote chunk provider based on configuration.
*
* @param statementId The statement ID
* @param resultManifest The result manifest containing chunk metadata
* @param resultData The result data containing initial external links
* @param session The session for fetching additional chunks
* @param httpClient The HTTP client for downloading chunk data
* @return A ChunkProvider instance
*/
private static ChunkProvider createRemoteChunkProvider(
StatementId statementId,
ResultManifest resultManifest,
ResultData resultData,
IDatabricksSession session,
IDatabricksHttpClient httpClient)
throws DatabricksSQLException {

IDatabricksConnectionContext connectionContext = session.getConnectionContext();

if (connectionContext.isStreamingChunkProviderEnabled()) {
LOGGER.info(
"Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId());

ChunkLinkFetcher linkFetcher = new SeaChunkLinkFetcher(session, statementId);
CompressionCodec compressionCodec = resultManifest.getResultCompression();
int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize();
int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow();
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();

// Convert ExternalLinks to ChunkLinkFetchResult for the provider
ChunkLinkFetchResult initialLinks =
convertToChunkLinkFetchResult(resultData.getExternalLinks());

return new StreamingChunkProvider(
linkFetcher,
httpClient,
compressionCodec,
statementId,
maxChunksInMemory,
linkPrefetchWindow,
chunkReadyTimeoutSeconds,
cloudFetchSpeedThreshold,
initialLinks);
} else {
// Use the original RemoteChunkProvider
return new RemoteChunkProvider(
statementId,
resultManifest,
resultData,
session,
httpClient,
connectionContext.getCloudFetchThreadPoolSize());
}
}

public ArrowStreamResult(
TFetchResultsResp resultsResp,
boolean isInlineArrow,
@@ -110,16 +167,63 @@ public ArrowStreamResult(
if (isInlineArrow) {
this.chunkProvider = new InlineChunkProvider(resultsResp, parentStatement, session);
} else {
CompressionCodec compressionCodec =
CompressionCodec.getCompressionMapping(resultsResp.getResultSetMetadata());
this.chunkProvider =
new RemoteChunkProvider(
parentStatement,
resultsResp,
session,
httpClient,
session.getConnectionContext().getCloudFetchThreadPoolSize(),
compressionCodec);
createThriftRemoteChunkProvider(resultsResp, parentStatement, session, httpClient);
}
}

/**
* Creates the appropriate remote chunk provider for Thrift based on configuration.
*
* @param resultsResp The Thrift fetch results response
* @param parentStatement The parent statement for fetching additional chunks
* @param session The session for fetching additional chunks
* @param httpClient The HTTP client for downloading chunk data
* @return A ChunkProvider instance
*/
private static ChunkProvider createThriftRemoteChunkProvider(
TFetchResultsResp resultsResp,
IDatabricksStatementInternal parentStatement,
IDatabricksSession session,
IDatabricksHttpClient httpClient)
throws DatabricksSQLException {

IDatabricksConnectionContext connectionContext = session.getConnectionContext();
CompressionCodec compressionCodec =
CompressionCodec.getCompressionMapping(resultsResp.getResultSetMetadata());

if (connectionContext.isStreamingChunkProviderEnabled()) {
StatementId statementId = parentStatement.getStatementId();
LOGGER.info("Using StreamingChunkProvider for Thrift statementId: {}", statementId);

ChunkLinkFetcher linkFetcher = new ThriftChunkLinkFetcher(session, statementId);
int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize();
int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow();
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();

// Convert initial Thrift links to ChunkLinkFetchResult
ChunkLinkFetchResult initialLinks = convertThriftLinksToChunkLinkFetchResult(resultsResp);

return new StreamingChunkProvider(
linkFetcher,
httpClient,
compressionCodec,
statementId,
maxChunksInMemory,
linkPrefetchWindow,
chunkReadyTimeoutSeconds,
cloudFetchSpeedThreshold,
initialLinks);
} else {
// Use the original RemoteChunkProvider
return new RemoteChunkProvider(
parentStatement,
resultsResp,
session,
httpClient,
connectionContext.getCloudFetchThreadPoolSize(),
compressionCodec);
}
}

@@ -268,4 +372,79 @@ private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {
columnInfos.add(getColumnInfoFromTColumnDesc(tColumnDesc));
}
}

/**
* Converts a collection of ExternalLinks to a ChunkLinkFetchResult.
*
* @param externalLinks The external links to convert, may be null
* @return A ChunkLinkFetchResult, or null if input is null or empty
*/
private static ChunkLinkFetchResult convertToChunkLinkFetchResult(
Collection<ExternalLink> externalLinks) {
if (externalLinks == null || externalLinks.isEmpty()) {
return null;
}

List<ExternalLink> linkList =
    externalLinks instanceof List
        ? (List<ExternalLink>) externalLinks
        : new ArrayList<>(externalLinks);

Review comment (Collaborator): why converting to ArrayList explicitly? can't you do collection.stream directly?

// Derive hasMore and nextRowOffset from last link (SEA style)
ExternalLink lastLink = linkList.get(linkList.size() - 1);
boolean hasMore = lastLink.getNextChunkIndex() != null;
long nextFetchIndex = hasMore ? lastLink.getNextChunkIndex() : -1;
long nextRowOffset = lastLink.getRowOffset() + lastLink.getRowCount();

return ChunkLinkFetchResult.of(linkList, hasMore, nextFetchIndex, nextRowOffset);
}
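// Worked example for the SEA-style derivation above (illustrative values, not from a real
// response): if the last link reports rowOffset = 20_000, rowCount = 10_000 and
// nextChunkIndex = 3, then hasMore = true, nextFetchIndex = 3 and
// nextRowOffset = 30_000 (20_000 + 10_000). If nextChunkIndex were null, hasMore would be
// false and nextFetchIndex would be -1.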

/**
* Converts Thrift result links to a ChunkLinkFetchResult.
*
* <p>This method converts TSparkArrowResultLink from the Thrift response to the unified
* ChunkLinkFetchResult format used by StreamingChunkProvider.
*
* @param resultsResp The Thrift fetch results response containing initial links
* @return A ChunkLinkFetchResult, or null if no links
*/
private static ChunkLinkFetchResult convertThriftLinksToChunkLinkFetchResult(
TFetchResultsResp resultsResp) {
List<TSparkArrowResultLink> resultLinks = resultsResp.getResults().getResultLinks();
if (resultLinks == null || resultLinks.isEmpty()) {
return null;
}

List<ExternalLink> chunkLinks = new ArrayList<>();
int lastIndex = resultLinks.size() - 1;
boolean hasMoreRows = resultsResp.hasMoreRows;

for (int i = 0; i < resultLinks.size(); i++) {

Review comment (Collaborator): instead of i, use readable names

TSparkArrowResultLink thriftLink = resultLinks.get(i);

// Convert Thrift link to ExternalLink (sets chunkIndex, rowOffset, rowCount, etc.)
ExternalLink externalLink = createExternalLink(thriftLink, i);

// For the last link, set nextChunkIndex based on hasMoreRows
if (i == lastIndex) {
if (hasMoreRows) {
// More chunks available - next fetch should start from lastIndex + 1
externalLink.setNextChunkIndex((long) i + 1);
}
// If hasMoreRows is false, nextChunkIndex remains null (end of stream)
} else {
// Not the last link - next chunk follows immediately
externalLink.setNextChunkIndex((long) i + 1);
}

chunkLinks.add(externalLink);
}

// Calculate next fetch positions from last link
TSparkArrowResultLink lastThriftLink = resultLinks.get(lastIndex);
long nextFetchIndex = hasMoreRows ? lastIndex + 1 : -1;
long nextRowOffset = lastThriftLink.getStartRowOffset() + lastThriftLink.getRowCount();

return ChunkLinkFetchResult.of(chunkLinks, hasMoreRows, nextFetchIndex, nextRowOffset);
}
}
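A short worked example of the Thrift conversion above, using illustrative numbers (not taken from any real response):

// Three links in the Thrift response, hasMoreRows = true:
//   link 0: startRowOffset = 0,      rowCount = 10_000  -> nextChunkIndex = 1
//   link 1: startRowOffset = 10_000, rowCount = 10_000  -> nextChunkIndex = 2
//   link 2: startRowOffset = 20_000, rowCount = 5_000   -> nextChunkIndex = 3 (because hasMoreRows)
// Result: ChunkLinkFetchResult.of(links, true, /* nextFetchIndex */ 3, /* nextRowOffset */ 25_000)
// With hasMoreRows = false, link 2 would keep nextChunkIndex = null and nextFetchIndex would be -1.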
@@ -9,9 +9,9 @@
import com.databricks.jdbc.exception.DatabricksValidationException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.core.ChunkLinkFetchResult;
import com.databricks.jdbc.model.core.ExternalLink;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -219,16 +219,18 @@ private void triggerNextBatchDownload() {
CompletableFuture.runAsync(
() -> {
try {
Collection<ExternalLink> links =
session.getDatabricksClient().getResultChunks(statementId, batchStartIndex);
// rowOffset is 0 here as this service is used by RemoteChunkProvider (SEA-only)
// which fetches by chunkIndex, not rowOffset
ChunkLinkFetchResult result =
session.getDatabricksClient().getResultChunks(statementId, batchStartIndex, 0);
LOGGER.info(
"Retrieved {} links for batch starting at {} for statement id {}",
links.size(),
result.getChunkLinks().size(),
batchStartIndex,
statementId);

// Complete futures for all chunks in this batch
for (ExternalLink link : links) {
for (ExternalLink link : result.getChunkLinks()) {
CompletableFuture<ExternalLink> future =
chunkIndexToLinkFuture.get(link.getChunkIndex());
if (future != null) {
@@ -241,9 +243,12 @@ private void triggerNextBatchDownload() {
}

// Update next batch start index and trigger next batch
if (!links.isEmpty()) {
if (!result.getChunkLinks().isEmpty()) {
long maxChunkIndex =
links.stream().mapToLong(ExternalLink::getChunkIndex).max().getAsLong();
result.getChunkLinks().stream()
.mapToLong(ExternalLink::getChunkIndex)
.max()
.getAsLong();
nextBatchStartIndex.set(maxChunkIndex + 1);
LOGGER.debug("Updated next batch start index to {}", maxChunkIndex + 1);

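For completeness, a sketch of the fetch loop this richer result enables on the SEA path, where chunks are addressed by index and rowOffset is passed as 0. The three-argument getResultChunks(...) and getChunkLinks() appear in this diff; the hasMore() and getNextFetchIndex() accessors on ChunkLinkFetchResult are assumptions, since that class is not shown here.

long nextIndex = 0;
boolean more = true;
while (more) {
  ChunkLinkFetchResult result =
      session.getDatabricksClient().getResultChunks(statementId, nextIndex, 0); // rowOffset unused on SEA path
  for (ExternalLink link : result.getChunkLinks()) {
    // hand each link to the chunk download pipeline
  }
  more = result.hasMore();                // assumed accessor name
  nextIndex = result.getNextFetchIndex(); // assumed accessor name
}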