Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python/adbc_driver_manager): handle KeyboardInterrupt #1509

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/native-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,20 @@ jobs:
- name: Test Python Driver Flight SQL
shell: bash -l {0}
run: |
# Can't use Docker on macOS
pushd $(pwd)/go/adbc
go build -o testserver ./driver/flightsql/cmd/testserver
popd
$(pwd)/go/adbc/testserver -host 0.0.0.0 -port 41414 &
while ! curl --http2-prior-knowledge -H "content-type: application/grpc" -v localhost:41414 -XPOST;
do
echo "Waiting for test server..."
jobs
sleep 5
done
export ADBC_TEST_FLIGHTSQL_URI=grpc://localhost:41414
env BUILD_ALL=0 BUILD_DRIVER_FLIGHTSQL=1 ./ci/scripts/python_test.sh "$(pwd)" "$(pwd)/build" "$HOME/local"
kill %1
- name: Build Python Driver PostgreSQL
shell: bash -l {0}
run: |
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/python_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ test_subproject() {
fi

echo "=== Testing ${subproject} ==="
echo env ${options[@]} python -m pytest -vv "${source_dir}/python/${subproject}/tests"
env ${options[@]} python -m pytest -vv "${source_dir}/python/${subproject}/tests"
echo env ${options[@]} python -m pytest -vvs --full-trace "${source_dir}/python/${subproject}/tests"
env ${options[@]} python -m pytest -vvs --full-trace "${source_dir}/python/${subproject}/tests"
echo
}

Expand Down
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ services:
dockerfile: ci/docker/flightsql-test.dockerfile
args:
GO: ${GO}
healthcheck:
test: ["CMD", "curl", "--http2-prior-knowledge", "-XPOST", "-H", "content-type: application/grpc"]
interval: 5s
timeout: 30s
retries: 3
start_period: 5m
ports:
- "41414:41414"
volumes:
Expand Down
20 changes: 20 additions & 0 deletions python/adbc_driver_flightsql/tests/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.

import re
import threading
import time

import google.protobuf.any_pb2 as any_pb2
import google.protobuf.wrappers_pb2 as wrappers_pb2
Expand Down Expand Up @@ -45,6 +47,24 @@ def test_query_cancel(test_dbapi):
cur.fetchone()


def test_query_cancel_async(test_dbapi):
with test_dbapi.cursor() as cur:
cur.execute("forever")

def _cancel():
time.sleep(2)
cur.adbc_cancel()

t = threading.Thread(target=_cancel, daemon=True)
t.start()

with pytest.raises(
test_dbapi.OperationalError,
match=re.escape("CANCELLED: [FlightSQL] context canceled"),
):
cur.fetchone()


def test_query_error_fetch(test_dbapi):
with test_dbapi.cursor() as cur:
cur.execute("error_do_get")
Expand Down
2 changes: 2 additions & 0 deletions python/adbc_driver_manager/MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ include NOTICE.txt
include adbc_driver_manager/adbc.h
include adbc_driver_manager/adbc_driver_manager.cc
include adbc_driver_manager/adbc_driver_manager.h
include adbc_driver_manager/_blocking_impl.cc
include adbc_driver_manager/_blocking_impl.h
include adbc_driver_manager/_lib.pxd
include adbc_driver_manager/_lib.pyi
include adbc_driver_manager/_reader.pyi
Expand Down
269 changes: 269 additions & 0 deletions python/adbc_driver_manager/adbc_driver_manager/_blocking_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "_blocking_impl.h"

#if defined(_WIN32)
#define NOMINMAX
#define WIN32_LEAN_AND_MEAN
#include <errno.h>
#include <fcntl.h>
#include <io.h>
#include <windows.h>
#else
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#endif

#include <csignal>
#include <cstring>
#include <iostream>
#include <mutex>
#include <thread>

namespace pyadbc_driver_manager {

// This is somewhat derived from io_util.cc in arrow, but that implementation
// isn't easily used outside of Arrow's monolith.
namespace {
static std::once_flag kInitOnce;
// We may encounter errors below that we can't do anything about. Use this to
// print out an error, once.
static std::once_flag kWarnOnce;
// This thread reads from a pipe forever. Whenever it reads something, it
// calls the callback below.
static std::thread kCancelThread;

static std::mutex cancel_mutex;
// This callback is registered by the Python side; basically it will call
// cancel() on an ADBC object.
static void (*cancel_callback)(void*) = nullptr;
// Callback state (a pointer to the ADBC PyObject).
static void* cancel_callback_data = nullptr;
// A nonblocking self-pipe.
static int pipe[2];
#if defined(_WIN32)
void (*old_sigint)(int);
#else
// The old signal handler (most likely Python's).
struct sigaction old_sigint;
// Our signal handler (below).
struct sigaction our_sigint;
#endif

std::string MakePipe() {
int rc = 0;
#if defined(__linux__) && defined(__GLIBC__)
rc = pipe2(pipe, O_CLOEXEC);
#elif defined(_WIN32)
rc = _pipe(pipe, 4096, _O_BINARY);
#else
rc = ::pipe(pipe);
#endif

if (rc != 0) {
return std::strerror(errno);
}

#if (!defined(__linux__) || !defined(__GLIBC__)) && !defined(_WIN32)
{
int flags = fcntl(pipe[0], F_GETFD, 0);
if (flags < 0) {
return std::strerror(errno);
}
rc = fcntl(pipe[0], F_SETFD, flags | FD_CLOEXEC);
if (rc < 0) {
return std::strerror(errno);
}

flags = fcntl(pipe[1], F_GETFD, 0);
if (flags < 0) {
return std::strerror(errno);
}
rc = fcntl(pipe[1], F_SETFD, flags | FD_CLOEXEC);
if (rc < 0) {
return std::strerror(errno);
}
}
#endif

// Make the write side nonblocking (the read side should stay blocking!)
#if defined(_WIN32)
const auto handle = reinterpret_cast<HANDLE>(_get_osfhandle(pipe[1]));
DWORD mode = PIPE_NOWAIT;
if (!SetNamedPipeHandleState(handle, &mode, nullptr, nullptr)) {
DWORD last_error = GetLastError();
LPVOID message;

FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
/*lpSource=*/nullptr, last_error,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
reinterpret_cast<LPSTR>(&message), /*nSize=*/0, /*Arguments=*/nullptr);

std::string buffer = "(";
buffer += std::to_string(last_error);
buffer += ") ";
buffer += reinterpret_cast<char*>(message);
LocalFree(message);
return buffer;
}
#else
{
int flags = fcntl(pipe[1], F_GETFL, 0);
if (flags < 0) {
return std::strerror(errno);
}
rc = fcntl(pipe[1], F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
return std::strerror(errno);
}
}
#endif

return "";
}

void InterruptThread() {
#if defined(__APPLE__)
pthread_setname_np("AdbcInterrupt");
#endif

while (true) {
char buf = 0;
// Anytime something is written to the pipe, attempt to call the callback
auto bytes_read = read(pipe[0], &buf, 1);
if (bytes_read < 0) {
if (errno == EINTR) continue;

// XXX: we failed reading from the pipe
std::string message = std::strerror(errno);
std::call_once(kWarnOnce, [&]() {
std::cerr << "adbc_driver_manager (native code): error handling interrupt: "
<< message << std::endl;
});
} else if (bytes_read > 0) {
// Save the callback locally instead of calling it under the lock, since
// otherwise we may deadlock with the Python side trying to call us
void (*local_callback)(void*) = nullptr;
void* local_callback_data = nullptr;

{
std::lock_guard<std::mutex> lock(cancel_mutex);
if (cancel_callback != nullptr) {
local_callback = cancel_callback;
local_callback_data = cancel_callback_data;
}
cancel_callback = nullptr;
cancel_callback_data = nullptr;
}

if (local_callback != nullptr) {
local_callback(local_callback_data);
}
}
}
}

// We can't do much about failures here, so ignore the result. If the pipe is
// full, that's fine; it just means the thread has fallen behind in processing
// earlier interrupts.
void SigintHandler(int) {
#if defined(_WIN32)
(void)_write(pipe[1], "X", 1);
#else
(void)write(pipe[1], "X", 1);
#endif
}

} // namespace

std::string InitBlockingCallback() {
std::string error;
std::call_once(kInitOnce, [&]() {
error = MakePipe();
if (!error.empty()) {
return;
}

#if !defined(_WIN32)
our_sigint.sa_handler = &SigintHandler;
our_sigint.sa_flags = 0;
sigemptyset(&our_sigint.sa_mask);
#endif

kCancelThread = std::thread(InterruptThread);
#if defined(__linux__)
pthread_setname_np(kCancelThread.native_handle(), "AdbcInterrupt");
#endif
kCancelThread.detach();
});
return error;
}

std::string SetBlockingCallback(void (*callback)(void*), void* data) {
std::lock_guard<std::mutex> lock(cancel_mutex);
cancel_callback = callback;
cancel_callback_data = data;

#if defined(_WIN32)
if (old_sigint == nullptr) {
old_sigint = signal(SIGINT, &SigintHandler);
if (old_sigint == SIG_ERR) {
old_sigint = nullptr;
return std::strerror(errno);
}
}
#else
// Don't set the handler again if we're somehow called twice
if (old_sigint.sa_handler == nullptr && old_sigint.sa_sigaction == nullptr) {
int rc = sigaction(SIGINT, &our_sigint, &old_sigint);
if (rc != 0) {
return std::strerror(errno);
}
}
#endif
return "";
}

std::string ClearBlockingCallback() {
std::lock_guard<std::mutex> lock(cancel_mutex);
cancel_callback = nullptr;
cancel_callback_data = nullptr;

#if defined(_WIN32)
if (old_sigint != nullptr) {
auto rc = signal(SIGINT, old_sigint);
old_sigint = nullptr;
if (rc == SIG_ERR) {
return std::strerror(errno);
}
}
#else
if (old_sigint.sa_handler != nullptr || old_sigint.sa_sigaction != nullptr) {
int rc = sigaction(SIGINT, &old_sigint, nullptr);
std::memset(&old_sigint, 0, sizeof(old_sigint));
if (rc != 0) {
return std::strerror(errno);
}
}
#endif
return "";
}

} // namespace pyadbc_driver_manager
38 changes: 38 additions & 0 deletions python/adbc_driver_manager/adbc_driver_manager/_blocking_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Allow KeyboardInterrupt to function with ADBC in Python.
///
/// Call SetBlockingCallback to register a callback. This will temporarily
/// suppress the Python SIGINT handler. When SIGINT is received, this module
/// will handle it by calling the callback.

#include <string>

namespace pyadbc_driver_manager {

/// \brief Set up internal state to handle.
/// \return An error message (or empty string).
std::string InitBlockingCallback();
/// \brief Set the callback for when SIGINT is received.
/// \return An error message (or empty string).
std::string SetBlockingCallback(void (*callback)(void*), void* data);
/// \brief Clear the callback for when SIGINT is received.
/// \return An error message (or empty string).
std::string ClearBlockingCallback();

} // namespace pyadbc_driver_manager
Loading
Loading