@@ -6,6 +6,7 @@
import com.databricks.jdbc.api.IDatabricksResultSet;
import com.databricks.jdbc.api.IExecutionStatus;
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
import com.databricks.jdbc.api.impl.arrow.ChunkProvider;
import com.databricks.jdbc.api.impl.converters.ConverterHelper;
import com.databricks.jdbc.api.impl.converters.ObjectConverter;
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
@@ -41,6 +42,7 @@
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.http.entity.InputStreamEntity;

@@ -1997,6 +1999,14 @@ private BigDecimal applyScaleToBigDecimal(BigDecimal bigDecimal, int columnIndex
return bigDecimal.setScale(scale, RoundingMode.HALF_UP);
}

@VisibleForTesting
public Optional<ChunkProvider> getChunkProvider() {
if (executionResult instanceof ArrowStreamResult) {
return Optional.ofNullable(((ArrowStreamResult) executionResult).getChunkProvider());
}
return Optional.empty();
}
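
A hypothetical JUnit-style usage of this test hook (the resultSet fixture and expectedChunkCount are assumptions for illustration, not part of this change): an Arrow-backed result set should expose its ChunkProvider, while other result types should yield an empty Optional.

// Sketch only: assumes a DatabricksResultSet backed by an ArrowStreamResult and JUnit assertions on the classpath.
Optional<ChunkProvider> provider = resultSet.getChunkProvider();
assertTrue(provider.isPresent());
assertEquals(expectedChunkCount, provider.get().getChunkCount());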

@Override
public String toString() {
return (new ToStringer(DatabricksResultSet.class))
@@ -110,6 +110,15 @@ public Long getChunkIndex() {
return chunkIndex;
}

/**
* Returns the start row offset of this chunk in the overall result set.
*
* @return row offset
*/
public long getStartRowOffset() {
return rowOffset;
}

/**
* Checks if the chunk link is invalid or expired.
*
@@ -141,6 +150,10 @@ public boolean releaseChunk() {
return true;
}

public ExternalLink getChunkLink() {
return chunkLink;
}

/**
* Sets the external link details for this chunk.
*
@@ -215,6 +215,10 @@ public long getAllowedChunksInMemory() {
return allowedChunksInMemory;
}

public T getChunkByIndex(long chunkIndex) {
return chunkIndexToChunksMap.get(chunkIndex);
}

/** Subclasses should override this method to perform their specific cleanup. */
protected void doClose() {
// Default implementation does nothing
@@ -230,6 +230,16 @@ public long getChunkCount() {
return chunkProvider.getChunkCount();
}

/**
* Returns the chunk provider for testing purposes.
*
* @return the chunk provider wrapped in Optional
*/
@VisibleForTesting
public ChunkProvider getChunkProvider() {
return chunkProvider;
}

Collaborator: The Optional mention in the comment is not aligned with the return type.

Collaborator (Author): Fixed.

private void setColumnInfo(TGetResultSetMetadataResp resultManifest) {
columnInfos = new ArrayList<>();
if (resultManifest.getSchema() == null) {
@@ -214,13 +214,18 @@ private void triggerNextBatchDownload() {
return;
}

// Calculate row offset for this batch
final long batchStartRowOffset = getChunkStartRowOffset(batchStartIndex);

LOGGER.info("Starting batch download from index {}", batchStartIndex);
currentDownloadTask =
CompletableFuture.runAsync(
() -> {
try {
Collection<ExternalLink> links =
session.getDatabricksClient().getResultChunks(statementId, batchStartIndex);
session
.getDatabricksClient()
.getResultChunks(statementId, batchStartIndex, batchStartRowOffset);
LOGGER.info(
"Retrieved {} links for batch starting at {} for statement id {}",
links.size(),
@@ -413,6 +418,28 @@ private void prepareNewBatchDownload(long startIndex) {
isDownloadChainStarted.set(false);
}

/**
* Gets the start row offset for a given chunk index.
*
* @param chunkIndex the chunk index to get the row offset for
* @return the start row offset for the chunk
*/
private long getChunkStartRowOffset(long chunkIndex) {
T chunk = chunkIndexToChunksMap.get(chunkIndex);
if (chunk == null) {
// Should never happen.
throw new IllegalStateException(
"Chunk not found in map for index "
+ chunkIndex
+ ". "
+ "Total chunks: "
+ totalChunks
+ ", StatementId: "
+ statementId);
}
return chunk.getStartRowOffset();
}

Collaborator: Let's throw DatabricksValidationException here, as we push telemetry with these internal exceptions.

Collaborator (Author): DatabricksValidationException is linked to the INPUT_VALIDATION_ERROR error code. Can you suggest an alternative exception?

Collaborator: We could create a new exception, say DatabricksInvalidStateException, but I guess just passing INVALID_STATE to DatabricksException lgtm.
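
A minimal sketch of the alternative the thread converges on, assuming DatabricksException can be constructed with a message and the existing DatabricksDriverErrorCode.INVALID_STATE (the same error code used with DatabricksHttpException later in this diff); this is illustrative, not the merged code.

// Sketch: report the inconsistent internal state through the driver's own exception type
// so it is pushed to telemetry, instead of a plain IllegalStateException.
// The (String, DatabricksDriverErrorCode) constructor is an assumption here.
if (chunk == null) {
throw new DatabricksException(
"Chunk not found in map for index " + chunkIndex
+ ". Total chunks: " + totalChunks
+ ", StatementId: " + statementId,
DatabricksDriverErrorCode.INVALID_STATE);
}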

private boolean isChunkLinkExpired(ExternalLink link) {
if (link == null || link.getExpiration() == null) {
LOGGER.warn("Link or expiration is null, assuming link is expired");
@@ -73,6 +73,9 @@ public static ExternalLink createExternalLink(TSparkArrowResultLink chunkInfo, l
return new ExternalLink()
.setExternalLink(chunkInfo.getFileLink())
.setChunkIndex(chunkIndex)
.setRowCount(chunkInfo.getRowCount())
.setRowOffset(chunkInfo.getStartRowOffset())
.setByteCount(chunkInfo.getBytesNum())
.setExpiration(Long.toString(chunkInfo.getExpiryTime()));
}

@@ -90,7 +93,6 @@ public static void verifySuccessStatus(TStatus status, String errorContext, Stri
"Error thrift response received [%s] for statementId [%s]",
errorContext, statementId)
: String.format("Error thrift response received [%s]", errorContext);
LOGGER.error(errorMessage);
throw new DatabricksHttpException(errorMessage, status.getSqlState());
}
}
@@ -119,8 +119,10 @@ DatabricksResultSet getStatementResult(
*
* @param statementId statement-Id for which chunk should be fetched
* @param chunkIndex chunkIndex for which chunk should be fetched
* @param chunkStartRowOffset the row offset where the chunk starts in the result set
*/
Collection<ExternalLink> getResultChunks(StatementId statementId, long chunkIndex)
Collection<ExternalLink> getResultChunks(
StatementId statementId, long chunkIndex, long chunkStartRowOffset)
throws DatabricksSQLException;

/**
@@ -409,7 +409,8 @@ public void cancelStatement(StatementId typedStatementId) throws DatabricksSQLEx
}

@Override
public Collection<ExternalLink> getResultChunks(StatementId typedStatementId, long chunkIndex)
public Collection<ExternalLink> getResultChunks(
StatementId typedStatementId, long chunkIndex, long chunkStartRowOffset)
throws DatabricksSQLException {
DatabricksThreadContextHolder.setStatementId(typedStatementId);
String statementId = typedStatementId.toSQLExecStatementId();
@@ -128,14 +128,32 @@ TBase getThriftResponse(TBase request) throws DatabricksSQLException {
}
}

TFetchResultsResp getResultSetResp(TOperationHandle operationHandle, String context)
/**
* Fetch the next set of results for the given operation handle with default settings.
*
* @param operationHandle the operation handle
* @return TFetchResultsResp containing the results
* @throws DatabricksHttpException if fetch fails
*/
TFetchResultsResp getResultSetResp(TOperationHandle operationHandle)
throws DatabricksHttpException {
TFetchResultsReq req = createFetchResultsReqWithDefaults(operationHandle);
return executeFetchRequest(req);
}

/**
* Fetches results starting from a specific row offset.
*
* @param operationHandle the operation handle
* @param startRowOffset the row offset to start fetching from
* @return TFetchResultsResp containing the results
* @throws DatabricksHttpException if fetch fails
*/
TFetchResultsResp getResultSetResp(TOperationHandle operationHandle, long startRowOffset)
throws DatabricksHttpException {
return getResultSetResp(
new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS),
operationHandle,
context,
maxRowsPerBlock,
false);
TFetchResultsReq req = createFetchResultsReqWithDefaults(operationHandle);
req.setStartRowOffset(startRowOffset);
return executeFetchRequest(req);
}
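
For context, a sketch of how a Thrift-based client implementation might consume this overload (the client class, its thriftAccessor field, the getOperationHandle helper as used here, and the link-conversion loop are assumptions; only getResultSetResp, createExternalLink, and the getResultChunks signature come from this PR). Thrift fetches are positioned by row offset rather than by chunk index, which is why the downloader now threads chunkStartRowOffset through getResultChunks.

@Override
public Collection<ExternalLink> getResultChunks(
StatementId statementId, long chunkIndex, long chunkStartRowOffset)
throws DatabricksSQLException {
// Re-fetch links starting exactly at the first row of the requested chunk.
TFetchResultsResp resp =
thriftAccessor.getResultSetResp(getOperationHandle(statementId), chunkStartRowOffset);
List<ExternalLink> links = new ArrayList<>();
long index = chunkIndex;
// Assumes the Databricks Thrift TRowSet exposes its TSparkArrowResultLink entries.
for (TSparkArrowResultLink link : resp.getResults().getResultLinks()) {
links.add(createExternalLink(link, index++));
}
return links;
}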

TCancelOperationResp cancelOperation(TCancelOperationReq req) throws DatabricksHttpException {
@@ -166,16 +184,10 @@ TCloseOperationResp closeOperation(TCloseOperationReq req) throws DatabricksHttp

TFetchResultsResp getMoreResults(IDatabricksStatementInternal parentStatement)
throws DatabricksSQLException {
String context =
String.format(
"Fetching more results as it has more rows %s",
parentStatement.getStatementId().toSQLExecStatementId());
return getResultSetResp(
new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS),
getOperationHandle(parentStatement.getStatementId()),
context,
maxRowsPerBlock,
true);
TFetchResultsReq req =
createFetchResultsReqWithDefaults(getOperationHandle(parentStatement.getStatementId()));
setFetchMetadata(req);
return executeFetchRequest(req);
}

DatabricksResultSet execute(
Expand Down Expand Up @@ -227,15 +239,16 @@ DatabricksResultSet execute(
resultSet = response.getDirectResults().getResultSet();
resultSet.setResultSetMetadata(response.getDirectResults().getResultSetMetadata());
} else {
verifySuccessStatus(
response.getStatus(), "executeStatement", statementId.toSQLExecStatementId());

Collaborator: I think we already verify the response at line 215 with checkResponseForErrors(response); that should suffice.

Collaborator (Author): Should this be moved to after pollTillOperationFinished and before the if-else block?

Collaborator: verifySuccessStatus is taking a response that is already tested by checkResponseForErrors(response). Can this check be removed entirely?

// Fetch the result data after polling
TFetchResultsReq resultsReq =
createFetchResultsReqWithDefaults(response.getOperationHandle());
setFetchMetadata(resultsReq);
long fetchStartTime = System.nanoTime();
resultSet =
getResultSetResp(
response.getStatus(),
response.getOperationHandle(),
"executeStatement",
maxRowsPerBlock,
true);
resultSet = executeFetchRequest(resultsReq);

long fetchEndTime = System.nanoTime();
long fetchLatencyNanos = fetchEndTime - fetchStartTime;
long fetchLatencyMillis = fetchLatencyNanos / 1_000_000;
@@ -389,11 +402,18 @@ DatabricksResultSet getStatementResult(
TFetchResultsResp resultSet = null;
try {
response = getOperationStatus(request, statementId);
verifySuccessStatus(
response.getStatus(), "getStatementResult", statementId.toSQLExecStatementId());

TOperationState operationState = response.getOperationState();
if (operationState == TOperationState.FINISHED_STATE) {
long fetchStartTime = System.nanoTime();
resultSet =
getResultSetResp(response.getStatus(), operationHandle, "getStatementResult", -1, true);

TFetchResultsReq resultsReq = createFetchResultsReqWithDefaults(operationHandle);
resultsReq.setMaxRows(-1);
setFetchMetadata(resultsReq);
resultSet = executeFetchRequest(resultsReq);

long fetchEndTime = System.nanoTime();
long fetchLatencyNanos = fetchEndTime - fetchStartTime;
long fetchLatencyMillis = fetchLatencyNanos / 1_000_000;
@@ -481,46 +501,46 @@ void updateConfig(DatabricksConfig newConfig) {
this.databricksConfig = newConfig;
}

TFetchResultsResp getResultSetResp(
TStatus responseStatus,
TOperationHandle operationHandle,
String context,
int maxRowsPerBlock,
boolean fetchMetadata)
private TFetchResultsResp executeFetchRequest(TFetchResultsReq request)
throws DatabricksHttpException {
String statementId = StatementId.loggableStatementId(operationHandle);
verifySuccessStatus(responseStatus, context, statementId);
Collaborator (Author): @jayantsing-db Checking the validity of a previously received response is not the responsibility of this function. Moved it outside this function, to the point where the response is received.
TFetchResultsReq request =
new TFetchResultsReq()
.setOperationHandle(operationHandle)
.setFetchType((short) 0) // 0 represents Query output. 1 represents Log
.setMaxRows(
maxRowsPerBlock) // Max number of rows that should be returned in the rowset.
.setMaxBytes(DEFAULT_BYTE_LIMIT);
if (fetchMetadata
&& ProtocolFeatureUtil.supportsResultSetMetadataFromFetch(serverProtocolVersion)) {
request.setIncludeResultSetMetadata(true); // fetch metadata if supported
}
TFetchResultsResp response;
try {
response = getThriftClient().FetchResults(request);
} catch (TException e) {
String errorMessage =
String.format(
"Error while fetching results from Thrift server. Request maxRows=%d, maxBytes=%d, Error {%s}",
"Error while fetching results from Thrift server. Request maxRows=%d, "
+ "maxBytes=%d, Error {%s}",
request.getMaxRows(), request.getMaxBytes(), e.getMessage());
LOGGER.error(e, errorMessage);
throw new DatabricksHttpException(errorMessage, e, DatabricksDriverErrorCode.INVALID_STATE);
}

String statementId = StatementId.loggableStatementId(request.getOperationHandle());
verifySuccessStatus(
response.getStatus(),
String.format(
"Error while fetching results Request maxRows=%d, maxBytes=%d. Response hasMoreRows=%s",
"Error while fetching results Request maxRows=%d, maxBytes=%d. "
+ "Response hasMoreRows=%s",
request.getMaxRows(), request.getMaxBytes(), response.hasMoreRows),
statementId);

return response;
}

private TFetchResultsReq createFetchResultsReqWithDefaults(TOperationHandle operationHandle) {
return new TFetchResultsReq()
.setOperationHandle(operationHandle)
.setFetchType((short) 0) // 0 represents Query output. 1 represents Log
.setMaxRows(maxRowsPerBlock) // Max number of rows that should be returned in the rowset.
.setMaxBytes(DEFAULT_BYTE_LIMIT);
}

Collaborator: Thanks for abstracting it in a single method.

private void setFetchMetadata(TFetchResultsReq request) {
if (ProtocolFeatureUtil.supportsResultSetMetadataFromFetch(serverProtocolVersion)) {
request.setIncludeResultSetMetadata(true);
}
}

private TFetchResultsResp listFunctions(TGetFunctionsReq request)
throws TException, DatabricksSQLException {
if (enableDirectResults) request.setGetDirectResults(DEFAULT_DIRECT_RESULTS);
@@ -663,8 +683,11 @@ TFetchResultsResp fetchMetadataResults(TResp response, String contextDescription
// Fetch the result data after polling
FResp statusField = response.fieldForId(statusFieldId);
TStatus status = (TStatus) response.getFieldValue(statusField);
return getResultSetResp(
status, operationHandle, contextDescription, DEFAULT_ROW_LIMIT_PER_BLOCK, false);
verifySuccessStatus(status, contextDescription, statementId);

TFetchResultsReq resultsReq = createFetchResultsReqWithDefaults(operationHandle);
resultsReq.setMaxRows(DEFAULT_ROW_LIMIT_PER_BLOCK);
return executeFetchRequest(resultsReq);
}
}
