Asyncio Listing and Inventory Report Integration #573

Merged: 18 commits, Aug 18, 2023
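For orientation, here is a minimal usage sketch of the option this PR introduces. Only the `inventory_report_info` kwarg and its `use_snapshot_listing` key are confirmed by the diff below; the other keys are assumptions and should be checked against the gcsfs documentation:

```python
import gcsfs

fs = gcsfs.GCSFileSystem()

# Experimental option added by this PR. "use_snapshot_listing" is taken
# from the diff below; "location" and "id" are assumed identifiers for the
# bucket's inventory report configuration and may differ in the real schema.
inventory_report_info = {
    "use_snapshot_listing": True,  # serve the listing straight from the snapshot
    "location": "us-west-2",       # assumed: where the report config lives
    "id": "inventory-report-id",   # assumed: the report configuration id
}

# The kwarg flows through ls -> _ls -> _list_objects -> _do_list_objects.
listing = fs.ls("my-bucket/some/prefix", inventory_report_info=inventory_report_info)
```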
194 changes: 174 additions & 20 deletions gcsfs/core.py
@@ -24,6 +24,7 @@
from . import __version__ as version
from .checkers import get_consistency_checker
from .credentials import GoogleCredentials
+from .inventory_report import InventoryReport
from .retry import retry_request, validate_response

logger = logging.getLogger("gcsfs")
@@ -537,25 +538,32 @@ async def _get_object(self, path):
raise FileNotFoundError(path)
return self._process_object(bucket, res)

async def _list_objects(self, path, prefix="", versions=False):
async def _list_objects(self, path, prefix="", versions=False, **kwargs):

bucket, key, generation = self.split_path(path)
path = path.rstrip("/")

-        try:
-            clisting = self._ls_from_cache(path)
-            hassubdirs = clisting and any(
-                c["name"].rstrip("/") == path and c["type"] == "directory"
-                for c in clisting
-            )
-            if clisting and not hassubdirs:
-                return clisting
-        except FileNotFoundError:
-            # not finding a bucket in list of "my" buckets is OK
-            if key:
-                raise
+        # NOTE: the inventory report logic is experimental.
+        inventory_report_info = kwargs.get("inventory_report_info", None)
+
+        # Only attempt to list from the cache when the user does not use
+        # the inventory report service.
+        if not inventory_report_info:
+            try:
+                clisting = self._ls_from_cache(path)
+                hassubdirs = clisting and any(
+                    c["name"].rstrip("/") == path and c["type"] == "directory"
+                    for c in clisting
+                )
+                if clisting and not hassubdirs:
+                    return clisting
+            except FileNotFoundError:
+                # not finding a bucket in list of "my" buckets is OK
+                if key:
+                    raise

        items, prefixes = await self._do_list_objects(
-            path, prefix=prefix, versions=versions
+            path, prefix=prefix, versions=versions, **kwargs
        )

pseudodirs = [
@@ -574,28 +582,170 @@ async def _list_objects(self, path, prefix="", versions=False):
else:
return []
out = pseudodirs + items
-        # Don't cache prefixed/partial listings
-        if not prefix:
+
+        use_snapshot_listing = (
+            False
+            if not inventory_report_info
+            else inventory_report_info.get("use_snapshot_listing")
+        )
+
+        # Don't cache prefixed/partial listings, nor listings that were
+        # served directly from an inventory report snapshot.
+        if not prefix and use_snapshot_listing is False:
            self.dircache[path] = out
        return out
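Note that the read and write gates on the dircache are intentionally different. A condensed restatement of the branches above (no new behavior, just the two conditions in isolation):

```python
# Read gate: the cache is consulted only when no inventory report is in play.
def may_read_cache(inventory_report_info):
    return not inventory_report_info

# Write gate: results are cached only for full (unprefixed) listings that
# were not served straight from an inventory report snapshot.
def may_write_cache(prefix, use_snapshot_listing):
    return not prefix and use_snapshot_listing is False
```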

    async def _do_list_objects(
-        self, path, max_results=None, delimiter="/", prefix="", versions=False
+        self, path, max_results=None, delimiter="/", prefix="", versions=False, **kwargs
    ):

"""Object listing for the given {bucket}/{prefix}/ path."""
bucket, _path, generation = self.split_path(path)
_path = "" if not _path else _path.rstrip("/") + "/"
prefix = f"{_path}{prefix}" or None

+        # Page size of 5000 is officially supported across GCS.
+        default_page_size = 5000

+        # NOTE: the inventory report logic is experimental.
+        inventory_report_info = kwargs.get("inventory_report_info", None)
+
+        # Check if the user has configured the inventory report option.
+        if inventory_report_info is not None:
+            items, prefixes = await InventoryReport.fetch_snapshot(
+                gcs_file_system=self,
+                inventory_report_info=inventory_report_info,
+                prefix=prefix,
+            )
+
+            use_snapshot_listing = inventory_report_info.get("use_snapshot_listing")
+
+            # If the user wants to rely on the snapshot from the inventory
+            # report for listing, return the results directly.
+            if use_snapshot_listing:
+                return items, prefixes
+
+            # Otherwise, use the snapshot to initiate concurrent listing.
+            return await self._concurrent_list_objects_helper(
+                items=items,
+                bucket=bucket,
+                delimiter=delimiter,
+                prefix=prefix,
+                versions=versions,
+                generation=generation,
+                page_size=default_page_size,
+            )
+
+        # If the user has not configured an inventory report, proceed with
+        # sequential listing.
+        else:
+            return await self._sequential_list_objects_helper(
+                bucket=bucket,
+                delimiter=delimiter,
+                start_offset=None,
+                end_offset=None,
+                prefix=prefix,
+                versions=versions,
+                generation=generation,
+                page_size=default_page_size,
+            )
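To summarize the dispatch above, a small self-contained sketch (a hypothetical mirror of the branches, not code from this PR):

```python
def choose_listing_mode(inventory_report_info):
    """Hypothetical mirror of the dispatch in _do_list_objects above."""
    if inventory_report_info is None:
        return "sequential"   # plain paginated listing, one coroutine
    if inventory_report_info.get("use_snapshot_listing"):
        return "snapshot"     # trust the inventory report contents as-is
    return "concurrent"       # snapshot only guides the range splits

assert choose_listing_mode(None) == "sequential"
assert choose_listing_mode({"use_snapshot_listing": True}) == "snapshot"
assert choose_listing_mode({"use_snapshot_listing": False}) == "concurrent"
```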

+    async def _concurrent_list_objects_helper(
+        self, items, bucket, delimiter, prefix, versions, generation, page_size
+    ):
+        """
+        Lists objects using coroutines, using the object names from the
+        inventory report to split up the ranges.
+        """
+
+        # Extract the names of the objects fetched from the inventory report.
+        snapshot_object_names = sorted(item["name"] for item in items)

+        # Determine the number of coroutines needed for concurrent listing.
+        # Ideally, we want each coroutine to fetch a single page of objects.
+        num_coroutines = len(snapshot_object_names) // page_size + 1
+        num_objects_per_coroutine = len(snapshot_object_names) // num_coroutines
+
+        start_offsets = []
+        end_offsets = []
+
+        # Calculate the range each coroutine covers (start offset and end offset).
+        for i in range(num_coroutines):
+            range_start = i * num_objects_per_coroutine
+            if i == num_coroutines - 1:
+                range_end = len(snapshot_object_names)
+            else:
+                range_end = range_start + num_objects_per_coroutine
+
+            if range_start == 0:
+                prefix_start = None
+            else:
+                prefix_start = snapshot_object_names[range_start]
+
+            if range_end == len(snapshot_object_names):
+                prefix_end = None
+            else:
+                prefix_end = snapshot_object_names[range_end]
+
+            start_offsets.append(prefix_start)
+            end_offsets.append(prefix_end)

+        # Launch the coroutines all at once, and wait for them to finish listing.
+        results = await asyncio.gather(
+            *[
+                self._sequential_list_objects_helper(
+                    bucket=bucket,
+                    delimiter=delimiter,
+                    start_offset=start_offsets[i],
+                    end_offset=end_offsets[i],
+                    prefix=prefix,
+                    versions=versions,
+                    generation=generation,
+                    page_size=page_size,
+                )
+                for i in range(len(start_offsets))
+            ]
+        )
+
+        items = []
+        prefixes = []
+
+        # Concatenate the items and prefixes from each coroutine for the final result.
+        for items_from_coroutine, prefixes_from_coroutine in results:
+            items.extend(items_from_coroutine)
+            prefixes.extend(prefixes_from_coroutine)
+
+        return items, prefixes
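To make the range-splitting arithmetic concrete, a standalone sketch with invented numbers (not part of the PR):

```python
# Suppose the inventory report snapshot yielded 12,000 object names and the
# page size is 5,000 (both values invented for illustration).
names = [f"obj-{i:05d}" for i in range(12_000)]
page_size = 5_000

num_coroutines = len(names) // page_size + 1  # 12000 // 5000 + 1 == 3
per_coroutine = len(names) // num_coroutines  # 12000 // 3 == 4000

# Reproduce the offset computation from the helper above.
start_offsets, end_offsets = [], []
for i in range(num_coroutines):
    range_start = i * per_coroutine
    range_end = len(names) if i == num_coroutines - 1 else range_start + per_coroutine
    start_offsets.append(None if range_start == 0 else names[range_start])
    end_offsets.append(None if range_end == len(names) else names[range_end])

# Resulting (startOffset, endOffset) pairs handed to the sequential helper:
#   (None,        "obj-04000")  -> covers names[0:4000]
#   ("obj-04000", "obj-08000")  -> covers names[4000:8000]
#   ("obj-08000", None)         -> covers names[8000:12000]
```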

+    async def _sequential_list_objects_helper(
+        self,
+        bucket,
+        delimiter,
+        start_offset,
+        end_offset,
+        prefix,
+        versions,
+        generation,
+        page_size,
+    ):
+        """
+        Sequentially list objects within the start and end offset range.
+        """
+
+        prefixes = []
+        items = []
+
        page = await self._call(
            "GET",
            "b/{}/o",
            bucket,
            delimiter=delimiter,
            prefix=prefix,
-            maxResults=max_results,
+            startOffset=start_offset,
+            endOffset=end_offset,
+            maxResults=page_size,
            json_out=True,
            versions="true" if versions or generation else None,
        )
@@ -605,13 +755,16 @@ async def _do_list_objects(
next_page_token = page.get("nextPageToken", None)

        while next_page_token is not None:
            page = await self._call(
                "GET",
                "b/{}/o",
                bucket,
                delimiter=delimiter,
                prefix=prefix,
-                maxResults=max_results,
+                startOffset=start_offset,
+                endOffset=end_offset,
+                maxResults=page_size,
pageToken=next_page_token,
json_out=True,
versions="true" if generation else None,
@@ -623,6 +776,7 @@
next_page_token = page.get("nextPageToken", None)

items = [self._process_object(bucket, i) for i in items]

return items, prefixes
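`startOffset` and `endOffset` are native parameters of the GCS JSON API `objects.list` call. As a reference point, a rough standalone sketch of the same pagination loop against the public endpoint, using `requests` instead of gcsfs's internal `_call` (auth and retries omitted; illustrative only):

```python
import requests

def list_range(bucket, token, prefix=None, start_offset=None, end_offset=None,
               page_size=5000):
    """Illustrative re-implementation of the pagination loop above using the
    public objects.list endpoint; error handling and auth refresh omitted."""
    url = f"https://storage.googleapis.com/storage/v1/b/{bucket}/o"
    params = {
        "prefix": prefix,
        "startOffset": start_offset,  # first name included (lexicographic)
        "endOffset": end_offset,      # first name excluded
        "maxResults": page_size,
    }
    items = []
    while True:
        resp = requests.get(
            url,
            params={k: v for k, v in params.items() if v is not None},
            headers={"Authorization": f"Bearer {token}"},
        )
        resp.raise_for_status()
        page = resp.json()
        items.extend(page.get("items", []))
        next_token = page.get("nextPageToken")
        if not next_token:
            return items
        params["pageToken"] = next_token
```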

async def _list_buckets(self):
@@ -856,7 +1010,7 @@ async def _ls(self, path, detail=False, prefix="", versions=False, **kwargs):
else:
out = []
for entry in await self._list_objects(
-                path, prefix=prefix, versions=versions
+                path, prefix=prefix, versions=versions, **kwargs
):
if versions and "generation" in entry:
entry = entry.copy()