From 818d324bb9d075e049d72791e9cadb8e73f74070 Mon Sep 17 00:00:00 2001 From: Juniper Tyree <50025784+juntyr@users.noreply.github.com> Date: Wed, 18 Sep 2024 13:31:28 +0300 Subject: [PATCH] Try to syncify in async fsspec --- patches/pyodide.patch | 211 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 186 insertions(+), 25 deletions(-) diff --git a/patches/pyodide.patch b/patches/pyodide.patch index 8b92df116..fccb71c03 100644 --- a/patches/pyodide.patch +++ b/patches/pyodide.patch @@ -218,10 +218,10 @@ index 9a3c91ae..0632c904 100644 run: diff --git a/packages/aiohttp/patches/0001-pyodide-connection.patch b/packages/aiohttp/patches/0001-pyodide-connection.patch new file mode 100644 -index 00000000..af3661a0 +index 00000000..a053c83d --- /dev/null +++ b/packages/aiohttp/patches/0001-pyodide-connection.patch -@@ -0,0 +1,264 @@ +@@ -0,0 +1,293 @@ +diff --git a/aiohttp/__init__.py b/aiohttp/__init__.py +index 8bc7a4aa..b6debab6 100644 +--- a/aiohttp/__init__.py @@ -234,10 +234,10 @@ index 00000000..af3661a0 ++from . import patch +diff --git a/aiohttp/patch.py b/aiohttp/patch.py +new file mode 100644 -+index 00000000..a2de8dec ++index 00000000..9ac98439 +--- /dev/null ++++ b/aiohttp/patch.py -+@@ -0,0 +1,248 @@ ++@@ -0,0 +1,277 @@ ++from collections.abc import Iterable ++from contextlib import suppress ++from io import BytesIO @@ -255,8 +255,10 @@ index 00000000..af3661a0 ++ __slots__ = ("_bytesio", "_jsresp", "_exception") ++ ++ def __init__(self, _jsresp): -++ self._bytesio = None -++ self._jsresp = _jsresp +++ # self._bytesio = None +++ # self._jsresp = _jsresp +++ self._bytesio = BytesIO(_jsresp) +++ self._jsresp = None ++ self._exception = None ++ ++ async def read(self, size=-1, /): @@ -425,25 +427,52 @@ index 00000000..af3661a0 ++ loop=req.loop, ++ session=req._session, ++ ) -++ from js import Headers, fetch +++ +++ from js import XMLHttpRequest ++ from pyodide.ffi import to_js ++ -++ body = None -++ if req.body: -++ body = to_js(req.body._value) -++ jsresp = await fetch( -++ str(req.url), -++ method=req.method, -++ headers=Headers.new(headers.items()), -++ body=body, -++ ) +++ xhr = XMLHttpRequest.new() +++ xhr.responseType = "arraybuffer" +++ xhr.timeout = int(real_timeout.total * 1000) +++ +++ xhr.open(req.method, str(req.url), False) +++ for name, value in headers.items(): +++ if name.lower() not in ("user-agent",): +++ xhr.setRequestHeader(name, value) +++ +++ xhr.send(to_js(req.body) if req.body is not None else None) +++ +++ from email.parser import Parser +++ headers = dict(Parser().parsestr(xhr.getAllResponseHeaders())) +++ body = xhr.response.to_py().tobytes() +++ ++ resp.version = version -++ resp.status = jsresp.status -++ resp.reason = jsresp.statusText +++ resp.status = xhr.status +++ resp.reason = xhr.statusText ++ # This is not quite correct in handling of repeated headers -++ resp._headers = CIMultiDict(jsresp.headers) -++ resp._raw_headers = tuple(tuple(e) for e in jsresp.headers) -++ resp.content = Content(jsresp) +++ resp._headers = CIMultiDict(headers) +++ resp._raw_headers = tuple(tuple(e) for e in headers) +++ resp.content = Content(body) +++ +++ # from js import Headers, fetch +++ # from pyodide.ffi import to_js +++ +++ # body = None +++ # if req.body: +++ # body = to_js(req.body._value) +++ # jsresp = await fetch( +++ # str(req.url), +++ # method=req.method, +++ # headers=Headers.new(headers.items()), +++ # body=body, +++ # ) +++ # resp.version = version +++ # resp.status = jsresp.status +++ # resp.reason = jsresp.statusText +++ # # This is not quite correct in handling of repeated headers +++ # resp._headers = CIMultiDict(jsresp.headers) +++ # resp._raw_headers = tuple(tuple(e) for e in jsresp.headers) +++ # resp.content = Content(jsresp) ++ ++ # check response status ++ if raise_for_status is None: @@ -1758,10 +1787,10 @@ index 00000000..b4213bfc + summary: A packages to search for shared libraries on various platforms + license: Apache License Version 2.0 diff --git a/packages/fsspec/meta.yaml b/packages/fsspec/meta.yaml -index 3572b48d..fa6c9b9c 100644 +index 3572b48d..027d42fe 100644 --- a/packages/fsspec/meta.yaml +++ b/packages/fsspec/meta.yaml -@@ -1,11 +1,11 @@ +@@ -1,11 +1,13 @@ package: name: fsspec - version: 2023.6.0 @@ -1771,11 +1800,143 @@ index 3572b48d..fa6c9b9c 100644 source: - url: https://files.pythonhosted.org/packages/e3/bd/4c0a4619494188a9db5d77e2100ab7d544a42e76b2447869d8e124e981d8/fsspec-2023.6.0-py3-none-any.whl - sha256: 1cbad1faef3e391fba6dc005ae9b5bdcbf43005c9167ce78c915549c352c869a -+ url: https://files.pythonhosted.org/packages/ad/30/2281c062222dc39328843bd1ddd30ff3005ef8e30b2fd09c4d2792766061/fsspec-2024.2.0-py3-none-any.whl -+ sha256: 817f969556fa5916bc682e02ca2045f96ff7f586d45110fcb76022063ad2c7d8 ++ url: https://files.pythonhosted.org/packages/28/d3/c2e0403c735548abf991bba3f45ba39194dff4569f76a99fbe77078ba7c5/fsspec-2024.2.0.tar.gz ++ sha256: b6ad1a679f760dda52b1168c859d01b7b80648ea6f7f7c7f5a8a91dc3f3ecb84 ++ patches: ++ - patches/0001-syncify-awaitable.patch about: home: http://github.com/fsspec/filesystem_spec PyPI: https://pypi.org/project/fsspec +diff --git a/packages/fsspec/patches/0001-syncify-awaitable.patch b/packages/fsspec/patches/0001-syncify-awaitable.patch +new file mode 100644 +index 00000000..c22ecbce +--- /dev/null ++++ b/packages/fsspec/patches/0001-syncify-awaitable.patch +@@ -0,0 +1,124 @@ ++diff --git a/fsspec/asyn.py b/fsspec/asyn.py ++index fb4e05e..a36e423 100644 ++--- a/fsspec/asyn.py +++++ b/fsspec/asyn.py ++@@ -18,7 +18,7 @@ from .spec import AbstractBufferedFile, AbstractFileSystem ++ from .utils import glob_translate, is_exception, other_paths ++ ++ private = re.compile("_[^_]") ++-iothread = [None] # dedicated fsspec IO thread +++# iothread = [None] # dedicated fsspec IO thread ++ loop = [None] # global event loop for any non-async instance ++ _lock = None # global lock placeholder ++ get_running_loop = asyncio.get_running_loop ++@@ -43,7 +43,7 @@ def reset_lock(): ++ """ ++ global _lock ++ ++- iothread[0] = None +++ # iothread[0] = None ++ loop[0] = None ++ _lock = None ++ ++@@ -69,40 +69,54 @@ def sync(loop, func, *args, timeout=None, **kwargs): ++ >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args, ++ timeout=timeout, **kwargs) ++ """ ++- timeout = timeout if timeout else None # convert 0 or 0.0 to None ++- # NB: if the loop is not running *yet*, it is OK to submit work ++- # and we will wait for it ++- if loop is None or loop.is_closed(): ++- raise RuntimeError("Loop is not running") ++- try: ++- loop0 = asyncio.events.get_running_loop() ++- if loop0 is loop: ++- raise NotImplementedError("Calling sync() from within a running loop") ++- except NotImplementedError: ++- raise ++- except RuntimeError: ++- pass +++ ++ coro = func(*args, **kwargs) ++- result = [None] ++- event = threading.Event() ++- asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop) ++- while True: ++- # this loops allows thread to get interrupted ++- if event.wait(1): ++- break ++- if timeout is not None: ++- timeout -= 1 ++- if timeout < 0: ++- raise FSTimeoutError ++- ++- return_result = result[0] ++- if isinstance(return_result, asyncio.TimeoutError): ++- # suppress asyncio.TimeoutError, raise FSTimeoutError ++- raise FSTimeoutError from return_result ++- elif isinstance(return_result, BaseException): ++- raise return_result +++ awaitable = coro.__await__() +++ +++ try: +++ next(awaitable) +++ except StopIteration as result: +++ return result.value +++ except Exception as err: +++ raise err ++ else: ++- return return_result +++ raise RuntimeError("could not syncify an awaitable") +++ +++ +++ # timeout = timeout if timeout else None # convert 0 or 0.0 to None +++ # # NB: if the loop is not running *yet*, it is OK to submit work +++ # # and we will wait for it +++ # if loop is None or loop.is_closed(): +++ # raise RuntimeError("Loop is not running") +++ # try: +++ # loop0 = asyncio.events.get_running_loop() +++ # if loop0 is loop: +++ # raise NotImplementedError("Calling sync() from within a running loop") +++ # except NotImplementedError: +++ # raise +++ # except RuntimeError: +++ # pass +++ # coro = func(*args, **kwargs) +++ # result = [None] +++ # event = threading.Event() +++ # asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop) +++ # while True: +++ # # this loops allows thread to get interrupted +++ # if event.wait(1): +++ # break +++ # if timeout is not None: +++ # timeout -= 1 +++ # if timeout < 0: +++ # raise FSTimeoutError +++ +++ # return_result = result[0] +++ # if isinstance(return_result, asyncio.TimeoutError): +++ # # suppress asyncio.TimeoutError, raise FSTimeoutError +++ # raise FSTimeoutError from return_result +++ # elif isinstance(return_result, BaseException): +++ # raise return_result +++ # else: +++ # return return_result ++ ++ ++ def sync_wrapper(func, obj=None): ++@@ -144,10 +158,10 @@ def get_loop(): ++ if loop[0] is None: ++ with _selector_policy(): ++ loop[0] = asyncio.new_event_loop() ++- th = threading.Thread(target=loop[0].run_forever, name="fsspecIO") ++- th.daemon = True ++- th.start() ++- iothread[0] = th +++ # th = threading.Thread(target=loop[0].run_forever, name="fsspecIO") +++ # th.daemon = True +++ # th.start() +++ # iothread[0] = th ++ return loop[0] ++ ++ diff --git a/packages/git2/git2/git2/__init__.py b/packages/git2/git2/git2/__init__.py new file mode 100644 index 00000000..30068c6a