Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry bulk rm #608

Merged
merged 9 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 53 additions & 33 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .checkers import get_consistency_checker
from .credentials import GoogleCredentials
from .inventory_report import InventoryReport
from .retry import retry_request, validate_response
from .retry import errs, retry_request, validate_response

logger = logging.getLogger("gcsfs")

Expand Down Expand Up @@ -1168,6 +1168,8 @@ async def _rm_file(self, path, **kwargs):
await self._rmdir(path)

async def _rm_files(self, paths):
import random

template = (
"\n--===============7330845974216740156==\n"
"Content-Type: application/http\n"
Expand All @@ -1178,46 +1180,64 @@ async def _rm_files(self, paths):
"accept: application/json\ncontent-length: 0\n"
)
errors = []
success = []
# Splitting requests into 100 chunk batches
# See https://cloud.google.com/storage/docs/batch
for chunk in _chunks(paths, 100):
parts = []
for i, p in enumerate(chunk):
bucket, key, generation = self.split_path(p)
query = f"?generation={generation}" if generation else ""
parts.append(
template.format(
i=i + 1,
bucket=quote(bucket),
key=quote(key),
query=query,
for retry in range(1, 6):
for chunk in _chunks(paths, 100):
parts = []
for i, p in enumerate(chunk):
bucket, key, generation = self.split_path(p)
query = f"?generation={generation}" if generation else ""
parts.append(
template.format(
i=i + 1,
bucket=quote(bucket),
key=quote(key),
query=query,
)
)
body = "".join(parts)
headers, content = await self._call(
"POST",
f"{self._location}/batch/storage/v1",
headers={
"Content-Type": 'multipart/mixed; boundary="=========='
'=====7330845974216740156=="'
},
data=body + "\n--===============7330845974216740156==--",
)
body = "".join(parts)
headers, content = await self._call(
"POST",
f"{self._location}/batch/storage/v1",
headers={
"Content-Type": 'multipart/mixed; boundary="=========='
'=====7330845974216740156=="'
},
data=body + "\n--===============7330845974216740156==--",
)

boundary = headers["Content-Type"].split("=", 1)[1]
parents = [self._parent(p) for p in paths]
[self.invalidate_cache(parent) for parent in parents + list(paths)]
txt = content.decode()
if any(
not ("200 OK" in c or "204 No Content" in c)
for c in txt.split(boundary)[1:-1]
):
pattern = '"message": "([^"]+)"'
out = set(re.findall(pattern, txt))
errors.extend(out)
boundary = headers["Content-Type"].split("=", 1)[1]
parents = [self._parent(p) for p in paths]
[self.invalidate_cache(parent) for parent in parents + list(paths)]
txt = content.decode()
responses = txt.split(boundary)[1:-1]
remaining = []
for path, response in zip(paths, responses):
m = re.search("HTTP/[0-9.]+ ([0-9]+)", response)
code = int(m.groups()[0]) if m else None
if code in [200, 204]:
success.append(path)
elif code in errs:
remaining.append(path)
else:
msg = re.search("({.*})", response.replace("\n", ""))
if msg:
errors.append(json.loads(msg.groups()[0])["error"])
else:
errors.append((path, code))
if errors:
break
if remaining:
paths = remaining
await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
else:
break

if errors:
raise OSError(errors)
return success

@property
def on_google(self):
Expand Down
17 changes: 10 additions & 7 deletions gcsfs/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ class ChecksumError(Exception):
)


errs = list(range(500, 505)) + [
# Request Timeout
408,
# Too Many Requests
429,
]
errs += [str(e) for e in errs]


def is_retriable(exception):
"""Returns True if this exception is retriable."""
errs = list(range(500, 505)) + [
# Request Timeout
408,
# Too Many Requests
429,
]
errs += [str(e) for e in errs]

if isinstance(exception, HttpError):
return exception.code in errs

Expand Down
Loading