-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathSqliteLock.py
More file actions
166 lines (138 loc) · 6.53 KB
/
SqliteLock.py
File metadata and controls
166 lines (138 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
SqliteLock.py
Provides a robust, distributed lock implementation using a SQLite backend via the
diskcache library. This is designed for safe multi-process and multi-threaded
coordination.
"""
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import os
import sqlite3
import threading
import time
import uuid
from abc import ABC, abstractmethod
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import diskcache
from filelock import Timeout # Note: filelock.Timeout is the exception class
# =============================================================================
# LOCAL IMPORTS
# =============================================================================
from models import g_logger, LockBase
# =============================================================================
# SQLITE-BASED LOCK IMPLEMENTATION
# =============================================================================
class DiskcacheSqliteLock(LockBase):
"""
A distributed lock using a diskcache.Cache instance (with a SQLite backend).
This lock is designed for multi-process and multi-threaded environments. It uses
atomic transactions, ownership verification, and lock expiry to ensure safety.
"""
def __init__(self, lock_name, cache_instance, owner_prefix=None):
"""
Initializes the lock instance.
Args:
lock_name (str): A unique name for the lock.
cache_instance (diskcache.Cache): The diskcache instance to use for storage.
owner_prefix (str, optional): A prefix for the owner ID. If None, it is
auto-generated from the process and thread IDs.
"""
self.cache = cache_instance
self.lock_key = f"lock::{lock_name}"
if owner_prefix is None:
owner_prefix = f"pid{os.getpid()}_tid{threading.get_ident()}"
self.owner_id = f"{owner_prefix}_{uuid.uuid4()}"
self._locked = False
self._lock_expiry = 0
def acquire(self, timeout_seconds=60, wait=False):
"""
Tries to acquire the lock, with an option to wait.
Args:
timeout_seconds (int): The duration (in seconds) the lock is held before it expires.
Also used as the maximum wait time if `wait` is True.
wait (bool): If True, the method will block and wait until the lock is acquired
or the timeout is reached. If False, it returns immediately.
Returns:
bool: True if the lock was acquired, False otherwise.
"""
if self._locked and time.monotonic() < self._lock_expiry:
return True # Already hold a valid lock
start_time = time.monotonic()
while True:
if self._attempt_acquire(timeout_seconds):
return True
if not wait or (time.monotonic() - start_time) > timeout_seconds:
return False
# Exponential backoff with jitter to prevent thundering herd problem
sleep_time = min(0.1 * (2 ** (time.monotonic() - start_time)), 1.0) + (uuid.uuid4().int % 100 / 1000)
time.sleep(sleep_time)
def _attempt_acquire(self, timeout_seconds):
"""Internal method that makes a single attempt to acquire the lock."""
try:
with self.cache.transact():
now = time.monotonic()
current_value = self.cache.get(self.lock_key)
# If a lock exists and is still valid, we cannot acquire it.
if current_value is not None:
_, expiry_time = current_value
if now < expiry_time:
return False
# Set our lock with an expiry time.
expiry = now + timeout_seconds
self.cache.set(self.lock_key, (self.owner_id, expiry), expire=timeout_seconds + 5)
# Verify that we actually acquired the lock.
final_value = self.cache.get(self.lock_key)
if final_value and final_value[0] == self.owner_id:
self._locked = True
self._lock_expiry = expiry
return True
return False # Lost the race
except (diskcache.Timeout, Timeout) as e:
g_logger.warning(f"Timeout error while acquiring lock '{self.lock_key}': {e}")
except (sqlite3.Error, OSError) as e:
g_logger.error(f"Unexpected database or I/O error acquiring lock '{self.lock_key}': {e}")
self._locked = False
return False
def release(self) -> bool:
"""
Releases the lock if it is currently held by this instance.
Returns:
bool: True if the lock was successfully released, False otherwise.
"""
if not self.locked():
return False
try:
with self.cache.transact():
# Verify ownership before deleting to prevent releasing a lock acquired by another process.
current_value = self.cache.get(self.lock_key)
if current_value and current_value[0] == self.owner_id:
self.cache.delete(self.lock_key)
self._locked = False
self._lock_expiry = 0
return True
return False # Lock was not ours
except (sqlite3.Error, OSError) as e:
g_logger.error(f"Database or I/O error releasing lock '{self.lock_key}': {e}")
return False
def __enter__(self):
"""Acquires the lock when entering a `with` block."""
if not self.acquire(wait=True):
raise TimeoutError(f"Could not acquire lock '{self.lock_key}' within the timeout period.")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Releases the lock when exiting a `with` block."""
self.release()
def locked(self) -> bool:
"""Checks if the lock is currently held by this instance and is not expired."""
if not self._locked:
return False
if time.monotonic() >= self._lock_expiry:
self._locked = False # Our lock has expired
return False
return True
def renew(self, timeout_seconds):
"""Renews the lock with a new timeout."""
pass