Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ public void setChunkLink(ExternalLink chunk) {
setStatus(ChunkStatus.URL_FETCHED);
}

/**
* Returns the external link for this chunk.
*
* @return the external link, or null if not set
*/
protected ExternalLink getChunkLink() {
return chunkLink;
}

/**
* Returns the current status of the chunk.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.core.ExternalLink;
import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -114,6 +115,24 @@ public ChunkLinkDownloadService(

this.chunkIndexToChunksMap = chunkIndexToChunksMap;

// Complete futures for chunks that already have their links (upfront-fetched)
if (nextBatchStartIndex > 0) {
LOGGER.info("Completing futures for {} upfront-fetched links", nextBatchStartIndex);
int completedCount = 0;
for (long i = 0; i < Math.min(nextBatchStartIndex, totalChunks); i++) {
T chunk = chunkIndexToChunksMap.get(i);
if (chunk != null) {
ExternalLink link = chunk.getChunkLink();
if (link != null) {
LOGGER.debug("Completing link future for chunk {} in constructor", i);
chunkIndexToLinkFuture.get(i).complete(link);
completedCount++;
}
}
}
LOGGER.info("Completed {} futures for upfront-fetched links", completedCount);
}

if (session.getConnectionContext().getClientType() == DatabricksClientType.SEA
&& isDownloadChainStarted.compareAndSet(false, true)) {
// SEA doesn't give all chunk links, so better to trigger download chain as soon as possible
Expand Down Expand Up @@ -423,4 +442,15 @@ private boolean isChunkLinkExpired(ExternalLink link) {

return expirationWithBuffer.isBefore(Instant.now());
}

/**
* Returns the CompletableFuture for a specific chunk index for testing purposes.
*
* @param chunkIndex The index of the chunk
* @return The CompletableFuture associated with the chunk index, or null if not found
*/
@VisibleForTesting
CompletableFuture<ExternalLink> getLinkFutureForTest(long chunkIndex) {
return chunkIndexToLinkFuture.get(chunkIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ChunkLinkDownloadServiceTest {
@BeforeEach
void setUp() {
when(mockSession.getConnectionContext()).thenReturn(mock(IDatabricksConnectionContext.class));
lenient().when(mockChunkMap.get(anyLong())).thenReturn(null);
}

@Test
Expand Down Expand Up @@ -267,6 +268,66 @@ void testBatchDownloadChaining()
verify(mockClient, times(1)).getResultChunks(mockStatementId, 5L);
}

@Test
void testUpfrontFetchedLinks_FuturesCompletedInConstructor()
throws ExecutionException, InterruptedException, TimeoutException {
when(mockSession.getConnectionContext().getClientType())
.thenReturn(DatabricksClientType.THRIFT);

// Create links for upfront-fetched chunks
ExternalLink link0 =
createExternalLink("url-0", 0L, Collections.emptyMap(), "2025-02-16T00:00:00Z");
ExternalLink link1 =
createExternalLink("url-1", 1L, Collections.emptyMap(), "2025-02-16T00:00:00Z");
ExternalLink link2 =
createExternalLink("url-2", 2L, Collections.emptyMap(), "2025-02-16T00:00:00Z");

// Create mock chunks with links already set
ArrowResultChunk mockChunk0 = mock(ArrowResultChunk.class);
ArrowResultChunk mockChunk1 = mock(ArrowResultChunk.class);
ArrowResultChunk mockChunk2 = mock(ArrowResultChunk.class);

ArrowResultChunk mockChunk3 = mock(ArrowResultChunk.class);
ArrowResultChunk mockChunk4 = mock(ArrowResultChunk.class);

when(mockChunk0.getChunkLink()).thenReturn(link0);
when(mockChunk1.getChunkLink()).thenReturn(link1);
when(mockChunk2.getChunkLink()).thenReturn(link2);

when(mockChunkMap.get(0L)).thenReturn(mockChunk0);
when(mockChunkMap.get(1L)).thenReturn(mockChunk1);
when(mockChunkMap.get(2L)).thenReturn(mockChunk2);
lenient().when(mockChunkMap.get(3L)).thenReturn(mockChunk3);
lenient().when(mockChunkMap.get(4L)).thenReturn(mockChunk4);

// Create service with nextBatchStartIndex = 3 (meaning chunks 0, 1, 2 were upfront-fetched)
long nextBatchStartIndex = 3L;
ChunkLinkDownloadService<ArrowResultChunk> service =
new ChunkLinkDownloadService<>(
mockSession, mockStatementId, TOTAL_CHUNKS, mockChunkMap, nextBatchStartIndex);

// Verify that futures for chunks 0, 1, 2 are already completed
CompletableFuture<ExternalLink> future0 = service.getLinkFutureForTest(0L);
CompletableFuture<ExternalLink> future1 = service.getLinkFutureForTest(1L);
CompletableFuture<ExternalLink> future2 = service.getLinkFutureForTest(2L);

assertTrue(future0.isDone(), "Future for chunk 0 should be completed");
assertTrue(future1.isDone(), "Future for chunk 1 should be completed");
assertTrue(future2.isDone(), "Future for chunk 2 should be completed");

// Verify the futures contain the correct links
assertEquals(link0, future0.get(100, TimeUnit.MILLISECONDS));
assertEquals(link1, future1.get(100, TimeUnit.MILLISECONDS));
assertEquals(link2, future2.get(100, TimeUnit.MILLISECONDS));

// Verify that futures for chunks 3, 4 are not completed
CompletableFuture<ExternalLink> future3 = service.getLinkFutureForTest(3L);
CompletableFuture<ExternalLink> future4 = service.getLinkFutureForTest(4L);

assertFalse(future3.isDone(), "Future for chunk 3 should not be completed");
assertFalse(future4.isDone(), "Future for chunk 4 should not be completed");
}

private ExternalLink createExternalLink(
String url, long chunkIndex, Map<String, String> headers, String expiration) {
ExternalLink link = new ExternalLink();
Expand Down
Loading