Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Feature: urllib3 instead of curl" #2261

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Revert "Feature: urllib3 instead of curl (#2134)"
This reverts commit 07c8852.
auvipy authored Mar 15, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 52c3682053ac22f8ea45743ad7c18b81a489381a
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ omit =
*/python?.?/*
*/site-packages/*
*/pypy/*
*kombu/async/http/urllib3_client.py
*kombu/async/http/curl.py
*kombu/five.py
*kombu/transport/mongodb.py
*kombu/transport/filesystem.py
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ jobs:
python-version: ["3.13"]
steps:
- name: Install system packages
run: sudo apt-get update && sudo apt-get install libssl-dev
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
- name: Check out code from GitHub
uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ jobs:
steps:
- name: Install apt packages
if: startsWith(matrix.os, 'blacksmith-4vcpu-ubuntu')
run: sudo apt-get update && sudo apt-get install libssl-dev
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: useblacksmith/setup-python@v6
@@ -98,7 +98,7 @@ jobs:

steps:
- name: Install apt packages
run: sudo apt-get update && sudo apt-get install libssl-dev
run: sudo apt-get update && sudo apt-get install libcurl4-openssl-dev libssl-dev

- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
2 changes: 1 addition & 1 deletion docs/reference/index.rst
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ Kombu Asynchronous
kombu.asynchronous.debug
kombu.asynchronous.http
kombu.asynchronous.http.base
kombu.asynchronous.http.urllib3_client
kombu.asynchronous.http.curl
kombu.asynchronous.aws
kombu.asynchronous.aws.connection
kombu.asynchronous.aws.sqs
11 changes: 0 additions & 11 deletions docs/reference/kombu.asynchronous.http.urllib3_client.rst

This file was deleted.

5 changes: 2 additions & 3 deletions kombu/asynchronous/aws/connection.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@

from vine import promise, transform

from kombu.asynchronous.aws.ext import AWSRequest, get_cert_path, get_response
from kombu.asynchronous.aws.ext import AWSRequest, get_response
from kombu.asynchronous.http import Headers, Request, get_client


@@ -92,8 +92,7 @@ def getrequest(self):
headers = Headers(self.headers)
return self.Request(self.path, method=self.method, headers=headers,
body=self.body, connect_timeout=self.timeout,
request_timeout=self.timeout,
validate_cert=True, ca_certs=get_cert_path(True))
request_timeout=self.timeout, validate_cert=False)

def getresponse(self, callback=None):
request = self.getrequest()
4 changes: 1 addition & 3 deletions kombu/asynchronous/aws/ext.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
import boto3
from botocore import exceptions
from botocore.awsrequest import AWSRequest
from botocore.httpsession import get_cert_path
from botocore.response import get_response
except ImportError:
boto3 = None
@@ -20,9 +19,8 @@ class BotoCoreError(Exception):
exceptions.BotoCoreError = BotoCoreError
AWSRequest = _void()
get_response = _void()
get_cert_path = _void()


__all__ = (
'exceptions', 'AWSRequest', 'get_response', 'get_cert_path',
'exceptions', 'AWSRequest', 'get_response'
)
17 changes: 11 additions & 6 deletions kombu/asynchronous/http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from kombu.asynchronous import get_event_loop
from kombu.asynchronous.http.base import BaseClient, Headers, Request, Response
from kombu.asynchronous.http.base import Headers, Request, Response
from kombu.asynchronous.hub import Hub

__all__ = ('Client', 'Headers', 'Response', 'Request', 'get_client')
if TYPE_CHECKING:
from kombu.asynchronous.http.curl import CurlClient

Check warning on line 10 in kombu/asynchronous/http/__init__.py

Codecov / codecov/patch

kombu/asynchronous/http/__init__.py#L10

Added line #L10 was not covered by tests

__all__ = ('Client', 'Headers', 'Response', 'Request')


def Client(hub: Hub | None = None, **kwargs: int) -> BaseClient:
def Client(hub: Hub | None = None, **kwargs: int) -> CurlClient:
"""Create new HTTP client."""
from .urllib3_client import Urllib3Client
return Urllib3Client(hub, **kwargs)
from .curl import CurlClient
return CurlClient(hub, **kwargs)


def get_client(hub: Hub | None = None, **kwargs: int) -> BaseClient:
def get_client(hub: Hub | None = None, **kwargs: int) -> CurlClient:
"""Get or create HTTP client bound to the current event loop."""
hub = hub or get_event_loop()
try:
8 changes: 1 addition & 7 deletions kombu/asynchronous/http/base.py
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
if TYPE_CHECKING:
from types import TracebackType

__all__ = ('Headers', 'Response', 'Request', 'BaseClient')
__all__ = ('Headers', 'Response', 'Request')

PYPY = hasattr(sys, 'pypy_version_info')

@@ -236,12 +236,6 @@ def header_parser(keyt=normalize_header):


class BaseClient:
"""Base class for HTTP clients.
This class provides the basic structure and functionality for HTTP clients.
Subclasses should implement specific HTTP client behavior.
"""

Headers = Headers
Request = Request
Response = Response
289 changes: 289 additions & 0 deletions kombu/asynchronous/http/curl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
"""HTTP Client using pyCurl."""

from __future__ import annotations

from collections import deque
from functools import partial
from io import BytesIO
from time import time

from kombu.asynchronous.hub import READ, WRITE, Hub, get_event_loop
from kombu.exceptions import HttpError
from kombu.utils.encoding import bytes_to_str

from .base import BaseClient

try:
import pycurl
except ImportError: # pragma: no cover
pycurl = Curl = METH_TO_CURL = None
else:
from pycurl import Curl

METH_TO_CURL = {
'GET': pycurl.HTTPGET,
'POST': pycurl.POST,
'PUT': pycurl.UPLOAD,
'HEAD': pycurl.NOBODY,
}

__all__ = ('CurlClient',)

DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)'
EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])


class CurlClient(BaseClient):
"""Curl HTTP Client."""

Curl = Curl

def __init__(self, hub: Hub | None = None, max_clients: int = 10):
if pycurl is None:
raise ImportError('The curl client requires the pycurl library.')
hub = hub or get_event_loop()
super().__init__(hub)
self.max_clients = max_clients

self._multi = pycurl.CurlMulti()
self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
self._curls = [self.Curl() for i in range(max_clients)]
self._free_list = self._curls[:]
self._pending = deque()
self._fds = {}

self._socket_action = self._multi.socket_action
self._timeout_check_tref = self.hub.call_repeatedly(
1.0, self._timeout_check,
)

# pycurl 7.29.0 workaround
dummy_curl_handle = pycurl.Curl()
self._multi.add_handle(dummy_curl_handle)
self._multi.remove_handle(dummy_curl_handle)

def close(self):
self._timeout_check_tref.cancel()
for _curl in self._curls:
_curl.close()
self._multi.close()

def add_request(self, request):
self._pending.append(request)
self._process_queue()
self._set_timeout(0)
return request

# the next two methods are used for linux/epoll workaround:
# we temporarily remove all curl fds from hub, so curl cannot
# close a fd which is still inside epoll
def _pop_from_hub(self):
for fd in self._fds:
self.hub.remove(fd)

def _push_to_hub(self):
for fd, events in self._fds.items():
if events & READ:
self.hub.add_reader(fd, self.on_readable, fd)

Check warning on line 88 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L88

Added line #L88 was not covered by tests
if events & WRITE:
self.hub.add_writer(fd, self.on_writable, fd)

def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl):
if event == _pycurl.POLL_REMOVE:
if fd in self._fds:
self._fds.pop(fd, None)
else:
if event == _pycurl.POLL_IN:
self._fds[fd] = READ
elif event == _pycurl.POLL_OUT:
self._fds[fd] = WRITE
elif event == _pycurl.POLL_INOUT:
self._fds[fd] = READ | WRITE

def _set_timeout(self, msecs):
self.hub.call_later(msecs, self._timeout_check)

def _timeout_check(self, _pycurl=pycurl):
self._pop_from_hub()
try:
while 1:
try:
ret, _ = self._multi.socket_all()
except pycurl.error as exc:
ret = exc.args[0]
if ret != _pycurl.E_CALL_MULTI_PERFORM:
break
finally:
self._push_to_hub()
self._process_pending_requests()

def on_readable(self, fd, _pycurl=pycurl):
return self._on_event(fd, _pycurl.CSELECT_IN)

def on_writable(self, fd, _pycurl=pycurl):
return self._on_event(fd, _pycurl.CSELECT_OUT)

def _on_event(self, fd, event, _pycurl=pycurl):
self._pop_from_hub()
try:
while 1:
try:
ret, _ = self._socket_action(fd, event)
except pycurl.error as exc:
ret = exc.args[0]

Check warning on line 134 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L128-L134

Added lines #L128 - L134 were not covered by tests
if ret != _pycurl.E_CALL_MULTI_PERFORM:
break

Check warning on line 136 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L136

Added line #L136 was not covered by tests
finally:
self._push_to_hub()
self._process_pending_requests()

Check warning on line 139 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L138-L139

Added lines #L138 - L139 were not covered by tests

def _process_pending_requests(self):
while 1:
q, succeeded, failed = self._multi.info_read()

Check warning on line 143 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L142-L143

Added lines #L142 - L143 were not covered by tests
for curl in succeeded:
self._process(curl)

Check warning on line 145 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L145

Added line #L145 was not covered by tests
for curl, errno, reason in failed:
self._process(curl, errno, reason)

Check warning on line 147 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L147

Added line #L147 was not covered by tests
if q == 0:
break
self._process_queue()

Check warning on line 150 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L149-L150

Added lines #L149 - L150 were not covered by tests

def _process_queue(self):
while 1:
started = 0

Check warning on line 154 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L153-L154

Added lines #L153 - L154 were not covered by tests
while self._free_list and self._pending:
started += 1
curl = self._free_list.pop()
request = self._pending.popleft()
headers = self.Headers()
buf = BytesIO()
curl.info = {

Check warning on line 161 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L156-L161

Added lines #L156 - L161 were not covered by tests
'headers': headers,
'buffer': buf,
'request': request,
'curl_start_time': time(),
}
self._setup_request(curl, request, buf, headers)
self._multi.add_handle(curl)

Check warning on line 168 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L167-L168

Added lines #L167 - L168 were not covered by tests
if not started:
break

Check warning on line 170 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L170

Added line #L170 was not covered by tests

def _process(self, curl, errno=None, reason=None, _pycurl=pycurl):
info, curl.info = curl.info, None
self._multi.remove_handle(curl)
self._free_list.append(curl)
buffer = info['buffer']

Check warning on line 176 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L173-L176

Added lines #L173 - L176 were not covered by tests
if errno:
code = 599
error = HttpError(code, reason)
error.errno = errno
effective_url = None
buffer.close()
buffer = None

Check warning on line 183 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L178-L183

Added lines #L178 - L183 were not covered by tests
else:
error = None
code = curl.getinfo(_pycurl.HTTP_CODE)
effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL)
buffer.seek(0)

Check warning on line 188 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L185-L188

Added lines #L185 - L188 were not covered by tests
# try:
request = info['request']
request.on_ready(self.Response(

Check warning on line 191 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L190-L191

Added lines #L190 - L191 were not covered by tests
request=request, code=code, headers=info['headers'],
buffer=buffer, effective_url=effective_url, error=error,
))

def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl):
setopt = curl.setopt
setopt(_pycurl.URL, bytes_to_str(request.url))

# see tornado curl client
request.headers.setdefault('Expect', '')
request.headers.setdefault('Pragma', '')

setopt(
_pycurl.HTTPHEADER,
['{}: {}'.format(*h) for h in request.headers.items()],
)

setopt(
_pycurl.HEADERFUNCTION,
partial(request.on_header or self.on_header, request.headers),
)
setopt(
_pycurl.WRITEFUNCTION, request.on_stream or buffer.write,
)
setopt(
_pycurl.FOLLOWLOCATION, request.follow_redirects,
)
setopt(
_pycurl.USERAGENT,
bytes_to_str(request.user_agent or DEFAULT_USER_AGENT),
)
if request.network_interface:
setopt(_pycurl.INTERFACE, request.network_interface)
setopt(
_pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none',
)
if request.proxy_host:
if not request.proxy_port:
raise ValueError('Request with proxy_host but no proxy_port')

Check warning on line 230 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L230

Added line #L230 was not covered by tests
setopt(_pycurl.PROXY, request.proxy_host)
setopt(_pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username:
setopt(_pycurl.PROXYUSERPWD, '{}:{}'.format(
request.proxy_username, request.proxy_password or ''))

setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0)
setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0)
if request.ca_certs is not None:
setopt(_pycurl.CAINFO, request.ca_certs)

setopt(_pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

for meth in METH_TO_CURL.values():
setopt(meth, False)
try:
meth = METH_TO_CURL[request.method]
except KeyError:
curl.setopt(_pycurl.CUSTOMREQUEST, request.method)
else:
curl.unsetopt(_pycurl.CUSTOMREQUEST)
setopt(meth, True)

Check warning on line 252 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L251-L252

Added lines #L251 - L252 were not covered by tests

if request.method in ('POST', 'PUT'):
body = request.body.encode('utf-8') if request.body else b''
reqbuffer = BytesIO(body)
setopt(_pycurl.READFUNCTION, reqbuffer.read)

Check warning on line 257 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L255-L257

Added lines #L255 - L257 were not covered by tests
if request.method == 'POST':

def ioctl(cmd):

Check warning on line 260 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L260

Added line #L260 was not covered by tests
if cmd == _pycurl.IOCMD_RESTARTREAD:
reqbuffer.seek(0)
setopt(_pycurl.IOCTLFUNCTION, ioctl)
setopt(_pycurl.POSTFIELDSIZE, len(body))

Check warning on line 264 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L262-L264

Added lines #L262 - L264 were not covered by tests
else:
setopt(_pycurl.INFILESIZE, len(body))

Check warning on line 266 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L266

Added line #L266 was not covered by tests
elif request.method == 'GET':
assert not request.body

Check warning on line 268 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L268

Added line #L268 was not covered by tests

if request.auth_username is not None:
auth_mode = {
'basic': _pycurl.HTTPAUTH_BASIC,
'digest': _pycurl.HTTPAUTH_DIGEST
}[request.auth_mode or 'basic']
setopt(_pycurl.HTTPAUTH, auth_mode)
userpwd = '{}:{}'.format(
request.auth_username, request.auth_password or '',
)
setopt(_pycurl.USERPWD, userpwd)
else:
curl.unsetopt(_pycurl.USERPWD)

Check warning on line 281 in kombu/asynchronous/http/curl.py

Codecov / codecov/patch

kombu/asynchronous/http/curl.py#L281

Added line #L281 was not covered by tests

if request.client_cert is not None:
setopt(_pycurl.SSLCERT, request.client_cert)
if request.client_key is not None:
setopt(_pycurl.SSLKEY, request.client_key)

if request.on_prepare is not None:
request.on_prepare(curl)
219 changes: 0 additions & 219 deletions kombu/asynchronous/http/urllib3_client.py

This file was deleted.

4 changes: 3 additions & 1 deletion requirements/docs.txt
Original file line number Diff line number Diff line change
@@ -3,4 +3,6 @@ git+https://github.com/celery/sphinx_celery.git
-r extras/mongodb.txt
-r extras/sqlalchemy.txt
-r extras/azureservicebus.txt
-r extras/sqs.txt
# we cannot use directly extras/sqs.txt
# since readthedocs cannot install pycurl
boto3>=1.26.143
1 change: 1 addition & 0 deletions requirements/extras/sqs.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
boto3>=1.26.143
pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
urllib3>=1.26.16
1 change: 1 addition & 0 deletions requirements/test-ci.txt
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ pymongo>=4.1.1; sys_platform != 'win32'
-r extras/azureservicebus.txt
-r extras/azurestoragequeues.txt
boto3>=1.26.143; sys_platform != 'win32'
pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
urllib3>=1.26.16; sys_platform != 'win32'
-r extras/consul.txt
-r extras/zookeeper.txt
2 changes: 1 addition & 1 deletion t/unit/asynchronous/aws/case.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
import t.skip

pytest.importorskip('boto3')
pytest.importorskip('urllib3')
pytest.importorskip('pycurl')


@t.skip.if_pypy
7 changes: 0 additions & 7 deletions t/unit/asynchronous/aws/test_connection.py
Original file line number Diff line number Diff line change
@@ -90,13 +90,6 @@ def assert_request_created_with(self, url, conn):
validate_cert=VALIDATES_CERT,
)

def test_request_with_cert_path_https(self):
x = AsyncHTTPSConnection("https://example.com")
request = x.getrequest()
assert request.validate_cert is True
assert request.ca_certs is not None
assert request.ca_certs.endswith('.pem')

def test_getresponse(self):
client = Mock(name='client')
client.add_request = passthrough(name='client.add_request')
157 changes: 157 additions & 0 deletions t/unit/asynchronous/http/test_curl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

from io import BytesIO
from unittest.mock import ANY, Mock, call, patch

import pytest

import t.skip
from kombu.asynchronous.http.curl import READ, WRITE, CurlClient

pytest.importorskip('pycurl')


@t.skip.if_pypy
@pytest.mark.usefixtures('hub')
class test_CurlClient:

class Client(CurlClient):
Curl = Mock(name='Curl')

def test_when_pycurl_missing(self, patching):
patching('kombu.asynchronous.http.curl.pycurl', None)
with pytest.raises(ImportError):
self.Client()

def test_max_clients_set(self):
x = self.Client(max_clients=303)
assert x.max_clients == 303

def test_init(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
assert x._multi is not None
assert x._pending is not None
assert x._free_list is not None
assert x._fds is not None
assert x._socket_action == x._multi.socket_action
assert len(x._curls) == x.max_clients
assert x._timeout_check_tref

x._multi.setopt.assert_has_calls([
call(_pycurl.M_TIMERFUNCTION, x._set_timeout),
call(_pycurl.M_SOCKETFUNCTION, x._handle_socket),
])

def test_close(self):
with patch('kombu.asynchronous.http.curl.pycurl'):
x = self.Client()
x._timeout_check_tref = Mock(name='timeout_check_tref')
x.close()
x._timeout_check_tref.cancel.assert_called_with()
for _curl in x._curls:
_curl.close.assert_called_with()
x._multi.close.assert_called_with()

def test_add_request(self):
with patch('kombu.asynchronous.http.curl.pycurl'):
x = self.Client()
x._process_queue = Mock(name='_process_queue')
x._set_timeout = Mock(name='_set_timeout')
request = Mock(name='request')
x.add_request(request)
assert request in x._pending
x._process_queue.assert_called_with()
x._set_timeout.assert_called_with(0)

def test_handle_socket(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
fd = Mock(name='fd1')

# POLL_REMOVE
x._fds[fd] = fd
x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)
assert fd not in x._fds
x._handle_socket(_pycurl.POLL_REMOVE, fd, x._multi, None, _pycurl)

# POLL_IN
fds = [fd, Mock(name='fd2'), Mock(name='fd3')]
x._fds = {f: f for f in fds}
x._handle_socket(_pycurl.POLL_IN, fd, x._multi, None, _pycurl)
assert x._fds[fd] == READ

# POLL_OUT
x._handle_socket(_pycurl.POLL_OUT, fd, x._multi, None, _pycurl)
assert x._fds[fd] == WRITE

# POLL_INOUT
x._handle_socket(_pycurl.POLL_INOUT, fd, x._multi, None, _pycurl)
assert x._fds[fd] == READ | WRITE

# UNKNOWN EVENT
x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)

# FD NOT IN FDS
x._fds.clear()
x._handle_socket(0xff3f, fd, x._multi, None, _pycurl)

def test_set_timeout(self):
hub = Mock(name='hub')
x = self.Client(hub)
x._set_timeout(100)
hub.call_later.assert_called_with(100, x._timeout_check)

def test_timeout_check(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
hub = Mock(name='hub')
x = self.Client(hub)
fd1, fd2 = Mock(name='fd1'), Mock(name='fd2')
x._fds = {fd1: READ}
x._process_pending_requests = Mock(name='process_pending')

def _side_effect():
x._fds = {fd2: WRITE}
return 333, 1

x._multi.socket_all.side_effect = _side_effect
_pycurl.error = KeyError

x._timeout_check(_pycurl=_pycurl)
hub.remove.assert_called_with(fd1)
hub.add_writer.assert_called_with(fd2, x.on_writable, fd2)

x._multi.socket_all.return_value = None
x._multi.socket_all.side_effect = _pycurl.error(333)
x._timeout_check(_pycurl=_pycurl)

def test_on_readable_on_writeable(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
x._on_event = Mock(name='on_event')
fd = Mock(name='fd')
x.on_readable(fd, _pycurl=_pycurl)
x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN)
x.on_writable(fd, _pycurl=_pycurl)
x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT)

def test_setup_request_sets_proxy_when_specified(self):
with patch('kombu.asynchronous.http.curl.pycurl') as _pycurl:
x = self.Client()
proxy_host = 'http://www.example.com'
request = Mock(
name='request', headers={}, auth_mode=None, proxy_host=None
)
proxied_request = Mock(
name='request', headers={}, auth_mode=None,
proxy_host=proxy_host, proxy_port=123
)
x._setup_request(
x.Curl, request, BytesIO(), x.Headers(), _pycurl=_pycurl
)
with pytest.raises(AssertionError):
x.Curl.setopt.assert_any_call(_pycurl.PROXY, ANY)
x._setup_request(
x.Curl, proxied_request, BytesIO(), x.Headers(), _pycurl
)
x.Curl.setopt.assert_any_call(_pycurl.PROXY, proxy_host)
2 changes: 1 addition & 1 deletion t/unit/asynchronous/http/test_http.py
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ def test_as_context(self):
class test_Client:

def test_get_client(self, hub):
pytest.importorskip('urllib3')
pytest.importorskip('pycurl')
client = http.get_client()
assert client.hub is hub
client2 = http.get_client(hub)
257 changes: 0 additions & 257 deletions t/unit/asynchronous/http/test_urllib3.py

This file was deleted.