Asyncio Listing and Inventory Report Integration (#573)
hanseaston committed Aug 18, 2023
1 parent 616fdd4 commit 1758134
Showing 4 changed files with 1,624 additions and 20 deletions.
194 changes: 174 additions & 20 deletions gcsfs/core.py
@@ -24,6 +24,7 @@
from . import __version__ as version
from .checkers import get_consistency_checker
from .credentials import GoogleCredentials
+from .inventory_report import InventoryReport
from .retry import retry_request, validate_response

logger = logging.getLogger("gcsfs")
@@ -537,25 +538,32 @@ async def _get_object(self, path):
            raise FileNotFoundError(path)
        return self._process_object(bucket, res)

-    async def _list_objects(self, path, prefix="", versions=False):
+    async def _list_objects(self, path, prefix="", versions=False, **kwargs):
        bucket, key, generation = self.split_path(path)
        path = path.rstrip("/")

-        try:
-            clisting = self._ls_from_cache(path)
-            hassubdirs = clisting and any(
-                c["name"].rstrip("/") == path and c["type"] == "directory"
-                for c in clisting
-            )
-            if clisting and not hassubdirs:
-                return clisting
-        except FileNotFoundError:
-            # not finding a bucket in list of "my" buckets is OK
-            if key:
-                raise
+        # NOTE: the inventory report logic is experimental.
+        inventory_report_info = kwargs.get("inventory_report_info", None)
+
+        # Only attempt to list from the cache when the user does not use
+        # the inventory report service.
+        if not inventory_report_info:
+            try:
+                clisting = self._ls_from_cache(path)
+                hassubdirs = clisting and any(
+                    c["name"].rstrip("/") == path and c["type"] == "directory"
+                    for c in clisting
+                )
+                if clisting and not hassubdirs:
+                    return clisting
+            except FileNotFoundError:
+                # not finding a bucket in list of "my" buckets is OK
+                if key:
+                    raise

        items, prefixes = await self._do_list_objects(
-            path, prefix=prefix, versions=versions
+            path, prefix=prefix, versions=versions, **kwargs
        )

        pseudodirs = [
@@ -574,28 +582,170 @@ async def _list_objects(self, path, prefix="", versions=False):
            else:
                return []
        out = pseudodirs + items
-        # Don't cache prefixed/partial listings
-        if not prefix:
+
+        use_snapshot_listing = (
+            False
+            if not inventory_report_info
+            else inventory_report_info.get("use_snapshot_listing")
+        )
+
+        # Don't cache prefixed/partial listings, nor listings that were
+        # served directly from the inventory report snapshot.
+        if not prefix and use_snapshot_listing is False:
            self.dircache[path] = out
        return out
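For orientation, the new keyword travels from the public API down through `ls` → `_ls` → `_list_objects` via `**kwargs`. A minimal usage sketch follows; `use_snapshot_listing` is the only key this hunk reads directly, while the `location` and `id` keys (and every value shown) are assumptions about how the report would be identified:

import gcsfs

fs = gcsfs.GCSFileSystem(project="my-project")  # hypothetical project

# Hypothetical configuration for a bucket with inventory reports enabled.
inventory_report_info = {
    "location": "us-west1",          # assumed key: report location
    "id": "my-inventory-report-id",  # assumed key: report config id
    "use_snapshot_listing": True,    # serve the listing from the snapshot
}

# ls() forwards **kwargs, so the option reaches _list_objects above.
listing = fs.ls("my-bucket", inventory_report_info=inventory_report_info)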

    async def _do_list_objects(
-        self, path, max_results=None, delimiter="/", prefix="", versions=False
+        self, path, max_results=None, delimiter="/", prefix="", versions=False, **kwargs
    ):

"""Object listing for the given {bucket}/{prefix}/ path."""
bucket, _path, generation = self.split_path(path)
_path = "" if not _path else _path.rstrip("/") + "/"
prefix = f"{_path}{prefix}" or None

+        # Page size of 5000 is officially supported across GCS.
+        default_page_size = 5000
+
+        # NOTE: the inventory report logic is experimental.
+        inventory_report_info = kwargs.get("inventory_report_info", None)
+
+        # Check whether the user has configured the inventory report option.
+        if inventory_report_info is not None:
+            items, prefixes = await InventoryReport.fetch_snapshot(
+                gcs_file_system=self,
+                inventory_report_info=inventory_report_info,
+                prefix=prefix,
+            )
+
+            use_snapshot_listing = inventory_report_info.get("use_snapshot_listing")
+
+            # If the user wants to rely on the snapshot from the inventory
+            # report for listing, return the results directly.
+            if use_snapshot_listing:
+                return items, prefixes
+
+            # Otherwise, use the snapshot to initiate concurrent listing.
+            return await self._concurrent_list_objects_helper(
+                items=items,
+                bucket=bucket,
+                delimiter=delimiter,
+                prefix=prefix,
+                versions=versions,
+                generation=generation,
+                page_size=default_page_size,
+            )
+
+        # If the user has not configured an inventory report, fall back to
+        # sequential listing.
+        else:
+            return await self._sequential_list_objects_helper(
+                bucket=bucket,
+                delimiter=delimiter,
+                start_offset=None,
+                end_offset=None,
+                prefix=prefix,
+                versions=versions,
+                generation=generation,
+                page_size=default_page_size,
+            )
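Note the contract the branch above relies on: `InventoryReport.fetch_snapshot` must return an `(items, prefixes)` pair in which every item is a dict with at least a `"name"` key, because the concurrent helper below sorts on `item["name"]`. A toy illustration of that contract, with invented data:

# Invented snapshot data, shaped the way the code above consumes it.
items = [
    {"name": "logs/2023/08/18/a.json"},
    {"name": "logs/2023/08/18/b.json"},
]
prefixes = []

# With use_snapshot_listing=True, (items, prefixes) is the final answer;
# otherwise only the sorted names are kept, as split points for the
# concurrent listing below.
snapshot_object_names = sorted(item["name"] for item in items)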

+    async def _concurrent_list_objects_helper(
+        self, items, bucket, delimiter, prefix, versions, generation, page_size
+    ):
+        """
+        List objects concurrently, using the object names from the inventory
+        report to split the namespace into ranges.
+        """
+
+        # Extract the names of the objects fetched from the inventory report.
+        snapshot_object_names = [item["name"] for item in items]
+        snapshot_object_names = sorted(snapshot_object_names)
+
+        # Determine the number of coroutines needed for concurrent listing;
+        # ideally, each coroutine fetches a single page of objects.
+        num_coroutines = len(snapshot_object_names) // page_size + 1
+        num_objects_per_coroutine = len(snapshot_object_names) // num_coroutines
+
+        start_offsets = []
+        end_offsets = []
+
+        # Calculate the split points for each coroutine (start and end offsets).
+        for i in range(num_coroutines):
+            range_start = i * num_objects_per_coroutine
+            if i == num_coroutines - 1:
+                range_end = len(snapshot_object_names)
+            else:
+                range_end = range_start + num_objects_per_coroutine
+
+            if range_start == 0:
+                prefix_start = None
+            else:
+                prefix_start = snapshot_object_names[range_start]
+
+            if range_end == len(snapshot_object_names):
+                prefix_end = None
+            else:
+                prefix_end = snapshot_object_names[range_end]
+
+            start_offsets.append(prefix_start)
+            end_offsets.append(prefix_end)
+
+        # Launch the coroutines all at once, and wait for them to finish listing.
+        results = await asyncio.gather(
+            *[
+                self._sequential_list_objects_helper(
+                    bucket=bucket,
+                    delimiter=delimiter,
+                    start_offset=start_offsets[i],
+                    end_offset=end_offsets[i],
+                    prefix=prefix,
+                    versions=versions,
+                    generation=generation,
+                    page_size=page_size,
+                )
+                for i in range(0, len(start_offsets))
+            ]
+        )
+
+        items = []
+        prefixes = []
+
+        # Concatenate the items and prefixes from each coroutine into the
+        # final result.
+        for i in range(len(results)):
+            items_from_process, prefixes_from_process = results[i]
+            items.extend(items_from_process)
+            prefixes.extend(prefixes_from_process)
+
+        return items, prefixes
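To make the splitting arithmetic concrete, here is the same computation run standalone on a hypothetical snapshot of 12,000 names with the 5,000-object page size, yielding three ranges:

# Standalone rerun of the split logic above, with invented names.
page_size = 5000
snapshot_object_names = sorted(f"obj-{i:05d}" for i in range(12000))

num_coroutines = len(snapshot_object_names) // page_size + 1  # 3
num_objects_per_coroutine = len(snapshot_object_names) // num_coroutines  # 4000

start_offsets, end_offsets = [], []
for i in range(num_coroutines):
    range_start = i * num_objects_per_coroutine
    if i == num_coroutines - 1:
        range_end = len(snapshot_object_names)
    else:
        range_end = range_start + num_objects_per_coroutine
    # None means "unbounded" on that side of the listing range.
    start_offsets.append(None if range_start == 0 else snapshot_object_names[range_start])
    end_offsets.append(None if range_end == len(snapshot_object_names) else snapshot_object_names[range_end])

print(list(zip(start_offsets, end_offsets)))
# [(None, 'obj-04000'), ('obj-04000', 'obj-08000'), ('obj-08000', None)]

Each range's end name equals the next range's start name, which matters for correctness given how GCS interprets the offsets (see the note after `_sequential_list_objects_helper` below).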

+    async def _sequential_list_objects_helper(
+        self,
+        bucket,
+        delimiter,
+        start_offset,
+        end_offset,
+        prefix,
+        versions,
+        generation,
+        page_size,
+    ):
+        """
+        Sequentially list objects within the [start_offset, end_offset) range.
+        """
+
        prefixes = []
        items = []

        page = await self._call(
            "GET",
            "b/{}/o",
            bucket,
            delimiter=delimiter,
            prefix=prefix,
-            maxResults=max_results,
+            startOffset=start_offset,
+            endOffset=end_offset,
+            maxResults=page_size,
            json_out=True,
            versions="true" if versions or generation else None,
        )
@@ -605,13 +755,16 @@ async def _do_list_objects(
        next_page_token = page.get("nextPageToken", None)

        while next_page_token is not None:
            page = await self._call(
                "GET",
                "b/{}/o",
                bucket,
                delimiter=delimiter,
                prefix=prefix,
-                maxResults=max_results,
+                startOffset=start_offset,
+                endOffset=end_offset,
+                maxResults=page_size,
                pageToken=next_page_token,
                json_out=True,
                versions="true" if generation else None,
@@ -623,6 +776,7 @@
            next_page_token = page.get("nextPageToken", None)

        items = [self._process_object(bucket, i) for i in items]
+
        return items, prefixes
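For reference, `startOffset` and `endOffset` are parameters of the GCS JSON API's object-list endpoint that filter object names lexicographically, with `startOffset` inclusive and `endOffset` exclusive. That half-open behavior is what lets the ranges computed above tile the namespace with no gaps and no double-listing. A rough standalone sketch against a hypothetical public bucket, mirroring the `_call` parameters above:

import requests

# Hypothetical bucket; the params mirror the _call above.
url = "https://storage.googleapis.com/storage/v1/b/my-public-bucket/o"
params = {
    "maxResults": 5000,
    "startOffset": "obj-04000",  # inclusive lexicographic lower bound
    "endOffset": "obj-08000",    # exclusive lexicographic upper bound
}
page = requests.get(url, params=params).json()
# Every returned name satisfies "obj-04000" <= name < "obj-08000".
names = [item["name"] for item in page.get("items", [])]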

    async def _list_buckets(self):
@@ -856,7 +1010,7 @@ async def _ls(self, path, detail=False, prefix="", versions=False, **kwargs):
        else:
            out = []
            for entry in await self._list_objects(
-                path, prefix=prefix, versions=versions
+                path, prefix=prefix, versions=versions, **kwargs
            ):
                if versions and "generation" in entry:
                    entry = entry.copy()