Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion conda_libmamba_solver/shards_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@ def connect(dburi="cache.db"):

dburi: uri-style sqlite database filename; accepts certain ?= parameters.
"""
conn = sqlite3.connect(dburi, uri=True)
conn = sqlite3.connect(dburi, uri=True, timeout=30.0)
conn.row_factory = sqlite3.Row
with conn as c:
try:
mode = c.execute("PRAGMA journal_mode = WAL").fetchone()[0]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a big fan of WAL mode. It could fail not because of filesystem locks, which are used by all sqlite3 modes, but because shared memory is not available (if conda's cache is on a shared filesystem, accessed by two computers). conda-index users requested we drop WAL mode for this reason.
On the other hand, this is only a cache, and there are lots of reasons why conda might not work if two conda processes try to use the same cache concurrently.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that we turn foreign_keys = ON but we only have one table and no foreign keys. Probably good practice anyway.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's fine, I think. I remembered that PR. I think in this case we need it.

except sqlite3.DatabaseError:
mode = None
if mode and mode.lower().startswith("wal"):
c.execute("PRAGMA synchronous = NORMAL")
c.execute("PRAGMA foreign_keys = ON")
return conn

Expand Down
23 changes: 23 additions & 0 deletions news/924-fix-shards-db-locked
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
### Enhancements

* <news item>

### Bug fixes

* Open the sharded repodata cache (`repodata_shards.db`) in WAL mode with a
longer SQLite busy timeout, so the pipelined cache reader thread no longer
races with the network writer thread and raises
`sqlite3.OperationalError: database is locked`. Fall back to the default
journal mode on filesystems where WAL is not supported. (#924)

### Deprecations

* <news item>

### Docs

* <news item>

### Other

* <news item>
53 changes: 53 additions & 0 deletions tests/test_shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import sqlite3
import tempfile
import threading
import time
from contextlib import contextmanager
from pathlib import Path
Expand Down Expand Up @@ -633,6 +634,58 @@ def test_shards_cache_recovery(tmp_path: Path):
assert cache.retrieve("notfound") is None


def test_shards_cache_uses_wal(tmp_path: Path):
    """A newly created shard cache reports ``wal`` as its journal mode."""
    with shards_cache.ShardCache(tmp_path) as cache:
        # Querying the pragma without a value returns the current journal mode.
        row = cache.conn.execute("PRAGMA journal_mode").fetchone()
        journal_mode = row[0]
        assert journal_mode == "wal"


def test_shards_cache_concurrent_read_write(tmp_path: Path):
    """Concurrent readers and writers must not raise OperationalError (#924).

    One writer thread and one reader thread each open their own
    ``ShardCache`` on the same base directory and run up to 200 iterations
    of inserts / multi-URL retrievals. Any exception raised inside a worker
    is collected and fails the test.

    The test also fails if a worker is still running after the join
    timeouts: ``Thread.join(timeout=...)`` returns silently on timeout, so
    liveness has to be checked explicitly or a hung worker would look like
    a pass.
    """
    compressor = zstandard.ZstdCompressor(level=1)
    errors: list[Exception] = []
    stop = threading.Event()

    def writer(base):
        try:
            with shards_cache.ShardCache(base, create=False) as cache_copy:
                for i in range(200):
                    if stop.is_set():
                        break
                    shard = shards_cache.AnnotatedRawShard(
                        f"https://shard{i}",
                        f"pkg{i}",
                        compressor.compress(msgpack.dumps({f"pkg{i}": "data"})),
                    )
                    cache_copy.insert(shard)
        except Exception as exc:
            # Collected here because an exception raised in a worker thread
            # does not propagate to the main test thread.
            errors.append(exc)

    def reader(base):
        try:
            with shards_cache.ShardCache(base, create=False) as cache_copy:
                for i in range(200):
                    if stop.is_set():
                        break
                    # Ask for every URL the writer may have inserted so far.
                    urls = [f"https://shard{j}" for j in range(i + 1)]
                    cache_copy.retrieve_multiple(urls)
        except Exception as exc:
            errors.append(exc)

    with shards_cache.ShardCache(tmp_path) as cache:
        workers = [
            threading.Thread(target=writer, args=(cache.base,)),
            threading.Thread(target=reader, args=(cache.base,)),
        ]
        for worker in workers:
            worker.start()
        try:
            for worker in workers:
                worker.join(timeout=10)
        finally:
            # Ask any straggler to stop, then give it a chance to exit so it
            # does not keep touching the database after the test returns.
            stop.set()
            for worker in workers:
                worker.join(timeout=10)

    # join() does not report a timeout; check liveness explicitly.
    hung = [worker.name for worker in workers if worker.is_alive()]
    assert not hung, f"worker threads did not finish: {hung}"

    # No sqlite3.OperationalError from either thread
    assert errors == []


NUM_FAKE_SHARDS = 64


Expand Down
Loading