-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathObjectStorageLock.py
More file actions
502 lines (429 loc) · 20.5 KB
/
ObjectStorageLock.py
File metadata and controls
502 lines (429 loc) · 20.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
"""
ObjectStorageLock.py
This module implements a distributed lock using object storage (via libcloud).
The lock is designed for cross-process and cross-thread synchronization in Python,
providing a reliable mechanism for distributed coordination.
Key Features:
- Uses S3-compatible object storage as a central coordination point.
- Supports blocking and non-blocking lock acquisition with exponential backoff.
- Provides context manager interface for Pythonic usage.
- Handles lock expiration and automatic cleanup of stale locks.
- Uses unique owner IDs combining process ID, thread ID, and UUID.
- Implements atomic operations with fencing tokens for safety.
- Supports thread-local reentrant locking with lock depth tracking.
- Includes retry logic for transient S3 errors.
- Allows custom metadata for debugging and integration.
- Supports lock renewal for long-running tasks.
Edge Cases Handled:
- **Eventual Consistency**: Ownership checks and fencing tokens mitigate S3 consistency issues.
- **Network Failures**: Retries with exponential backoff handle transient errors.
- **Clock Skew**: Expiry times are based on monotonic clocks to avoid skew.
- **Stale Locks**: Automatic cleanup of expired locks prevents deadlocks.
- **Concurrent Access**: Fencing tokens ensure only one instance holds the lock.
Usage Example:
```python
from ObjectStorageLock import ObjectStorageLock
# Create a lock with custom metadata
lock = ObjectStorageLock("my-lock", metadata={"app": "my-app"})
# Use as context manager
with lock:
# Critical section
print("Lock acquired")
# Non-blocking acquisition
if lock.acquire(wait=False):
try:
print("Lock acquired")
finally:
lock.release()
# Renew lock for long-running tasks
with lock:
lock.renew(60) # Extend lock for 60 seconds
```
Dependencies:
- libcloud: For object storage operations.
- tenacity: For retrying transient errors.
- ujson (optional): For faster JSON serialization.
Optimizations for Linode Object Storage:
- This module implements distributed locking using Linode's S3-compatible Object Storage via Apache Libcloud, ensuring reliable coordination across servers.
- Linode's eventual consistency is mitigated with fencing tokens and retry logic (using Tenacity) to prevent race conditions during lock acquisition and release.
- Error handling addresses transient issues like network failures and object non-existence, with exponential backoff to enhance reliability in distributed environments.
- Lock operations are optimized for efficiency, leveraging Linode's metadata support for tracking lock states and expirations, while considering regional storage to minimize latency.
- The design includes reentrant locking and automatic stale lock cleanup to handle common distributed system challenges on Linode.
"""
import os
import threading
import time
import uuid
from io import BytesIO
import hashlib
import pickle
# Import from config module
import object_storage_config as oss_config
import object_storage_sync
from shared import g_logger
from object_storage_sync import (
publish_file,
fetch_file,
publish_bytes,
smart_fetch
)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from models import LockBase
DEFAULT_RETRY_INTERVAL = oss_config.DEFAULT_RETRY_INTERVAL
MIN_RETRY_INTERVAL = oss_config.MIN_RETRY_INTERVAL
MAX_RETRY_INTERVAL = oss_config.MAX_RETRY_INTERVAL
MAX_RETRY_ATTEMPTS = oss_config.MAX_RETRY_ATTEMPTS
RETRY_MULTIPLIER = oss_config.RETRY_MULTIPLIER
class ObjectStorageLock(LockBase):
"""
A distributed lock implementation using S3-compatible object storage (via libcloud).
Supports thread-local reentrant locking, exponential backoff, and lock renewal.
Args:
lock_name: Unique name for the lock.
owner_prefix: Optional prefix for the owner ID (default: pid + thread ID).
metadata: Optional dictionary of custom metadata (e.g., {"app": "my-app"}).
retry_interval: Base interval (seconds) for acquisition retries.
Raises:
ConfigurationError: If object storage is not available or misconfigured.
"""
def __init__(
self,
lock_name,
owner_prefix=None,
metadata=None,
retry_interval=DEFAULT_RETRY_INTERVAL
):
if not oss_config.LIBCLOUD_AVAILABLE or not oss_config.STORAGE_ENABLED:
raise oss_config.ConfigurationError("Object storage is not available or not enabled")
if not oss_config.init_storage():
raise oss_config.ConfigurationError("Failed to initialize object storage")
self.lock_name = lock_name
self.lock_key = f"lock::{lock_name}"
self.lock_object_name = self._get_object_name(self.lock_key)
if owner_prefix is None:
owner_prefix = f"pid{os.getpid()}_tid{threading.get_ident()}"
self.owner_id = f"{owner_prefix}_{uuid.uuid4()}"
self._thread_local = threading.local()
self._thread_local.lock_count = 0
self.retry_interval = max(MIN_RETRY_INTERVAL, retry_interval)
self.metadata = metadata or {}
# Validate metadata size (S3 limit: 2KB)
metadata_bytes = pickle.dumps(self.metadata)
if len(metadata_bytes) > 2048:
raise ValueError("Metadata exceeds 2KB limit")
self._fencing_token = 0
def _get_object_name(self, key):
return oss_config.generate_object_name(key, prefix="lock")
def acquire(self, timeout_seconds=60, wait=False):
"""
Try to acquire the lock with exponential backoff.
Args:
timeout_seconds: Lock duration before expiration.
wait: If True, wait until lock is acquired or timeout expires.
Returns:
True if lock was acquired, False otherwise.
Raises:
TimeoutError: If wait=True and lock cannot be acquired within timeout.
"""
if self._thread_local.lock_count > 0:
self._thread_local.lock_count += 1
return True
start_time = time.monotonic()
attempt = 0
while True:
acquired = self._attempt_acquire(timeout_seconds)
if acquired:
self._thread_local.lock_count = 1
return True
if not wait:
return False
elapsed = time.monotonic() - start_time
if elapsed > timeout_seconds:
g_logger.warning(f"Timeout acquiring lock {self.lock_key} for owner {self.owner_id} after {elapsed:.2f}s")
return False
# Exponential backoff using global constants
delay = min(MAX_RETRY_INTERVAL, self.retry_interval * (2 ** attempt))
time.sleep(delay)
attempt += 1
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
wait=wait_exponential(multiplier=RETRY_MULTIPLIER, max=MAX_RETRY_INTERVAL),
retry=retry_if_exception_type((oss_config.StorageOperationError, oss_config.LibcloudError)),
reraise=True
)
def _attempt_acquire(self, timeout_seconds):
"""Attempt to acquire the lock once with fencing token."""
try:
now = time.monotonic()
current_lock = self._get_lock_info()
# Check for valid existing lock
if current_lock is not None:
expiry = current_lock.get('expiry')
if expiry is not None and now < expiry:
return False # Lock is still valid
# Increment fencing token
self._fencing_token += 1
expiry = now + timeout_seconds
lock_data = {
'owner_id': self.owner_id,
'expiry': expiry,
'created': now,
'lock_name': self.lock_name,
'fencing_token': self._fencing_token
}
self._put_lock_info(lock_data)
# Verify ownership
final_lock = self._get_lock_info()
if (
final_lock is not None and
final_lock.get('owner_id') == self.owner_id and
final_lock.get('fencing_token') == self._fencing_token
):
return True
return False
except TypeError as e:
g_logger.error(f"Error acquiring lock {self.lock_key} due to data issue: {e}")
return False
def _get_lock_info(self):
"""Get current lock information from storage."""
try:
content, metadata = smart_fetch(self.lock_object_name, cache_expiry=1)
if content:
return pickle.loads(content)
return None
except pickle.UnpicklingError as e:
g_logger.error(f"Error unpickling lock data for {self.lock_key}: {e}")
return None
except oss_config.ObjectDoesNotExistError:
return None # Lock does not exist, not an error
except (oss_config.StorageOperationError, oss_config.LibcloudError) as e:
g_logger.warning(f"Could not get lock info for {self.lock_key}: {e}")
return None
@retry(
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS),
wait=wait_exponential(multiplier=RETRY_MULTIPLIER, max=MAX_RETRY_INTERVAL),
retry=retry_if_exception_type((oss_config.StorageOperationError, oss_config.LibcloudError)),
reraise=True
)
def _put_lock_info(self, lock_data):
"""Put lock information to storage."""
try:
content = pickle.dumps(lock_data)
publish_bytes(content, self.lock_object_name)
return True
except pickle.PicklingError as e:
g_logger.error(f"Error pickling lock data for {self.lock_key}: {e}")
raise TypeError(f"Failed to serialize lock data for {self.lock_key}") from e
except (oss_config.StorageOperationError, oss_config.LibcloudError) as e:
g_logger.warning(f"Storage error putting lock info for {self.lock_key}: {e}")
raise
def release(self):
"""
Release the lock if held by this instance.
Returns:
True if lock was released, False otherwise.
"""
if self._thread_local.lock_count == 0:
return False
self._thread_local.lock_count -= 1
if self._thread_local.lock_count > 0:
return True # Still locked due to reentrancy
success = False
try:
current_lock = self._get_lock_info()
if (
current_lock is not None and
current_lock.get('owner_id') == self.owner_id and
current_lock.get('fencing_token') == self._fencing_token
):
obj = oss_config._storage_container.get_object(object_name=self.lock_object_name)
oss_config._storage_container.delete_object(obj)
success = True
except oss_config.ObjectDoesNotExistError:
g_logger.warning(f"Lock {self.lock_key} was not found during release, assuming already released.")
success = True # If the object is already gone, it's released.
except (oss_config.StorageOperationError, oss_config.LibcloudError) as e:
g_logger.error(f"Storage error releasing lock {self.lock_key}: {e}")
success = False
self._thread_local.lock_count = 0
return success
def renew(self, timeout_seconds):
"""
Renew the lock's expiry time if held by this instance.
Args:
timeout_seconds: New duration for the lock.
Returns:
True if lock was renewed, False otherwise.
"""
if self._thread_local.lock_count == 0:
return False
try:
current_lock = self._get_lock_info()
if (
current_lock is not None and
current_lock.get('owner_id') == self.owner_id and
current_lock.get('fencing_token') == self._fencing_token
):
now = time.monotonic()
lock_data = {
'owner_id': self.owner_id,
'expiry': now + timeout_seconds,
'created': current_lock['created'],
'lock_name': self.lock_name,
'fencing_token': self._fencing_token
}
self._put_lock_info(lock_data)
return True
return False
except (oss_config.StorageOperationError, oss_config.LibcloudError, TypeError) as e:
g_logger.error(f"Error renewing lock {self.lock_key}: {e}")
return False
def __enter__(self):
if not self.acquire(wait=True):
raise TimeoutError(f"Could not acquire lock '{self.lock_key}'")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def locked(self) -> bool:
"""Check if the lock is currently held by this instance."""
return self._thread_local.lock_count > 0
if __name__ == '__main__':
import unittest
from unittest.mock import MagicMock, patch
import time
class TestObjectStorageLock(unittest.TestCase):
def setUp(self):
self.mock_container = MagicMock()
self.mock_driver = MagicMock()
self.patcher = patch('object_storage_config')
self.mock_config = self.patcher.start()
self.mock_config.LIBCLOUD_AVAILABLE = True
self.mock_config.STORAGE_ENABLED = True
self.mock_config.SERVER_ID = 'test-server'
self.mock_config.STORAGE_SYNC_PATH = 'test-prefix/'
self.mock_config._storage_container = self.mock_container
self.mock_config._storage_driver = self.mock_driver
self.mock_config.init_storage = MagicMock(return_value=True)
self.mock_config.ObjectDoesNotExistError = Exception
self.lock = ObjectStorageLock('test-lock', metadata={'app': 'test'})
def tearDown(self):
self.patcher.stop()
def test_lock_initialization(self):
self.assertFalse(self.lock.locked())
self.assertEqual(self.lock.lock_name, 'test-lock')
self.assertTrue(self.lock.lock_key.startswith('lock::'))
def test_lock_acquisition(self):
self.mock_driver.download_object_as_stream.return_value = []
self.mock_container.get_object.side_effect = self.mock_config.ObjectDoesNotExistError
result = self.lock.acquire(timeout_seconds=10)
self.assertTrue(result)
self.assertTrue(self.lock.locked())
def test_lock_release(self):
self.mock_driver.download_object_as_stream.return_value = []
self.mock_container.get_object.side_effect = self.mock_config.ObjectDoesNotExistError
self.lock.acquire()
result = self.lock.release()
self.assertTrue(result)
self.assertFalse(self.lock.locked())
def test_context_manager(self):
self.mock_driver.download_object_as_stream.return_value = []
self.mock_container.get_object.side_effect = self.mock_config.ObjectDoesNotExistError
with self.lock:
self.assertTrue(self.lock.locked())
self.assertFalse(self.lock.locked())
def test_lock_expiry(self):
expired_time = time.monotonic() - 100
expired_lock_data = {
'owner_id': 'other-owner',
'expiry': expired_time,
'created': expired_time - 10,
'lock_name': 'test-lock',
'fencing_token': 1
}
mock_stream = MagicMock()
mock_stream.__iter__.return_value = [pickle.dumps(expired_lock_data)]
self.mock_driver.download_object_as_stream.return_value = mock_stream
result = self.lock.acquire()
self.assertTrue(result)
self.assertTrue(self.lock.locked())
def test_concurrent_locks(self):
current_time = time.monotonic()
valid_lock_data = {
'owner_id': 'other-owner',
'expiry': current_time + 100,
'created': current_time,
'lock_name': 'test-lock',
'fencing_token': 1
}
mock_stream = MagicMock()
mock_stream.__iter__.return_value = [pickle.dumps(valid_lock_data)]
self.mock_driver.download_object_as_stream.return_value = mock_stream
result = self.lock.acquire()
self.assertFalse(result)
self.assertFalse(self.lock.locked())
def test_reentrant_lock(self):
self.mock_driver.download_object_as_stream.return_value = []
self.mock_container.get_object.side_effect = self.mock_config.ObjectDoesNotExistError
with self.lock:
with self.lock: # Nested acquisition
self.assertTrue(self.lock.locked())
self.assertTrue(self.lock.locked())
self.assertFalse(self.lock.locked())
def test_renew_lock(self):
self.mock_driver.download_object_as_stream.return_value = []
self.mock_container.get_object.side_effect = self.mock_config.ObjectDoesNotExistError
self.lock.acquire()
current_lock = {
'owner_id': self.lock.owner_id,
'expiry': time.monotonic() + 10,
'created': time.monotonic(),
'lock_name': 'test-lock',
'fencing_token': self.lock._fencing_token
}
mock_stream = MagicMock()
mock_stream.__iter__.return_value = [pickle.dumps(current_lock)]
self.mock_driver.download_object_as_stream.return_value = mock_stream
result = self.lock.renew(20)
self.assertTrue(result)
def test_transient_error_retry(self):
self.mock_container.get_object.side_effect = [Exception("Transient error"), None]
self.mock_driver.download_object_as_stream.return_value = []
result = self.lock.acquire()
self.assertTrue(result)
self.assertTrue(self.lock.locked())
def test_invalid_lock_data(self):
mock_stream = MagicMock()
mock_stream.__iter__.return_value = [b"invalid pickle"]
self.mock_driver.download_object_as_stream.return_value = mock_stream
result = self.lock.acquire()
self.assertFalse(result)
self.assertFalse(self.lock.locked())
def test_simulate_race_condition(self):
# Simulate a race condition with two threads trying to acquire the same lock
event = threading.Event()
acquired_by_first = threading.Event()
def thread_func(lock_instance):
try:
acquired = lock_instance.acquire(timeout_seconds=10, wait=True)
if acquired:
acquired_by_first.set() # First one to acquire sets this
time.sleep(1) # Hold it briefly
lock_instance.release()
event.set() # Signal that this thread has finished
except Exception as e:
g_logger.error(f"Thread error: {e}")
event.set()
# Mock the storage to force a delay or inconsistency for simulation
self.mock_driver.download_object_as_stream.side_effect = lambda: time.sleep(0.5) or [] # Simulate delay
self.mock_container.get_object.side_effect = self.mock_config.ObjectDoesNotExistError
thread1 = threading.Thread(target=thread_func, args=(self.lock,))
thread2 = threading.Thread(target=thread_func, args=(self.lock,)) # Same lock instance
thread1.start()
thread2.start()
thread1.join(timeout=5)
thread2.join(timeout=5)
# Check if both threads think they acquired it (indicating a race)
# In a real scenario, this might not always fail due to timing, but it's a simulation
self.assertTrue(event.is_set(), "Threads did not complete")
# Add more assertions based on expected behavior, e.g., only one should acquire
unittest.main()