Skip to content

Commit

Permalink
Try to syncify in async fsspec
Browse files Browse the repository at this point in the history
  • Loading branch information
juntyr authored Sep 18, 2024
1 parent 93d19ac commit 818d324
Showing 1 changed file with 186 additions and 25 deletions.
211 changes: 186 additions & 25 deletions patches/pyodide.patch
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ index 9a3c91ae..0632c904 100644
run:
diff --git a/packages/aiohttp/patches/0001-pyodide-connection.patch b/packages/aiohttp/patches/0001-pyodide-connection.patch
new file mode 100644
index 00000000..af3661a0
index 00000000..a053c83d
--- /dev/null
+++ b/packages/aiohttp/patches/0001-pyodide-connection.patch
@@ -0,0 +1,264 @@
@@ -0,0 +1,293 @@
+diff --git a/aiohttp/__init__.py b/aiohttp/__init__.py
+index 8bc7a4aa..b6debab6 100644
+--- a/aiohttp/__init__.py
Expand All @@ -234,10 +234,10 @@ index 00000000..af3661a0
++from . import patch
+diff --git a/aiohttp/patch.py b/aiohttp/patch.py
+new file mode 100644
+index 00000000..a2de8dec
+index 00000000..9ac98439
+--- /dev/null
++++ b/aiohttp/patch.py
+@@ -0,0 +1,248 @@
+@@ -0,0 +1,277 @@
++from collections.abc import Iterable
++from contextlib import suppress
++from io import BytesIO
Expand All @@ -255,8 +255,10 @@ index 00000000..af3661a0
++ __slots__ = ("_bytesio", "_jsresp", "_exception")
++
++ def __init__(self, _jsresp):
++ self._bytesio = None
++ self._jsresp = _jsresp
++ # self._bytesio = None
++ # self._jsresp = _jsresp
++ self._bytesio = BytesIO(_jsresp)
++ self._jsresp = None
++ self._exception = None
++
++ async def read(self, size=-1, /):
Expand Down Expand Up @@ -425,25 +427,52 @@ index 00000000..af3661a0
++ loop=req.loop,
++ session=req._session,
++ )
++ from js import Headers, fetch
++
++ from js import XMLHttpRequest
++ from pyodide.ffi import to_js
++
++ body = None
++ if req.body:
++ body = to_js(req.body._value)
++ jsresp = await fetch(
++ str(req.url),
++ method=req.method,
++ headers=Headers.new(headers.items()),
++ body=body,
++ )
++ xhr = XMLHttpRequest.new()
++ xhr.responseType = "arraybuffer"
++ xhr.timeout = int(real_timeout.total * 1000)
++
++ xhr.open(req.method, str(req.url), False)
++ for name, value in headers.items():
++ if name.lower() not in ("user-agent",):
++ xhr.setRequestHeader(name, value)
++
++ xhr.send(to_js(req.body) if req.body is not None else None)
++
++ from email.parser import Parser
++ headers = dict(Parser().parsestr(xhr.getAllResponseHeaders()))
++ body = xhr.response.to_py().tobytes()
++
++ resp.version = version
++ resp.status = jsresp.status
++ resp.reason = jsresp.statusText
++ resp.status = xhr.status
++ resp.reason = xhr.statusText
++ # This is not quite correct in handling of repeated headers
++ resp._headers = CIMultiDict(jsresp.headers)
++ resp._raw_headers = tuple(tuple(e) for e in jsresp.headers)
++ resp.content = Content(jsresp)
++ resp._headers = CIMultiDict(headers)
++ resp._raw_headers = tuple(tuple(e) for e in headers)
++ resp.content = Content(body)
++
++ # from js import Headers, fetch
++ # from pyodide.ffi import to_js
++
++ # body = None
++ # if req.body:
++ # body = to_js(req.body._value)
++ # jsresp = await fetch(
++ # str(req.url),
++ # method=req.method,
++ # headers=Headers.new(headers.items()),
++ # body=body,
++ # )
++ # resp.version = version
++ # resp.status = jsresp.status
++ # resp.reason = jsresp.statusText
++ # # This is not quite correct in handling of repeated headers
++ # resp._headers = CIMultiDict(jsresp.headers)
++ # resp._raw_headers = tuple(tuple(e) for e in jsresp.headers)
++ # resp.content = Content(jsresp)
++
++ # check response status
++ if raise_for_status is None:
Expand Down Expand Up @@ -1758,10 +1787,10 @@ index 00000000..b4213bfc
+ summary: A packages to search for shared libraries on various platforms
+ license: Apache License Version 2.0
diff --git a/packages/fsspec/meta.yaml b/packages/fsspec/meta.yaml
index 3572b48d..fa6c9b9c 100644
index 3572b48d..027d42fe 100644
--- a/packages/fsspec/meta.yaml
+++ b/packages/fsspec/meta.yaml
@@ -1,11 +1,11 @@
@@ -1,11 +1,13 @@
package:
name: fsspec
- version: 2023.6.0
Expand All @@ -1771,11 +1800,143 @@ index 3572b48d..fa6c9b9c 100644
source:
- url: https://files.pythonhosted.org/packages/e3/bd/4c0a4619494188a9db5d77e2100ab7d544a42e76b2447869d8e124e981d8/fsspec-2023.6.0-py3-none-any.whl
- sha256: 1cbad1faef3e391fba6dc005ae9b5bdcbf43005c9167ce78c915549c352c869a
+ url: https://files.pythonhosted.org/packages/ad/30/2281c062222dc39328843bd1ddd30ff3005ef8e30b2fd09c4d2792766061/fsspec-2024.2.0-py3-none-any.whl
+ sha256: 817f969556fa5916bc682e02ca2045f96ff7f586d45110fcb76022063ad2c7d8
+ url: https://files.pythonhosted.org/packages/28/d3/c2e0403c735548abf991bba3f45ba39194dff4569f76a99fbe77078ba7c5/fsspec-2024.2.0.tar.gz
+ sha256: b6ad1a679f760dda52b1168c859d01b7b80648ea6f7f7c7f5a8a91dc3f3ecb84
+ patches:
+ - patches/0001-syncify-awaitable.patch
about:
home: http://github.com/fsspec/filesystem_spec
PyPI: https://pypi.org/project/fsspec
diff --git a/packages/fsspec/patches/0001-syncify-awaitable.patch b/packages/fsspec/patches/0001-syncify-awaitable.patch
new file mode 100644
index 00000000..c22ecbce
--- /dev/null
+++ b/packages/fsspec/patches/0001-syncify-awaitable.patch
@@ -0,0 +1,124 @@
+diff --git a/fsspec/asyn.py b/fsspec/asyn.py
+index fb4e05e..a36e423 100644
+--- a/fsspec/asyn.py
++++ b/fsspec/asyn.py
+@@ -18,7 +18,7 @@ from .spec import AbstractBufferedFile, AbstractFileSystem
+ from .utils import glob_translate, is_exception, other_paths
+
+ private = re.compile("_[^_]")
+-iothread = [None] # dedicated fsspec IO thread
++# iothread = [None] # dedicated fsspec IO thread
+ loop = [None] # global event loop for any non-async instance
+ _lock = None # global lock placeholder
+ get_running_loop = asyncio.get_running_loop
+@@ -43,7 +43,7 @@ def reset_lock():
+ """
+ global _lock
+
+- iothread[0] = None
++ # iothread[0] = None
+ loop[0] = None
+ _lock = None
+
+@@ -69,40 +69,54 @@ def sync(loop, func, *args, timeout=None, **kwargs):
+ >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
+ timeout=timeout, **kwargs)
+ """
+- timeout = timeout if timeout else None # convert 0 or 0.0 to None
+- # NB: if the loop is not running *yet*, it is OK to submit work
+- # and we will wait for it
+- if loop is None or loop.is_closed():
+- raise RuntimeError("Loop is not running")
+- try:
+- loop0 = asyncio.events.get_running_loop()
+- if loop0 is loop:
+- raise NotImplementedError("Calling sync() from within a running loop")
+- except NotImplementedError:
+- raise
+- except RuntimeError:
+- pass
++
+ coro = func(*args, **kwargs)
+- result = [None]
+- event = threading.Event()
+- asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
+- while True:
+- # this loops allows thread to get interrupted
+- if event.wait(1):
+- break
+- if timeout is not None:
+- timeout -= 1
+- if timeout < 0:
+- raise FSTimeoutError
+-
+- return_result = result[0]
+- if isinstance(return_result, asyncio.TimeoutError):
+- # suppress asyncio.TimeoutError, raise FSTimeoutError
+- raise FSTimeoutError from return_result
+- elif isinstance(return_result, BaseException):
+- raise return_result
++ awaitable = coro.__await__()
++
++ try:
++ next(awaitable)
++ except StopIteration as result:
++ return result.value
++ except Exception as err:
++ raise err
+ else:
+- return return_result
++ raise RuntimeError("could not syncify an awaitable")
++
++
++ # timeout = timeout if timeout else None # convert 0 or 0.0 to None
++ # # NB: if the loop is not running *yet*, it is OK to submit work
++ # # and we will wait for it
++ # if loop is None or loop.is_closed():
++ # raise RuntimeError("Loop is not running")
++ # try:
++ # loop0 = asyncio.events.get_running_loop()
++ # if loop0 is loop:
++ # raise NotImplementedError("Calling sync() from within a running loop")
++ # except NotImplementedError:
++ # raise
++ # except RuntimeError:
++ # pass
++ # coro = func(*args, **kwargs)
++ # result = [None]
++ # event = threading.Event()
++ # asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
++ # while True:
++ # # this loops allows thread to get interrupted
++ # if event.wait(1):
++ # break
++ # if timeout is not None:
++ # timeout -= 1
++ # if timeout < 0:
++ # raise FSTimeoutError
++
++ # return_result = result[0]
++ # if isinstance(return_result, asyncio.TimeoutError):
++ # # suppress asyncio.TimeoutError, raise FSTimeoutError
++ # raise FSTimeoutError from return_result
++ # elif isinstance(return_result, BaseException):
++ # raise return_result
++ # else:
++ # return return_result
+
+
+ def sync_wrapper(func, obj=None):
+@@ -144,10 +158,10 @@ def get_loop():
+ if loop[0] is None:
+ with _selector_policy():
+ loop[0] = asyncio.new_event_loop()
+- th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
+- th.daemon = True
+- th.start()
+- iothread[0] = th
++ # th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
++ # th.daemon = True
++ # th.start()
++ # iothread[0] = th
+ return loop[0]
+
+
diff --git a/packages/git2/git2/git2/__init__.py b/packages/git2/git2/git2/__init__.py
new file mode 100644
index 00000000..30068c6a
Expand Down

0 comments on commit 818d324

Please sign in to comment.