Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async wrapper for sync FS #1745

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ jobs:
fail-fast: false
matrix:
PY:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
Expand Down
34 changes: 34 additions & 0 deletions docs/source/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,37 @@ available as the attribute ``.loop``.

<script data-goatcounter="https://fsspec.goatcounter.com/count"
async src="//gc.zgo.at/count.js"></script>

AsyncFileSystemWrapper
----------------------

The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert
a synchronous filesystem into an asynchronous one. This is useful for quickly integrating
synchronous filesystems into workflows that may expect `AsyncFileSystem` instances.

Basic Usage
~~~~~~~~~~~

To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context.
In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance
backed by the normal, synchronous methods of `LocalFileSystem`:

.. code-block:: python

import asyncio
import fsspec
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

async def async_copy_file():
sync_fs = fsspec.filesystem('file') # by-default synchronous, local filesystem
async_fs = AsyncFileSystemWrapper(sync_fs)
return await async_fs._copy('/source/file.txt', '/destination/file.txt')

asyncio.run(async_copy_file())

Limitations
-----------

This is experimental. Users should not expect this wrapper to magically make things faster.
It is primarily provided to allow usage of synchronous filesystems with interfaces that expect
`AsyncFileSystem` instances.
96 changes: 96 additions & 0 deletions fsspec/implementations/asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import asyncio
import inspect
import functools
from fsspec.asyn import AsyncFileSystem


def async_wrapper(func, obj=None):
"""
Wraps a synchronous function to make it awaitable.

Parameters
----------
func : callable
The synchronous function to wrap.
obj : object, optional
The instance to bind the function to, if applicable.

Returns
-------
coroutine
An awaitable version of the function.
"""

@functools.wraps(func)
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)

return wrapper


class AsyncFileSystemWrapper(AsyncFileSystem):
martindurant marked this conversation as resolved.
Show resolved Hide resolved
"""
A wrapper class to convert a synchronous filesystem into an asynchronous one.

This class takes an existing synchronous filesystem implementation and wraps all
its methods to provide an asynchronous interface.

Parameters
----------
sync_fs : AbstractFileSystem
The synchronous filesystem instance to wrap.
"""

def __init__(self, sync_fs, *args, **kwargs):
super().__init__(*args, **kwargs)
self.asynchronous = True
self.fs = sync_fs
self._wrap_all_sync_methods()

@property
def fsid(self):
return f"async_{self.fs.fsid}"

def _wrap_all_sync_methods(self):
"""
Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
"""
for method_name in dir(self.fs):
if method_name.startswith("_"):
continue

attr = inspect.getattr_static(self.fs, method_name)
if isinstance(attr, property):
continue

method = getattr(self.fs, method_name)
if callable(method) and not asyncio.iscoroutinefunction(method):
async_method = async_wrapper(method, obj=self)
setattr(self, f"_{method_name}", async_method)

@classmethod
def wrap_class(cls, sync_fs_class):
"""
Create a new class that can be used to instantiate an AsyncFileSystemWrapper
with lazy instantiation of the underlying synchronous filesystem.

Parameters
----------
sync_fs_class : type
The class of the synchronous filesystem to wrap.

Returns
-------
type
A new class that wraps the provided synchronous filesystem class.
"""

class GeneratedAsyncFileSystemWrapper(cls):
def __init__(self, *args, **kwargs):
sync_fs = sync_fs_class(*args, **kwargs)
super().__init__(sync_fs)

GeneratedAsyncFileSystemWrapper.__name__ = (
f"Async{sync_fs_class.__name__}Wrapper"
)
return GeneratedAsyncFileSystemWrapper
142 changes: 142 additions & 0 deletions fsspec/implementations/tests/test_asyn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import asyncio
import pytest
import os

import fsspec
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
from fsspec.implementations.local import LocalFileSystem
from .test_local import csv_files, filetexts


def test_is_async():
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)
assert async_fs.async_impl


def test_class_wrapper():
fs_cls = LocalFileSystem
async_fs_cls = AsyncFileSystemWrapper.wrap_class(fs_cls)
assert async_fs_cls.__name__ == "AsyncLocalFileSystemWrapper"
async_fs = async_fs_cls()
assert async_fs.async_impl


@pytest.mark.asyncio
async def test_cats():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

result = await async_fs._cat(".test.fakedata.1.csv")
assert result == b"a,b\n1,2\n"

out = set(
(
await async_fs._cat([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
).values()
)
assert out == {b"a,b\n1,2\n", b"a,b\n3,4\n"}

result = await async_fs._cat(".test.fakedata.1.csv", None, None)
assert result == b"a,b\n1,2\n"

result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=6)
assert result == b"a,b\n1,2\n"[1:6]

result = await async_fs._cat(".test.fakedata.1.csv", start=-1)
assert result == b"a,b\n1,2\n"[-1:]

result = await async_fs._cat(".test.fakedata.1.csv", start=1, end=-2)
assert result == b"a,b\n1,2\n"[1:-2]

# test synchronous API is available as expected
result = async_fs.cat(".test.fakedata.1.csv", start=1, end=-2)
assert result == b"a,b\n1,2\n"[1:-2]

out = set(
(
await async_fs._cat(
[".test.fakedata.1.csv", ".test.fakedata.2.csv"], start=1, end=-1
)
).values()
)
assert out == {b"a,b\n1,2\n"[1:-1], b"a,b\n3,4\n"[1:-1]}


@pytest.mark.asyncio
async def test_basic_crud_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

await async_fs._touch(".test.fakedata.3.csv")
assert await async_fs._exists(".test.fakedata.3.csv")

data = await async_fs._cat(".test.fakedata.1.csv")
assert data == b"a,b\n1,2\n"

await async_fs._pipe(".test.fakedata.1.csv", b"a,b\n5,6\n")
data = await async_fs._cat(".test.fakedata.1.csv")
assert data == b"a,b\n5,6\n"

await async_fs._rm(".test.fakedata.1.csv")
assert not await async_fs._exists(".test.fakedata.1.csv")


@pytest.mark.asyncio
async def test_error_handling():
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

with pytest.raises(FileNotFoundError):
await async_fs._cat(".test.non_existent.csv")

with pytest.raises(FileNotFoundError):
await async_fs._rm(".test.non_existent.csv")


@pytest.mark.asyncio
async def test_concurrent_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

async def read_file(file_path):
return await async_fs._cat(file_path)

results = await asyncio.gather(
read_file(".test.fakedata.1.csv"),
read_file(".test.fakedata.2.csv"),
read_file(".test.fakedata.1.csv"),
)

assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"]


@pytest.mark.asyncio
async def test_directory_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

await async_fs._makedirs("new_directory")
assert await async_fs._isdir("new_directory")

files = await async_fs._ls(".")
filenames = [os.path.basename(file) for file in files]

assert ".test.fakedata.1.csv" in filenames
assert ".test.fakedata.2.csv" in filenames
assert "new_directory" in filenames


@pytest.mark.asyncio
async def test_batch_operations():
with filetexts(csv_files, mode="b"):
fs = fsspec.filesystem("file")
async_fs = AsyncFileSystemWrapper(fs)

await async_fs._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"])
assert not await async_fs._exists(".test.fakedata.1.csv")
assert not await async_fs._exists(".test.fakedata.2.csv")
Loading