Skip to content

Conversation

anjaliratnam-msft
Copy link
Collaborator

This addresses the github issue where rm_files is not implemented. This was a simple fix where sync_wrapper(_rm_files) just needed to be set to rm_files. Tests were also added to make sure it works as expected.

adlfs/spec.py Outdated
@@ -1248,7 +1248,7 @@ async def _rm_files(
for file in file_paths:
self.invalidate_cache(self._parent(file))

sync_wrapper(_rm_files)
rm_files = sync_wrapper(_rm_files)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is something that I missed as well when we went over the original GitHub feature request... Based on the fsspec specification there is no rm_files() shared interface; there's only an rm_file(). The original GitHub issue: #497 is also only requesting for rm_file().

So, it is not appropriate to be setting a sync wrapper like this because it does not appear rm_files to be a shared interface across file systems. Instead, it would probably make sense to add an async _rm_file that is a simplified wrapper over the _rm implementation to implement the feature request.

Even more interesting, it seems there used to be a _rm_file() implementation prior to this PR: #383 and because the underlying AsyncFileSystem mirrors methods, I suspect that adlfs might have actually at one point supported rm_file() and could be a regression. It would be great to confirm if adlfs ever supported rm_file() in a version prior to that PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the previous implementation of rm_file I found, but it looks like it was only ever used by rm and was not callable.

Copy link
Collaborator

@kyleknap kyleknap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's looking better. Just had a couple of follow up suggestions on the direction we take this.

adlfs/spec.py Outdated
if p != "":
await self._rm_files(container_name, [p.rstrip(delimiter)])
else:
await self._rmdir(container_name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So looking more at rm_file it seems like it's sole purpose is to just delete a single file and does not support deleting a directory provided. This seems to be the intention based on other implementation's versions of rm_file as well:

  • s3fs - Only deletes a single object which would be a blob in adlfs
  • local - Uses os.remove() which only handles removing files and not directories.

I think it would make sense to stick with this contract to be consistent, especially if rm_file was not actually never exposed publicly.

adlfs/spec.py Outdated
@@ -1278,6 +1278,31 @@ async def _rm_files(

sync_wrapper(_rm_files)

async def _rm_file(
self, path: typing.Union[str, typing.List[str]], delimiter: str = "/", **kwargs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And on a similar note, I'm not sure if we should be exposing a delimiter for this method since there is not really any recursive nature to this public contract and instead always just use / if we need any splitting logic. Also not supporting a delimiter seems consistent with the other implementations I linked.

Copy link
Collaborator

@kyleknap kyleknap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's looking better. Just had some more suggestions on how to approach this.

adlfs/spec.py Outdated
except Exception as e:
raise RuntimeError("Failed to remove %s for %s", path, e) from e

rm_file = sync_wrapper(_rm_file)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into fsspec core more, I don't think we need to be explicitly setting a sync wrapper here. Specifically, there is some dynamic method setting done here that will take the private _rm_file and convert it to a public synchronous rm_file. So, we should actually be able to remove this line.

adlfs/spec.py Outdated
pass
except Exception as e:
raise RuntimeError("Failed to remove %s for %s", path, e) from e

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we should also invalidate the self.dircache similar to what adlfs does here. Specifically, a cache is maintained for previously retrieved blobs so if we do not do this, we can get unexpected behavior where adlfs will try to retrieve the cached info even though it is deleted. For example:

fs = get_fs()
upload(fs)
# ls method populates the cache
fs.ls(f"{CONTAINER_NAME}/")
fs.rm_file(f"{CONTAINER_NAME}/small.bin")
# Even though the blob is deleted, the cached version will be returned, which is not correct
print("Still cached:", fs.info(f"{CONTAINER_NAME}/small.bin"))

We should make sure we add a test or assertion to make sure that the cache is invalidated.

adlfs/spec.py Outdated
Comment on lines 1295 to 1305
except ResourceNotFoundError:
pass
except FileNotFoundError:
pass
except Exception as e:
raise RuntimeError("Failed to remove %s for %s", path, e) from e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the rm_file() method, I don't think we should be swallowing exceptions as the expectation is that if a single requested file deletion fails, the expectation will be that the error is propagated. This is also the same as other fsspec implementations, (e.g. local and s3fs) it raises any exceptions encountered.

I realize this is just copying what _rm() is doing but _rm() is more designed for bulk deletions so it may be omitting errors to better avoid fast failures and just generally have an interface of not throwing errors for individual parts of a bulk delete.

path: str
File to delete.
"""
container_name, p, _ = self.split_path(path)
Copy link
Collaborator

@kyleknap kyleknap Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martindurant question for you when you have the chance...

When using a version_aware file system and the rm_file, should we be deleting versioned objects if the version is specified in the path (e.g., data/root/a/file.txt?versionid=<some-verson-id>)? Azure Blob is similar to S3 in order to delete a version of the object/blob a version id must be provided to explicitly delete that version snapshot. It did not look like s3fs did this for its rm_file implementation but figured to ask.

Comment on lines 2150 to 2225
path = "data/top_file.txt"

fs.rm_file(path)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the happy case, we should consider explicitly uploading the file to ensure that it existed at some point. Otherwise, it is difficult to disambiguate if we are actually deleting the file in the first place especially since we are swallowing file not found exceptions currently.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants