improvements on concurrent scenarios #137
- FSBucket now creates a temp dir in the root dir, uses it for temp files and locks, and ensures it is not listed in the object listings
- atomic FSBucket.put_object(): the content is first stored in a temp file (in the same root dir), then renamed atomically
- prefix validations
- added documentation
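The temp-file-then-rename approach described above can be sketched roughly as follows. This is a minimal illustration, not the FSBucket code from this PR: `atomic_put`, `list_objects`, and the `__tmp__` directory name are hypothetical, and it assumes the temp dir sits on the same filesystem as the target so that `os.replace()` is a single atomic rename.

```python
import os
import tempfile
from pathlib import Path

TEMP_DIR_NAME = "__tmp__"  # hypothetical name, not taken from the PR


def atomic_put(root: Path, name: str, content: bytes) -> Path:
    """Write content to root/name so that readers never observe a partial file."""
    temp_dir = root / TEMP_DIR_NAME
    temp_dir.mkdir(parents=True, exist_ok=True)
    target = root / name
    target.parent.mkdir(parents=True, exist_ok=True)

    # The temp file lives under the same root, hence on the same filesystem,
    # which is what lets os.replace() act as one atomic rename.
    fd, temp_path = tempfile.mkstemp(dir=temp_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(content)
        os.replace(temp_path, target)
    except BaseException:
        # Clean up the temp file if writing or renaming failed.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
    return target


def list_objects(root: Path) -> list[str]:
    """List stored objects, skipping anything under the temp dir."""
    return sorted(
        p.relative_to(root).as_posix()
        for p in root.rglob("*")
        if p.is_file() and TEMP_DIR_NAME not in p.relative_to(root).parts
    )
```

Keeping the temp dir inside the root is what guarantees the same-filesystem rename, at the cost of having to filter it out of listings.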
ensure the content from base bucket is fetched only once. #10
        lock2 = self.lock_manager.get_lock("test_lock2")
        self.assertIsNot(lock1, lock2)

    def test_lock_actually_locks(self):
This one seems redundant, since it only tests whether the existing threading.Lock actually works.
It wouldn't hurt if it didn't take so long to run.
Be aware: unit tests should be lightning fast! Test running time is also very important on big projects (for some repositories, inefficient tests cost tens of millions of dollars).
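If a check of this kind is kept at all, it can be written without sleeps or worker threads. The sketch below is an assumption about what a fast variant could look like, not the test from the PR: it uses a non-blocking `acquire()` so the test returns immediately.

```python
import threading
import unittest


class TestLockIsExclusive(unittest.TestCase):
    def test_second_acquire_fails_without_waiting(self):
        lock = threading.Lock()
        with lock:
            # blocking=False returns immediately instead of waiting for the holder.
            self.assertFalse(lock.acquire(blocking=False))
        # Once released, the lock can be taken again.
        self.assertTrue(lock.acquire(blocking=False))
        lock.release()
```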
name2 = "path\\with\\backslashes"

lock1 = self.lock_manager.get_lock(name1)
lock2 = self.lock_manager.get_lock(name2)
Here's a BUG. get_lock() should raise an exception, since the \\ in the path is invalid!
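A sketch of the requested behavior, assuming the backslash is the only rejected character and that `ValueError` is an acceptable exception type (the comment specifies neither). `ValidatingLockManager` is a hypothetical class used only to show the validation step.

```python
import threading


class ValidatingLockManager:
    """Hypothetical manager; only the name validation is the point here."""

    # Assumption: the backslash is the only rejected character.
    INVALID_CHARS = frozenset("\\")

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}
        self._guard = threading.Lock()

    def get_lock(self, name: str) -> threading.Lock:
        bad = self.INVALID_CHARS.intersection(name)
        if bad:
            raise ValueError(f"Invalid character(s) {sorted(bad)} in lock name {name!r}")
        # The guard keeps concurrent callers from creating two locks for one name.
        with self._guard:
            return self._locks.setdefault(name, threading.Lock())
```

With this in place, the test above would assert that `get_lock(name2)` raises rather than returning a second lock.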
self._cache = cache
self._main = main
self._lock_manager = lock_manager or namedlock.ThreadLockManager()
I'd say there's a bug here: using a threading synchronization mechanism makes this class fail when the cache is shared by multiple processes, as in our main scenario, which uses the FileSystem as cache. bucketbase.cached_immutable_bucket.CachedImmutableBucket.build_from_fs() leaves lock_manager at its default value, so this implementation will definitely fail in a real-world scenario.
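To illustrate the difference: a `threading.Lock` lives in one process's memory, whereas a lock backed by the filesystem is visible to every process that shares the directory. The sketch below is a deliberately simple, hypothetical `LockFile` built on exclusive file creation. It is not the project's `FileLockForPath`, and it does not handle stale lock files left behind by a crashed process.

```python
import os
import time
from pathlib import Path


class LockFile:
    """Minimal cross-process lock based on exclusive file creation (a sketch)."""

    def __init__(self, path: Path, poll_interval: float = 0.01, timeout: float = 5.0) -> None:
        self._path = path
        self._poll_interval = poll_interval
        self._timeout = timeout

    def __enter__(self) -> "LockFile":
        deadline = time.monotonic() + self._timeout
        while True:
            try:
                # O_EXCL makes creation fail if the file already exists,
                # so only one process can succeed at a time.
                fd = os.open(self._path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                os.close(fd)
                return self
            except FileExistsError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"Could not acquire {self._path}")
                time.sleep(self._poll_interval)

    def __exit__(self, exc_type, exc, tb) -> None:
        # Removing the file releases the lock for other processes.
        os.remove(self._path)
```

A default along these lines in build_from_fs() would cover the multi-process scenario; the thread-based manager would remain suitable for in-memory caches.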
@@ -311,6 +327,8 @@ def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytea
    :raises io.UnsupportedOperation: If the object already exists
    """
    self._lock_object(name)
    if self._base_bucket.exists(name):
        raise io.UnsupportedOperation(f"Object {name} already exists in AppendOnlySynchronizedBucket")
Here's a BUG: the lock acquired on the previous line is never released when this exception is raised, so it stays locked.
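One common way to avoid leaking the lock is to release it in a `finally` block, so that every exit path, including the raise, unlocks. The class below is a hypothetical stand-in (a dict plays the base bucket, and a single lock replaces the per-object locking), intended only to show the pattern.

```python
import io
import threading


class AppendOnlySketch:
    """Hypothetical stand-in for the bucket under review."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}
        self._lock = threading.Lock()

    def put_object(self, name: str, content: bytes) -> None:
        self._lock.acquire()
        try:
            if name in self._objects:
                raise io.UnsupportedOperation(f"Object {name} already exists")
            self._objects[name] = content
        finally:
            # Runs on success and on the raise above, so the lock never leaks.
            self._lock.release()
```

A `with self._lock:` block would be equivalent here; the explicit form is shown because the reviewed code acquires the lock through a separate call.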
@@ -328,6 +346,8 @@ def put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None
    :raises IOError: If stream operations fail
    """
    self._lock_object(name)
    if self._base_bucket.exists(name):
        raise io.UnsupportedOperation(f"Object {name} already exists in AppendOnlySynchronizedBucket")
Here's a BUG: the lock acquired on the previous line is never released when this exception is raised, so it stays locked.
if only_existing:
    raise RuntimeError(f"Object {name} is not locked")
# Sanitize name to be a valid filename
safe_name = name.replace('/', '#').replace('\\', '#')
Here's a bug, since \\ is an invalid character in an S3 path.
@@ -0,0 +1,53 @@
import threading
The filename namedlock doesn't follow the Python naming convention or match the class name. It should be named_lock_manager.
from bucketbase.file_lock import FileLockForPath


class NamedLockManager:
Since the class is abstract, it should derive from ABC, and it would be cleaner to indicate in the name that it's an abstract class, like BaseNamedLockManager or INamedLockManager.
class NamedLockManager:
    """Abstract base class for managing locks by name"""
    def get_lock(self, name: str, only_existing=False) -> FileLockForPath:
Decorate the method with abstractmethod.
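Taken together, the two suggestions (derive from ABC, decorate with abstractmethod) would look roughly like this. The name `BaseNamedLockManager` follows the reviewer's proposal; the return annotation and the `ThreadLockManager` body are assumptions made for this sketch, not the code from the PR.

```python
import threading
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager


class BaseNamedLockManager(ABC):
    """Abstract base class for managing locks by name."""

    @abstractmethod
    def get_lock(self, name: str, only_existing: bool = False) -> AbstractContextManager:
        """Return the lock registered under name, creating it unless only_existing is set."""


class ThreadLockManager(BaseNamedLockManager):
    """In-process implementation; the body here is illustrative only."""

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}
        self._guard = threading.Lock()

    def get_lock(self, name: str, only_existing: bool = False) -> AbstractContextManager:
        with self._guard:
            if name not in self._locks:
                if only_existing:
                    raise RuntimeError(f"Object {name} is not locked")
                self._locks[name] = threading.Lock()
            return self._locks[name]
```

With ABC in place, instantiating the base class directly raises `TypeError`, and a subclass that forgets to implement get_lock() fails at construction time instead of at first use.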
if only_existing:
    raise RuntimeError(f"Object {name} is not locked")
# Sanitize name to be a valid filename
safe_name = name.replace('/', '#').replace('\\', '#')
Hardcoded values: the replaced characters and the '#' substitute should be named constants.
        return fetch_and_cache_func(name)

    def get_object(self, name: PurePosixPath | str) -> bytes:
        def fetch_and_cache(n):
Code smell. Although this would compile and work in Python, avoid using inner functions in production code
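One way to follow this suggestion is to promote the inner function to a private method. The sketch below uses a hypothetical `CachedBucketSketch` with plain dicts standing in for the cache and main buckets. It shows only the refactoring and omits the locking.

```python
class CachedBucketSketch:
    """Hypothetical cache-in-front-of-main bucket; dicts stand in for both buckets."""

    def __init__(self, cache: dict[str, bytes], main: dict[str, bytes]) -> None:
        self._cache = cache
        self._main = main

    def _fetch_and_cache(self, name: str) -> bytes:
        # A named method instead of a closure defined inside get_object().
        content = self._main[name]
        self._cache[name] = content
        return content

    def get_object(self, name: str) -> bytes:
        if name in self._cache:
            return self._cache[name]
        return self._fetch_and_cache(name)
```

A method can be tested and overridden on its own, which is harder with a function defined inside another function.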
Closed in favor of #140
(part 2/2 of #12; to be merged after #136)
FSBucket writes to a temp dir first and renames atomically; fixes the get_size issue in #136
fixes the issue with main content being fetched more than once when the cache is used in concurrent gets
refactored the locking mechanism
added documentation
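The "fetched only once under concurrent gets" behavior can be sketched with a per-name lock and a second cache check after the lock is acquired. `FetchOnceCache` is a hypothetical class, and it synchronizes threads only. Per the review above, a filesystem cache shared across processes would need a cross-process lock in place of `threading.Lock`.

```python
import threading
from typing import Callable


class FetchOnceCache:
    """Calls fetch(name) at most once per name, even under concurrent gets."""

    def __init__(self, fetch: Callable[[str], bytes]) -> None:
        self._fetch = fetch
        self._cache: dict[str, bytes] = {}
        self._locks: dict[str, threading.Lock] = {}
        self._guard = threading.Lock()

    def _lock_for(self, name: str) -> threading.Lock:
        # The guard ensures all threads get the same lock object for a name.
        with self._guard:
            return self._locks.setdefault(name, threading.Lock())

    def get(self, name: str) -> bytes:
        if name in self._cache:
            return self._cache[name]
        with self._lock_for(name):
            # Re-check: another thread may have filled the cache while we waited.
            if name not in self._cache:
                self._cache[name] = self._fetch(name)
            return self._cache[name]
```

Locking per name rather than globally lets gets for different objects proceed in parallel while still serializing gets for the same object.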