Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions src/filelock/_unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys
from contextlib import suppress
from errno import ENOSYS
from errno import EAGAIN, ENOSYS, EWOULDBLOCK
from pathlib import Path
from typing import cast

Expand Down Expand Up @@ -42,27 +42,42 @@ def _acquire(self) -> None:
o_nofollow = getattr(os, "O_NOFOLLOW", None)
if o_nofollow is not None:
open_flags |= o_nofollow
if not Path(self.lock_file).exists():
open_flags |= os.O_CREAT
fd = os.open(self.lock_file, open_flags, self._context.mode)
with suppress(PermissionError): # This locked is not owned by this UID
open_flags |= os.O_CREAT
try:
fd = os.open(self.lock_file, open_flags, self._context.mode)
except PermissionError:
# Sticky-bit dirs (e.g. /tmp): O_CREAT fails if the file is owned by another user (#317).
# Fall back to opening the existing file without O_CREAT.
if not Path(self.lock_file).exists():
raise
try:
fd = os.open(self.lock_file, open_flags & ~os.O_CREAT, self._context.mode)
except FileNotFoundError:
return
with suppress(PermissionError): # fchmod fails if the lock file is not owned by this UID
os.fchmod(fd, self._context.mode)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError as exception:
os.close(fd)
if exception.errno == ENOSYS: # NotImplemented error
if exception.errno == ENOSYS:
msg = "FileSystem does not appear to support flock; use SoftFileLock instead"
raise NotImplementedError(msg) from exception
if exception.errno not in {EAGAIN, EWOULDBLOCK}:
raise
else:
self._context.lock_file_fd = fd
# The file may have been unlinked by a concurrent _release() between our open() and flock().
# A lock on an unlinked inode is useless — discard and let the retry loop start fresh.
if os.fstat(fd).st_nlink == 0:
os.close(fd)
else:
self._context.lock_file_fd = fd

def _release(self) -> None:
# Do not remove the lockfile:
# https://github.com/tox-dev/py-filelock/issues/31
# https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
fd = cast("int", self._context.lock_file_fd)
self._context.lock_file_fd = None
with suppress(OSError):
Path(self.lock_file).unlink()
fcntl.flock(fd, fcntl.LOCK_UN)
os.close(fd)

Expand Down
138 changes: 138 additions & 0 deletions tests/test_filelock.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,3 +836,141 @@ def test_mtime_zero_exit_branch(

with pytest.raises(expected_exc):
lock.acquire(timeout=0)


@pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock])
def test_lock_file_removed_after_release(tmp_path: Path, lock_type: type[BaseFileLock]) -> None:
lock_path = tmp_path / "test.lock"
lock = lock_type(str(lock_path))
with lock:
assert lock_path.exists()
assert not lock_path.exists()


@pytest.mark.skipif(sys.platform == "win32", reason="Unix flock semantics")
def test_concurrent_acquire_release_removes_lock_file(tmp_path: Path) -> None:
lock_path = tmp_path / "test.lock"
errors: list[Exception] = []

def worker() -> None:
try:
for _ in range(20):
lock = FileLock(str(lock_path), is_singleton=False)
with lock:
pass
except Exception as exc:
errors.append(exc)

threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
thread.start()
for thread in threads:
thread.join(timeout=30)
assert not errors, errors
assert not lock_path.exists()


@pytest.mark.skipif(sys.platform == "win32", reason="Unix flock semantics")
def test_lock_acquired_after_release_unlinks(tmp_path: Path) -> None:
lock_path = tmp_path / "test.lock"
first = FileLock(str(lock_path), is_singleton=False)
second = FileLock(str(lock_path), is_singleton=False)

first.acquire()
assert lock_path.exists()
first.release()
assert not lock_path.exists()

second.acquire()
assert lock_path.exists()
second.release()
assert not lock_path.exists()


@pytest.mark.skipif(sys.platform == "win32", reason="Unix flock semantics")
def test_stale_inode_retry_on_unlinked_lock(tmp_path: Path, mocker: MockerFixture) -> None:
lock_path = tmp_path / "test.lock"
lock = FileLock(str(lock_path), is_singleton=False)

real_fstat = os.fstat
call_count = 0

def fstat_unlinked_once(fd: int) -> os.stat_result:
nonlocal call_count
call_count += 1
if call_count == 1:
Path(lock_path).unlink()
return real_fstat(fd)
return real_fstat(fd)

mocker.patch("os.fstat", side_effect=fstat_unlinked_once)
lock.acquire()
assert lock.is_locked
assert call_count == 2
lock.release()


@pytest.mark.skipif(sys.platform == "win32", reason="Unix flock semantics")
def test_permission_error_fallback_without_o_creat(tmp_path: Path, mocker: MockerFixture) -> None:
lock_path = tmp_path / "test.lock"
lock_path.touch()
lock = FileLock(str(lock_path), is_singleton=False)

real_open = os.open
call_count = 0

def open_no_creat(path: str, flags: int, mode: int = 0o777, *, dir_fd: int | None = None) -> int:
nonlocal call_count
call_count += 1
if call_count == 1 and flags & os.O_CREAT:
raise PermissionError(13, "Permission denied", path)
return real_open(path, flags, mode) if dir_fd is None else real_open(path, flags, mode, dir_fd=dir_fd)

mocker.patch("os.open", side_effect=open_no_creat)
lock.acquire()
assert lock.is_locked
assert call_count == 2
lock.release()


@pytest.mark.skipif(sys.platform == "win32", reason="Unix flock semantics")
def test_permission_error_propagates_when_file_missing(tmp_path: Path, mocker: MockerFixture) -> None:
lock_path = tmp_path / "test.lock"
lock = FileLock(str(lock_path), is_singleton=False)

real_open = os.open

def open_always_permission_error(path: str, flags: int, mode: int = 0o777, *, dir_fd: int | None = None) -> int:
if "test.lock" in path:
raise PermissionError(13, "Permission denied", path)
return real_open(path, flags, mode) if dir_fd is None else real_open(path, flags, mode, dir_fd=dir_fd)

mocker.patch("os.open", side_effect=open_always_permission_error)
with pytest.raises(PermissionError, match="Permission denied"):
lock.acquire(timeout=0)


@pytest.mark.skipif(sys.platform == "win32", reason="Unix flock semantics")
def test_sticky_bit_fallback_handles_concurrent_unlink(tmp_path: Path, mocker: MockerFixture) -> None:
lock_path = tmp_path / "test.lock"
lock_path.touch()
lock = FileLock(str(lock_path), is_singleton=False)

real_open = os.open
call_count = 0

def open_permission_then_unlink(path: str, flags: int, mode: int = 0o777, *, dir_fd: int | None = None) -> int:
nonlocal call_count
call_count += 1
if call_count == 1 and flags & os.O_CREAT and "test.lock" in path:
raise PermissionError(13, "Permission denied", path)
if call_count == 2 and not (flags & os.O_CREAT) and "test.lock" in path:
lock_path.unlink(missing_ok=True)
raise FileNotFoundError(2, "No such file or directory", path)
return real_open(path, flags, mode) if dir_fd is None else real_open(path, flags, mode, dir_fd=dir_fd)

mocker.patch("os.open", side_effect=open_permission_then_unlink)
lock.acquire()
assert lock.is_locked
assert call_count == 3
lock.release()