Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"cachetools >=5.3,<6",
"fsspec >=2024.3.1",
"igwn-auth-utils",
"pywinpty; platform_system=='Windows'",
]

[project.entry-points."fsspec.specs"]
Expand Down
68 changes: 49 additions & 19 deletions src/pelicanfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,8 @@
import aiohttp
import cachetools
import fsspec.implementations.http as fshttp
from aiowebdav2.client import (
Client,
ClientOptions,
)
from aiowebdav2.exceptions import (
RemoteResourceNotFoundError,
ResponseErrorCodeError,
)
from aiowebdav2.client import Client, ClientOptions
from aiowebdav2.exceptions import RemoteResourceNotFoundError, ResponseErrorCodeError
from fsspec.asyn import AsyncFileSystem, sync
from fsspec.utils import glob_translate

Expand Down Expand Up @@ -201,6 +195,7 @@ async def get_webdav_client(options):

def sync_generator(async_gen_func, obj=None):
"""Wrap an async generator method into a sync generator."""

@functools.wraps(async_gen_func)
def wrapper(*args, **kwargs):
if obj:
Expand Down Expand Up @@ -244,6 +239,9 @@ def __init__(
asynchronous=False,
loop=None,
get_webdav_client=get_webdav_client,
oidc_timeout_seconds=300,
pty_buffer_size=1024,
select_timeout=0.1,
**kwargs,
):
super().__init__(self, asynchronous=asynchronous, loop=loop, **kwargs)
Expand All @@ -269,6 +267,11 @@ def __init__(
self.direct_reads = direct_reads
self.preferred_caches = preferred_caches

# OIDC device flow configuration
self.oidc_timeout_seconds = oidc_timeout_seconds
self.pty_buffer_size = pty_buffer_size
self.select_timeout = select_timeout

# These are all not implemented in the http fsspec and as such are not implemented in the pelican fsspec
# They will raise NotImplementedErrors when called
self._rm_file = self.http_file_system._rm_file
Expand Down Expand Up @@ -389,7 +392,7 @@ def _set_http_filesystem_token(self, token: str, session=None) -> None:
if session:
session.headers["Authorization"] = f"Bearer {token}"

def _handle_token_generation(self, url: str, director_response: DirectorResponse, operation: TokenOperation) -> str:
async def _handle_token_generation(self, url: str, director_response: DirectorResponse, operation: TokenOperation) -> str:
"""
Handle token generation if required by the director response.

Expand All @@ -415,8 +418,34 @@ def _handle_token_generation(self, url: str, director_response: DirectorResponse
return existing_token

try:
# Create token generator
token_generator = TokenGenerator(destination_url=url, dir_resp=director_response, operation=operation)
# Ensure director URL is set (for token generation validation)
await self._set_director_url()

# Construct pelican:// URL for OIDC device flow
pelican_url = None
if self.discovery_url and url:
# Extract federation host from discovery URL
parsed_discovery = urllib.parse.urlparse(self.discovery_url)
federation_host = parsed_discovery.netloc

# Extract path from the data URL
parsed_url = urllib.parse.urlparse(url)
path = parsed_url.path

# Construct pelican://<federation-host>/<path>
pelican_url = f"pelican://{federation_host}{path}"
logger.debug(f"Constructed pelican URL for token generation: {pelican_url}")

# Create token generator with OIDC configuration
token_generator = TokenGenerator(
destination_url=url,
dir_resp=director_response,
operation=operation,
pelican_url=pelican_url,
oidc_timeout_seconds=self.oidc_timeout_seconds,
pty_buffer_size=self.pty_buffer_size,
select_timeout=self.select_timeout,
)

# Get token (TokenContentIterator will automatically discover token location)
token = token_generator.get_token()
Expand Down Expand Up @@ -540,7 +569,7 @@ async def get_working_cache(self, fileloc: str) -> Tuple[str, DirectorResponse]:
# Handle token generation for cache requests if required
if director_response.x_pel_ns_hdr and director_response.x_pel_ns_hdr.require_token:
operation = self._get_token_operation("get_working_cache")
self._handle_token_generation(updated_url, director_response, operation)
await self._handle_token_generation(updated_url, director_response, operation)

# Set token in session headers if we have one (either existing or newly generated)
token_to_use = self._get_token()
Expand Down Expand Up @@ -697,7 +726,7 @@ async def wrapper(self, *args, **kwargs):

# Handle token generation if required
operation = self._get_token_operation(func.__name__)
self._handle_token_generation(data_url, director_response, operation)
await self._handle_token_generation(data_url, director_response, operation)

logger.debug(f"Running {func} with url: {data_url}")
return await func(self, data_url, *args[1:], **kwargs)
Expand Down Expand Up @@ -763,6 +792,7 @@ async def _ls_real(self, url, detail=True, client=None):
raise FileNotFoundError

if detail:

def get_item_detail(item):
full_path = f"{base_url}{item['path']}"
isdir = item.get("isdir") == "True"
Expand Down Expand Up @@ -951,7 +981,7 @@ async def _put_file(self, lpath, rpath, **kwargs):
data_url, director_response = await self.get_origin_url(path)

operation = self._get_token_operation("put_file")
self._handle_token_generation(data_url, director_response, operation)
await self._handle_token_generation(data_url, director_response, operation)

logger.debug(f"Running put_file from {lpath} to {data_url}...")

Expand All @@ -969,7 +999,7 @@ def open(self, path, mode, **kwargs):

# Handle token generation if required
operation = self._get_token_operation("open")
self._handle_token_generation(data_url, director_response, operation)
sync(self.loop, self._handle_token_generation, data_url, director_response, operation)

logger.debug(f"Running open on {data_url}...")
fp = self.http_file_system.open(data_url, mode, **kwargs)
Expand All @@ -988,7 +1018,7 @@ async def open_async(self, path, **kwargs):

# Handle token generation if required
operation = self._get_token_operation("open_async")
self._handle_token_generation(data_url, director_response, operation)
await self._handle_token_generation(data_url, director_response, operation)

logger.debug(f"Running open_async on {data_url}...")
fp = await self.http_file_system.open_async(data_url, **kwargs)
Expand Down Expand Up @@ -1018,7 +1048,7 @@ async def wrapper(self, *args, **kwargs):

# Handle token generation if required
operation = self._get_token_operation(func.__name__)
self._handle_token_generation(data_url, director_response, operation)
await self._handle_token_generation(data_url, director_response, operation)

try:
logger.debug(f"Calling {func} using the following url: {data_url}")
Expand Down Expand Up @@ -1055,7 +1085,7 @@ async def wrapper(self, *args, **kwargs):

# Handle token generation if required (single path)
operation = self._get_token_operation(func.__name__)
self._handle_token_generation(data_url, director_response, operation)
await self._handle_token_generation(data_url, director_response, operation)
else:
data_url = []
# For multiple paths, we'll use the first director_response for token generation
Expand All @@ -1075,7 +1105,7 @@ async def wrapper(self, *args, **kwargs):
if first_director_response:
operation = self._get_token_operation(func.__name__)
# Use the first URL for token generation (simplification)
self._handle_token_generation(data_url[0] if data_url else "", first_director_response, operation)
await self._handle_token_generation(data_url[0] if data_url else "", first_director_response, operation)

try:
logger.debug(f"Calling {func} using the following urls: {data_url}")
Expand Down
Loading