Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,26 @@ void checkIfClosed() throws DatabricksSQLException {
}
}

/**
* Marks the statement as closed without attempting to close it on the server. This should be used
* when the server has already indicated the statement is closed.
*/
public void markAsClosed() {
LOGGER.debug("Marking statement {} as closed (server already closed)", statementId);
if (resultSet != null) {
try {
this.resultSet.close();
} catch (DatabricksSQLException e) {
LOGGER.warn("Error closing result set: {}", e.getMessage());
}
this.resultSet = null;
}
this.connection.closeStatement(this);
DatabricksThreadContextHolder.clearStatementInfo();
shutDownExecutor();
this.isClosed = true;
}

/**
* Shuts down the ExecutorService used for asynchronous execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ public DatabricksResultSet executeStatement(
if (responseState != StatementState.SUCCEEDED && responseState != StatementState.CLOSED) {
handleFailedExecution(response, statementId, sql);
}

if (responseState == StatementState.CLOSED && parentStatement != null) {
LOGGER.debug("Statement {} returned CLOSED status, marking statement as closed", statementId);
((DatabricksStatement) parentStatement.getStatement()).markAsClosed();
}

return new DatabricksResultSet(
response.getStatus(),
typedStatementId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,93 @@ public void testIsInsertQuery() {
assertTrue(DatabricksStatement.isInsertQuery("(INSERT INTO users (id) VALUES (?))"));
}

@Test
public void testMarkAsClosed() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

when(client.executeStatement(
eq(STATEMENT),
eq(new Warehouse(WAREHOUSE_ID)),
eq(new HashMap<>()),
eq(StatementType.QUERY),
any(IDatabricksSession.class),
eq(statement)))
.thenReturn(resultSet);

// Execute a query to set up result set
statement.executeQuery(STATEMENT);
assertFalse(statement.isClosed());

// Mark statement as closed without attempting to close on server
statement.markAsClosed();

// Verify statement is closed
assertTrue(statement.isClosed());

// Verify that closeStatement was NOT called on the client (only on connection)
verify(client, never()).closeStatement(any(StatementId.class));

// Verify that the statement cannot be used anymore
assertThrows(DatabricksSQLException.class, () -> statement.executeQuery(STATEMENT));
}

@Test
public void testMarkAsClosedWithResultSetCloseError() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

// Create a mock result set that throws an exception on close
DatabricksResultSet mockResultSet = mock(DatabricksResultSet.class);
doThrow(new DatabricksSQLException("Error closing result set", "HY000"))
.when(mockResultSet)
.close();

when(client.executeStatement(
eq(STATEMENT),
eq(new Warehouse(WAREHOUSE_ID)),
eq(new HashMap<>()),
eq(StatementType.QUERY),
any(IDatabricksSession.class),
eq(statement)))
.thenReturn(mockResultSet);

// Execute a query to set up result set
statement.executeQuery(STATEMENT);
assertFalse(statement.isClosed());

// Mark statement as closed - should not throw even if result set close fails
assertDoesNotThrow(() -> statement.markAsClosed());

// Verify statement is still closed despite result set close error
assertTrue(statement.isClosed());

// Verify that closeStatement was NOT called on the client
verify(client, never()).closeStatement(any(StatementId.class));
}

@Test
public void testMarkAsClosedWithoutResultSet() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

// Mark statement as closed without executing any query (no result set)
assertFalse(statement.isClosed());
statement.markAsClosed();

// Verify statement is closed
assertTrue(statement.isClosed());

// Verify that closeStatement was NOT called on the client
verify(client, never()).closeStatement(any(StatementId.class));
}

@Test
public void testRemoveEmptyEscapeClauseFromQuery() throws Exception {
IDatabricksConnectionContext connectionContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,4 +722,114 @@ public void testSeaSyncMetadataHeaderNotAddedWhenDisabled() throws Exception {
}),
eq(ExecuteStatementResponse.class));
}

@Test
public void testExecuteStatementWithClosedStatus() throws Exception {
// Set up connection and statement
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksSdkClient databricksSdkClient =
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
DatabricksConnection connection =
new DatabricksConnection(connectionContext, databricksSdkClient);

// Mock session creation
CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID);
when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class)))
.thenReturn(sessionResponse);
connection.open();

DatabricksStatement statement = spy(new DatabricksStatement(connection));
statement.setMaxRows(100);

// Create a response with CLOSED status
StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED);
ExecuteStatementResponse closedResponse =
new ExecuteStatementResponse()
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
.setStatus(closedStatus)
.setResult(resultData)
.setManifest(
new ResultManifest()
.setFormat(Format.JSON_ARRAY)
.setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L))
.setTotalRowCount(0L));

when(apiClient.execute(any(Request.class), any()))
.thenAnswer(
invocationOnMock -> {
Request req = invocationOnMock.getArgument(0, Request.class);
if (req.getUrl().equals(STATEMENT_PATH)) {
return closedResponse;
} else if (req.getUrl().equals(SESSION_PATH)) {
return sessionResponse;
}
return null;
});

// Execute statement
databricksSdkClient.executeStatement(
STATEMENT,
warehouse,
new HashMap<>(),
StatementType.QUERY,
connection.getSession(),
statement);

// Verify that markAsClosed was called on the statement
verify(statement, times(1)).markAsClosed();
}

@Test
public void testExecuteStatementWithClosedStatusAndNoParentStatement() throws Exception {
// Set up connection without parent statement
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksSdkClient databricksSdkClient =
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
DatabricksConnection connection =
new DatabricksConnection(connectionContext, databricksSdkClient);

// Mock session creation
CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID);
when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class)))
.thenReturn(sessionResponse);
connection.open();

// Create a response with CLOSED status
StatementStatus closedStatus = new StatementStatus().setState(StatementState.CLOSED);
ExecuteStatementResponse closedResponse =
new ExecuteStatementResponse()
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
.setStatus(closedStatus)
.setResult(resultData)
.setManifest(
new ResultManifest()
.setFormat(Format.JSON_ARRAY)
.setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L))
.setTotalRowCount(0L));

when(apiClient.execute(any(Request.class), any()))
.thenAnswer(
invocationOnMock -> {
Request req = invocationOnMock.getArgument(0, Request.class);
if (req.getUrl().equals(STATEMENT_PATH)) {
return closedResponse;
} else if (req.getUrl().equals(SESSION_PATH)) {
return sessionResponse;
}
return null;
});

// Execute statement with null parent statement - should not throw
assertDoesNotThrow(
() ->
databricksSdkClient.executeStatement(
STATEMENT,
warehouse,
new HashMap<>(),
StatementType.QUERY,
connection.getSession(),
null));
}
}
Loading