Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/aleph/services/storage/fileystem_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,20 @@ async def read(self, filename: str) -> Optional[bytes]:

async def write(self, filename: str, content: bytes):
file_path = self.folder / filename
file_path.write_bytes(content)
temp_path = self.folder / f"{filename}.tmp"

try:
# Write to temporary file first
temp_path.write_bytes(content)

# Atomic rename - this operation is atomic on POSIX systems
# If crash happens before this, temp file exists but target doesn't
temp_path.replace(file_path)

except Exception:
# Clean up temp file if write failed
temp_path.unlink(missing_ok=True)
raise

async def delete(self, filename: str):
file_path = self.folder / filename
Expand Down
11 changes: 10 additions & 1 deletion src/aleph/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from hashlib import sha256
from typing import Any, Final, Optional, cast

import orjson
from aleph_message.models import ItemType

import aleph.toolkit.json as aleph_json
Expand Down Expand Up @@ -194,8 +195,16 @@ async def get_hash_content(

# Try to retrieve the data from the DB, then from the network or IPFS.
content = await self.storage_engine.read(filename=content_hash)

if content is not None:
source = ContentSource.DB
# check json and fix if corrupted
try:
json_content = aleph_json.loads(content)
source = ContentSource.DB
except orjson.JSONDecodeError as e:
LOGGER.warning("Can't decode JSON, Change source...")
await self.storage_engine.delete(filename=content_hash)
content = None

if content is None and use_network:
content = await self._fetch_content_from_network(
Expand Down
Loading