Enterprise-grade async artifact storage with grid architecture and session-based security
A production-ready Python library for storing and managing files across multiple storage backends (S3, IBM COS, filesystem, memory) with Redis-based metadata caching, strict session isolation, and grid-based federation architecture.
- 🔒 Session-based security - Every file belongs to a session, preventing data leaks
- 🏗️ Grid architecture - Organized hierarchical storage for infinite scalability
- 🌐 Multiple backends - Switch between S3, filesystem, memory without code changes
- ⚡ High performance - 3,000+ operations/second with async/await
- 🎯 Zero config - Works out of the box, configure only what you need
- 🔗 Presigned URLs - Secure file access without exposing credentials
- 🔄 Integration ready - Built-in support for chuk_sessions
- 📊 Production ready - Comprehensive monitoring, error handling, and batch operations
pip install chuk-artifacts
# Or with UV (recommended)
uv add chuk-artifactsfrom chuk_artifacts import ArtifactStore
# Works immediately - no configuration needed
async with ArtifactStore() as store:
    # Store a file
    file_id = await store.store(
        data=b"Hello, world!",
        mime="text/plain", 
        summary="My first file",
        filename="hello.txt"
    )
    
    # Get it back
    content = await store.retrieve(file_id)
    print(content.decode())  # "Hello, world!"
    
    # Share with a secure URL (15 minutes)
    url = await store.presign_short(file_id)
    print(f"Download: {url}")
    
    # Update the file
    await store.update_file(
        file_id,
        data=b"Hello, updated world!",
        summary="Updated greeting"
    )That's it! No AWS credentials, no Redis setup, no configuration files. Perfect for development and testing.
Files are organized in a predictable, hierarchical grid structure:
grid/
├── {sandbox_id}/          # Application/environment isolation
│   ├── {session_id}/      # User/workflow grouping  
│   │   ├── {artifact_id}  # Individual files
│   │   └── {artifact_id}
│   └── {session_id}/
│       ├── {artifact_id}
│       └── {artifact_id}
└── {sandbox_id}/
    └── ...
Why Grid Architecture?
- 🔒 Security: Natural isolation between applications and users
- 📈 Scalability: Supports billions of files across thousands of sessions
- 🌐 Federation: Easily distribute across multiple storage regions
- 🛠️ Operations: Predictable paths for backup, monitoring, and cleanup
- 🔍 Debugging: Clear hierarchical organization for troubleshooting
# Grid paths are generated automatically
file_id = await store.store(data, mime="text/plain", summary="Test")
# But you can inspect them
metadata = await store.metadata(file_id)
print(metadata['key'])  # grid/my-app/session-abc123/artifact-def456
# Parse any grid path
parsed = store.parse_grid_key(metadata['key'])
print(parsed)
# {
#   'sandbox_id': 'my-app',
#   'session_id': 'session-abc123', 
#   'artifact_id': 'artifact-def456',
#   'subpath': None
# }Every file belongs to a session. Sessions prevent users from accessing each other's files:
# Files are isolated by session
alice_file = await store.store(
    data=b"Alice's private data",
    mime="text/plain",
    summary="Private file",
    user_id="alice"  # Auto-creates session for Alice
)
bob_file = await store.store(
    data=b"Bob's private data", 
    mime="text/plain",
    summary="Private file",
    user_id="bob"  # Auto-creates session for Bob
)
# Users can only access their own files
alice_meta = await store.metadata(alice_file)
alice_files = await store.list_by_session(alice_meta['session_id'])  # Only Alice's files
# Cross-session operations are blocked for security
try:
    await store.copy_file(alice_file, target_session_id="bob_session")
except ArtifactStoreError:
    print("🔒 Cross-session access denied!")  # Security enforcedChuk Artifacts integrates seamlessly with chuk_sessions for advanced session management:
# Automatic session allocation with user mapping
file_id = await store.store(
    data=b"User document",
    mime="application/pdf",
    summary="Important document",
    user_id="[email protected]",  # Maps to session automatically
    meta={"department": "engineering", "confidential": True}
)
# Access session information
metadata = await store.metadata(file_id)
session_info = await store.get_session_info(metadata['session_id'])
print(f"User: {session_info['user_id']}")
print(f"Custom data: {session_info['custom_metadata']}")
# Extend session lifetime
await store.extend_session_ttl(metadata['session_id'], additional_hours=24)# Store various file types
pdf_id = await store.store(
    data=pdf_bytes,
    mime="application/pdf",
    summary="Q4 Financial Report",
    filename="reports/q4-2024.pdf",
    user_id="finance_team",
    meta={"department": "finance", "quarter": "Q4", "year": 2024}
)
# Read file content directly  
content = await store.read_file(pdf_id, as_text=False)  # Returns bytes
# Write text files easily
doc_id = await store.write_file(
    content="# Project Documentation\n\nThis project implements...",
    filename="docs/README.md",
    mime="text/markdown",
    user_id="dev_team",
    meta={"project": "chuk-artifacts", "version": "1.0"}
)
# Update files and metadata
await store.update_file(
    doc_id,
    data=b"# Updated Documentation\n\nThis project now supports...",
    summary="Updated project documentation",
    meta={"version": "1.1", "last_updated": "2024-01-15"}
)
# Check if file exists
if await store.exists(pdf_id):
    print("File found!")
# Safe deletion
deleted = await store.delete(pdf_id)
if deleted:
    print("File deleted successfully")# List files in a session
files = await store.list_by_session("session-123")
for file in files:
    print(f"{file['filename']}: {file['bytes']} bytes")
# List files with directory-like structure
docs = await store.list_files("session-123", prefix="docs/")
images = await store.list_files("session-123", prefix="images/")
# Get directory contents (more detailed)
reports = await store.get_directory_contents("session-123", "reports/")
for report in reports:
    print(f"{report['filename']}: {report['summary']}")
# Copy files (within same session only - security enforced)
backup_id = await store.copy_file(
    doc_id,
    new_filename="docs/README_backup.md",
    new_meta={"backup": True, "original_id": doc_id}
)
# Move/rename files
await store.move_file(
    backup_id,
    new_filename="backups/README_v1.md"
)# Rich metadata support
file_id = await store.store(
    data=image_bytes,
    mime="image/jpeg",
    summary="Product photo",
    filename="products/laptop-pro.jpg",
    user_id="marketing",
    meta={
        "product_id": "LPT-001",
        "category": "electronics", 
        "tags": ["laptop", "professional", "portable"],
        "dimensions": {"width": 1920, "height": 1080},
        "camera_info": {"model": "Canon EOS R5", "iso": 100},
        "photographer": "[email protected]"
    }
)
# Update metadata without changing file content
await store.update_metadata(
    file_id,
    summary="Updated product photo with new angle",
    meta={
        "tags": ["laptop", "professional", "portable", "workspace"],
        "last_edited": "2024-01-15T10:30:00Z"
    },
    merge=True  # Merge with existing metadata
)
# Extend TTL for important files
await store.extend_ttl(file_id, additional_seconds=86400)  # +24 hoursPerfect for development and testing:
# Automatic - no configuration needed
store = ArtifactStore()
# Or explicitly
from chuk_artifacts.config import configure_memory
configure_memory()
store = ArtifactStore()Pros:
- ✅ Zero setup required
- ✅ Fastest performance
- ✅ Perfect for testing
Cons:
- ❌ Non-persistent (lost on restart)
- ❌ Memory usage scales with file size
- ❌ Single process only
Local disk storage with full persistence:
from chuk_artifacts.config import configure_filesystem
configure_filesystem(root="./my-artifacts")
store = ArtifactStore()
# Or via environment
export ARTIFACT_PROVIDER=filesystem
export ARTIFACT_FS_ROOT=/data/artifactsPros:
- ✅ Persistent storage
- ✅ Easy debugging (files visible on disk)
- ✅ Good for development and small deployments
- ✅ Supports file:// presigned URLs
Cons:
- ❌ Not suitable for distributed systems
- ❌ No built-in redundancy
- ❌ Limited scalability
Production-ready cloud storage:
from chuk_artifacts.config import configure_s3
configure_s3(
    access_key="AKIA...",
    secret_key="...",
    bucket="production-artifacts",
    region="us-east-1"
)
store = ArtifactStore()
# Or via environment
export ARTIFACT_PROVIDER=s3
export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=us-east-1
export ARTIFACT_BUCKET=my-bucketPros:
- ✅ Unlimited scalability
- ✅ 99.999999999% durability
- ✅ Native presigned URLs
- ✅ Global CDN integration
- ✅ Lifecycle policies
Cons:
- ❌ Requires AWS account
- ❌ Network latency
- ❌ Pay-per-use pricing
Enterprise object storage with multiple authentication methods:
# HMAC Authentication (recommended)
from chuk_artifacts.config import configure_ibm_cos
configure_ibm_cos(
    access_key="your_hmac_key",
    secret_key="your_hmac_secret",
    bucket="enterprise-artifacts",
    endpoint="https://s3.us-south.cloud-object-storage.appdomain.cloud"
)
# IAM Authentication
from chuk_artifacts.config import configure_ibm_cos_iam
configure_ibm_cos_iam(
    api_key="your_ibm_api_key",
    instance_crn="crn:v1:bluemix:public:cloud-object-storage:global:...",
    bucket="enterprise-artifacts"
)Pros:
- ✅ Enterprise-grade security
- ✅ GDPR/compliance friendly
- ✅ Multiple regions globally
- ✅ Competitive pricing
Cons:
- ❌ IBM Cloud account required
- ❌ More complex setup than S3
Fast, in-memory metadata storage:
store = ArtifactStore(session_provider="memory")Pros:
- ✅ Fastest metadata access
- ✅ No external dependencies
- ✅ Perfect for development
Cons:
- ❌ Non-persistent
- ❌ Single instance only
- ❌ Memory usage scales with metadata
Persistent, distributed metadata storage:
from chuk_artifacts.config import configure_redis_session
configure_redis_session("redis://localhost:6379/0")
store = ArtifactStore()
# Or via environment
export SESSION_PROVIDER=redis
export SESSION_REDIS_URL=redis://prod-redis:6379/0Pros:
- ✅ Persistent metadata
- ✅ Shared across multiple instances
- ✅ Sub-millisecond access times
- ✅ Production ready
- ✅ Automatic expiration
Cons:
- ❌ Requires Redis server
- ❌ Additional infrastructure
| Variable | Description | Default | Examples | 
|---|---|---|---|
| Core Configuration | |||
| ARTIFACT_PROVIDER | Storage backend | memory | s3,filesystem,ibm_cos,ibm_cos_iam | 
| ARTIFACT_BUCKET | Bucket/container name | artifacts | my-files,prod-storage | 
| ARTIFACT_SANDBOX_ID | Sandbox identifier | Auto-generated | myapp,prod-env,user-portal | 
| SESSION_PROVIDER | Session metadata storage | memory | redis | 
| Filesystem Configuration | |||
| ARTIFACT_FS_ROOT | Filesystem root directory | ./artifacts | /data/files,~/storage | 
| Session Configuration | |||
| SESSION_REDIS_URL | Redis connection URL | - | redis://localhost:6379/0 | 
| AWS/S3 Configuration | |||
| AWS_ACCESS_KEY_ID | AWS access key | - | AKIA... | 
| AWS_SECRET_ACCESS_KEY | AWS secret key | - | abc123... | 
| AWS_REGION | AWS region | us-east-1 | us-west-2,eu-west-1 | 
| S3_ENDPOINT_URL | Custom S3 endpoint | - | https://minio.example.com | 
| IBM COS Configuration | |||
| IBM_COS_ENDPOINT | IBM COS endpoint | Auto-detected | https://s3.us-south.cloud-object-storage.appdomain.cloud | 
| IBM_COS_APIKEY | IBM Cloud API key (IAM) | - | abc123... | 
| IBM_COS_INSTANCE_CRN | COS instance CRN (IAM) | - | crn:v1:bluemix:public:... | 
# Zero configuration - uses memory providers
from chuk_artifacts import ArtifactStore
store = ArtifactStore()
# Or use helper functions
from chuk_artifacts.config import development_setup
store = development_setup()from chuk_artifacts.config import testing_setup
store = testing_setup("./dev-storage")  # Filesystem + metadata persistencefrom chuk_artifacts.config import production_setup
store = production_setup(
    storage_type="s3",
    access_key="AKIA...",
    secret_key="...",
    bucket="prod-artifacts",
    region="us-east-1",
    session_provider="redis"
)version: '3.8'
services:
  app:
    image: myapp
    environment:
      # Storage
      ARTIFACT_PROVIDER: s3
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      AWS_REGION: us-east-1
      ARTIFACT_BUCKET: myapp-artifacts
      ARTIFACT_SANDBOX_ID: myapp-prod
      
      # Sessions  
      SESSION_PROVIDER: redis
      SESSION_REDIS_URL: redis://redis:6379/0
    depends_on:
      - redis
      
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
volumes:
  redis_data:apiVersion: v1
kind: ConfigMap
metadata:
  name: artifacts-config
data:
  ARTIFACT_PROVIDER: "s3"
  AWS_REGION: "us-east-1"
  ARTIFACT_BUCKET: "k8s-artifacts"
  SESSION_PROVIDER: "redis"
  SESSION_REDIS_URL: "redis://redis-service:6379/0"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifacts-secrets
type: Opaque
data:
  AWS_ACCESS_KEY_ID: <base64-encoded>
  AWS_SECRET_ACCESS_KEY: <base64-encoded>Generate secure, time-limited URLs for file access without exposing your storage credentials:
# Generate download URLs with different durations
url = await store.presign(file_id, expires=3600)      # Custom duration
short_url = await store.presign_short(file_id)        # 15 minutes  
medium_url = await store.presign_medium(file_id)      # 1 hour (default)
long_url = await store.presign_long(file_id)          # 24 hours
# URLs work with all storage providers
print(f"S3 URL: https://bucket.s3.amazonaws.com/grid/...")
print(f"Filesystem URL: file:///data/artifacts/grid/...")
print(f"Memory URL: memory://grid/...")
# Generate upload URLs for client-side uploads
upload_url, artifact_id = await store.presign_upload(
    session_id="user_session",
    filename="large-video.mp4",
    mime_type="video/mp4",
    expires=1800  # 30 minutes for large uploads
)
# Client uploads directly to storage
# POST upload_url with file data
# Register the uploaded file
await store.register_uploaded_artifact(
    artifact_id,
    mime="video/mp4",
    summary="User uploaded video",
    filename="large-video.mp4",
    meta={"uploaded_by": "alice", "file_size_mb": 150}
)
# Combined upload + registration
upload_url, artifact_id = await store.presign_upload_and_register(
    mime="image/jpeg",
    summary="Profile picture",
    filename="avatar.jpg",
    session_id="user_session",
    meta={"avatar": True}
)Process multiple files efficiently:
# Prepare batch data
files = [
    {
        "data": image1_bytes,
        "mime": "image/jpeg",
        "summary": "Product image 1", 
        "filename": "products/laptop-front.jpg",
        "meta": {"product_id": "LPT-001", "angle": "front"}
    },
    {
        "data": image2_bytes,
        "mime": "image/jpeg",
        "summary": "Product image 2",
        "filename": "products/laptop-side.jpg",
        "meta": {"product_id": "LPT-001", "angle": "side"}
    },
    {
        "data": spec_bytes,
        "mime": "application/pdf",
        "summary": "Product specifications",
        "filename": "products/laptop-specs.pdf",
        "meta": {"product_id": "LPT-001", "type": "specification"}
    }
]
# Store all files atomically
file_ids = await store.store_batch(
    files, 
    session_id="product_catalog",
    ttl=86400  # 24 hours
)
# All files share the same session and metadata TTL
print(f"Stored {len([id for id in file_ids if id])} files successfully")from fastapi import FastAPI, UploadFile, HTTPException
from chuk_artifacts import ArtifactStore, ArtifactEnvelope
app = FastAPI()
store = ArtifactStore(
    storage_provider="s3",
    session_provider="redis"
)
@app.post("/upload", response_model=ArtifactEnvelope)
async def handle_upload(file: UploadFile, user_id: str):
    """Handle file upload with automatic session management"""
    content = await file.read()
    
    file_id = await store.store(
        data=content,
        mime=file.content_type or "application/octet-stream",
        summary=f"Uploaded: {file.filename}",
        filename=file.filename,
        user_id=user_id,  # Auto-creates and manages session
        meta={
            "original_name": file.filename,
            "upload_timestamp": datetime.utcnow().isoformat(),
            "user_agent": request.headers.get("user-agent")
        }
    )
    
    return ArtifactEnvelope(
        artifact_id=file_id,
        mime_type=file.content_type,
        bytes=len(content),
        summary=f"Uploaded: {file.filename}",
        meta={"status": "uploaded"}
    )
@app.get("/files/{user_id}")
async def list_user_files(user_id: str):
    """List all files for a user"""
    # Get user's session
    session_id = f"user_{user_id}"  # Or use your session mapping
    files = await store.list_by_session(session_id)
    
    return [
        {
            "file_id": f["artifact_id"],
            "filename": f["filename"],
            "size": f["bytes"],
            "uploaded": f["stored_at"]
        }
        for f in files
    ]
@app.get("/download/{file_id}")
async def get_download_url(file_id: str, user_id: str):
    """Generate secure download URL"""
    try:
        # Verify file belongs to user (optional security check)
        metadata = await store.metadata(file_id)
        expected_session = f"user_{user_id}"
        if metadata["session_id"] != expected_session:
            raise HTTPException(403, "Access denied")
        
        url = await store.presign_medium(file_id)
        return {"download_url": url, "expires_in": 3600}
    except ArtifactNotFoundError:
        raise HTTPException(404, "File not found")from mcp import Server
from chuk_artifacts import ArtifactStore
# MCP Server with artifact support
server = Server("artifacts-mcp")
store = ArtifactStore()
@server.tool("upload_file")
async def mcp_upload_file(
    data_b64: str, 
    filename: str, 
    session_id: str,
    mime_type: str = "application/octet-stream"
):
    """MCP tool for file uploads"""
    import base64
    
    try:
        data = base64.b64decode(data_b64)
        file_id = await store.store(
            data=data,
            mime=mime_type,
            summary=f"Uploaded via MCP: {filename}",
            filename=filename,
            session_id=session_id,
            meta={"upload_method": "mcp", "tool": "upload_file"}
        )
        
        # Generate immediate download URL
        download_url = await store.presign_medium(file_id)
        
        return {
            "file_id": file_id,
            "filename": filename,
            "size": len(data),
            "download_url": download_url,
            "message": f"Successfully uploaded {filename}"
        }
    except Exception as e:
        return {"error": f"Upload failed: {str(e)}"}
@server.tool("list_files")
async def mcp_list_files(session_id: str, directory: str = ""):
    """MCP tool for listing files"""
    try:
        if directory:
            files = await store.get_directory_contents(session_id, directory)
        else:
            files = await store.list_by_session(session_id)
            
        return {
            "files": [
                {
                    "id": f["artifact_id"],
                    "name": f["filename"],
                    "size": f["bytes"],
                    "type": f["mime"],
                    "created": f["stored_at"],
                    "summary": f["summary"]
                }
                for f in files
            ],
            "total": len(files)
        }
    except Exception as e:
        return {"error": f"List failed: {str(e)}"}
@server.tool("get_file_content")
async def mcp_get_file_content(file_id: str, as_text: bool = True):
    """MCP tool for reading file content"""
    try:
        if as_text:
            content = await store.read_file(file_id, as_text=True)
            return {"content": content, "type": "text"}
        else:
            import base64
            content = await store.read_file(file_id, as_text=False)
            return {
                "content": base64.b64encode(content).decode(),
                "type": "base64",
                "size": len(content)
            }
    except Exception as e:
        return {"error": f"Read failed: {str(e)}"}class DocumentManager:
    def __init__(self):
        self.store = ArtifactStore(
            storage_provider="s3",
            session_provider="redis"
        )
    
    async def create_document(
        self, 
        content: str, 
        path: str, 
        user_id: str,
        doc_type: str = "text"
    ):
        """Create a new document"""
        doc_id = await self.store.write_file(
            content=content,
            filename=path,
            mime="text/plain" if doc_type == "text" else "text/markdown",
            summary=f"Document: {path}",
            user_id=user_id,
            meta={
                "document_type": doc_type,
                "version": 1,
                "created_by": user_id,
                "last_modified": datetime.utcnow().isoformat()
            }
        )
        return doc_id
    
    async def update_document(self, doc_id: str, content: str, user_id: str):
        """Update document content and bump version"""
        # Get current metadata
        metadata = await self.store.metadata(doc_id)
        current_version = metadata.get("meta", {}).get("version", 1)
        
        # Update with new content and metadata
        await self.store.update_file(
            doc_id,
            data=content.encode(),
            meta={
                **metadata.get("meta", {}),
                "version": current_version + 1,
                "last_modified": datetime.utcnow().isoformat(),
                "last_modified_by": user_id
            }
        )
    
    async def get_document(self, doc_id: str) -> dict:
        """Get document content and metadata"""
        content = await self.store.read_file(doc_id, as_text=True)
        metadata = await self.store.metadata(doc_id)
        
        return {
            "content": content,
            "filename": metadata["filename"],
            "version": metadata.get("meta", {}).get("version", 1),
            "created": metadata["stored_at"],
            "modified": metadata.get("meta", {}).get("last_modified"),
            "size": metadata["bytes"]
        }
    
    async def list_user_documents(self, user_id: str, folder: str = ""):
        """List documents for a user, optionally in a folder"""
        # Get user's session - you'd implement your own user->session mapping
        session_id = f"user_{user_id}"
        
        if folder:
            docs = await self.store.get_directory_contents(session_id, folder)
        else:
            docs = await self.store.list_by_session(session_id)
        
        return [
            {
                "id": doc["artifact_id"],
                "filename": doc["filename"],
                "version": doc.get("meta", {}).get("version", 1),
                "size": doc["bytes"],
                "modified": doc.get("meta", {}).get("last_modified")
            }
            for doc in docs
        ]Comprehensive error handling for production applications:
from chuk_artifacts import (
    ArtifactStoreError,
    ArtifactNotFoundError,
    ArtifactExpiredError,
    ArtifactCorruptedError,
    ProviderError,
    SessionError
)
async def robust_file_operation(store, file_id):
    """Example of comprehensive error handling"""
    try:
        # Attempt file operation
        data = await store.retrieve(file_id)
        return {"success": True, "data": data}
        
    except ArtifactNotFoundError:
        return {"success": False, "error": "File not found or expired"}
        
    except ArtifactExpiredError:
        return {"success": False, "error": "File has expired"}
        
    except ArtifactCorruptedError:
        # Log for investigation
        logger.error(f"Corrupted metadata for file {file_id}")
        return {"success": False, "error": "File metadata corrupted"}
        
    except ProviderError as e:
        # Storage backend error
        logger.error(f"Storage provider error: {e}")
        return {"success": False, "error": "Storage system temporarily unavailable"}
        
    except SessionError as e:
        # Session/metadata system error
        logger.error(f"Session error: {e}")
        return {"success": False, "error": "Session system error"}
        
    except ArtifactStoreError as e:
        # This includes security violations like cross-session access
        logger.warning(f"Access denied for file {file_id}: {e}")
        return {"success": False, "error": "Access denied"}
        
    except Exception as e:
        # Unexpected error
        logger.exception(f"Unexpected error accessing file {file_id}")
        return {"success": False, "error": "Internal system error"}
# Usage in web handlers
@app.get("/api/files/{file_id}")
async def download_file(file_id: str):
    result = await robust_file_operation(store, file_id)
    if result["success"]:
        return {"download_url": await store.presign(file_id)}
    else:
        raise HTTPException(500, result["error"])Chuk Artifacts is built for high performance:
# Performance benchmarks (typical results)
"""
✅ File Storage:     3,083 files/sec
✅ File Retrieval:   4,693 reads/sec  
✅ File Updates:     2,156 updates/sec
✅ Batch Operations: 1,811 batch items/sec
✅ Session Listing:  ~2ms for 20+ files
✅ Metadata Access:  <1ms with Redis
"""
# Performance tips
async def optimized_operations():
    # Use batch operations for multiple files
    file_ids = await store.store_batch(multiple_files)  # Much faster than individual stores
    
    # Prefer read_file for text content (avoids encoding overhead)
    content = await store.read_file(file_id, as_text=True)  # vs retrieve + decode
    
    # Use appropriate TTL values
    await store.store(data, mime="text/plain", summary="Temp file", ttl=300)  # 5 minutes
    
    # Reuse store instances (connection pooling)
    async with ArtifactStore() as store:  # Connection pool maintained
        for file in files:
            await store.store(...)  # Reuses connections# Built-in monitoring capabilities
async def monitor_store_health():
    # Validate configuration and connectivity
    status = await store.validate_configuration()
    print(f"Storage: {status['storage']['status']}")
    print(f"Sessions: {status['session']['status']}")
    print(f"Session Manager: {status['session_manager']['status']}")
    
    # Get operational statistics
    stats = await store.get_stats()
    print(f"Provider: {stats['storage_provider']}")
    print(f"Bucket: {stats['bucket']}")
    print(f"Session Stats: {stats['session_manager']}")
    
    # Get sandbox information
    sandbox_info = await store.get_sandbox_info()
    print(f"Sandbox: {sandbox_info['sandbox_id']}")
    print(f"Grid Pattern: {sandbox_info['grid_prefix_pattern']}")
    
    # Clean up expired sessions
    cleaned = await store.cleanup_expired_sessions()
    print(f"Cleaned up {cleaned} expired sessions")
# Integration with monitoring systems
import logging
# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# The library logs important events automatically:
# - File storage/retrieval operations
# - Session management
# - Error conditions
# - Performance metrics# Run comprehensive smoke tests
python examples/smoke_run.py
# Expected output:
"""
🚀 Comprehensive Artifact Store Smoke Test
==================================================
🧪 Testing: Memory sessions + filesystem storage
   ✅ Configuration validated
   ✅ Basic operations for test-hello.txt (16 bytes)
   ✅ Generated presigned URLs (short/medium/long)
   ✅ Batch storage of 3 items
   ✅ Session listing (2 files)
   ✅ Statistics: filesystem/memory
📊 Test Summary: 34/34 passed
🎉 All tests passed! Artifact Store is working perfectly.
"""
# Run integration demo
python examples/integration_demo.py
# Run grid architecture demo
python examples/grid_demo.pyimport asyncio
import tempfile
from pathlib import Path
from chuk_artifacts import ArtifactStore
async def test_your_use_case():
    """Test your specific use case"""
    # Use temporary directory for testing
    with tempfile.TemporaryDirectory() as tmpdir:
        store = ArtifactStore(
            storage_provider="filesystem",
            fs_root=tmpdir,
            session_provider="memory"
        )
        
        # Test file operations
        file_id = await store.store(
            data=b"test content",
            mime="text/plain",
            summary="Test file",
            user_id="test_user"
        )
        
        # Verify storage
        assert await store.exists(file_id)
        content = await store.retrieve(file_id)
        assert content == b"test content"
        
        # Test metadata
        metadata = await store.metadata(file_id)
        assert metadata["bytes"] == 12
        assert metadata["mime"] == "text/plain"
        
        # Test session isolation
        files = await store.list_by_session(metadata["session_id"])
        assert len(files) == 1
        assert files[0]["artifact_id"] == file_id
        
        print("✅ All tests passed!")
# Run your tests
asyncio.run(test_your_use_case())# Test with different provider combinations
async def test_provider_combinations():
    providers = [
        ("memory", "memory"),
        ("memory", "filesystem"), 
        ("redis", "filesystem"),
        ("redis", "s3")  # Requires credentials
    ]
    
    for session_provider, storage_provider in providers:
        print(f"Testing {session_provider} + {storage_provider}")
        
        store = ArtifactStore(
            session_provider=session_provider,
            storage_provider=storage_provider
        )
        
        # Basic functionality test
        file_id = await store.store(
            data=b"test",
            mime="text/plain",
            summary="Provider test"
        )
        
        data = await store.retrieve(file_id)
        assert data == b"test"
        
        await store.close()
        print(f"✅ {session_provider} + {storage_provider} works!")# ✅ Good: Each user gets their own session
user_session = f"user_{user.id}"
await store.store(data, mime="text/plain", session_id=user_session)
# ✅ Good: Organization-level isolation
org_session = f"org_{organization.id}"
await store.store(data, mime="application/pdf", session_id=org_session)
# ❌ Bad: Shared sessions across users
shared_session = "global"  # All users can see each other's filesasync def secure_file_access(file_id: str, requesting_user_id: str):
    """Verify user can access file before serving"""
    try:
        metadata = await store.metadata(file_id)
        expected_session = f"user_{requesting_user_id}"
        
        if metadata["session_id"] != expected_session:
            raise HTTPException(403, "Access denied")
            
        return await store.presign(file_id)
        
    except ArtifactNotFoundError:
        raise HTTPException(404, "File not found")# ✅ Good: Use environment variables for secrets
import os
store = ArtifactStore(
    storage_provider=os.getenv("ARTIFACT_PROVIDER", "memory"),
    # Credentials come from environment
)
# ✅ Good: Use IAM roles in production (AWS/IBM)
# No hardcoded credentials needed
# ❌ Bad: Hardcoded credentials
store = ArtifactStore(
    storage_provider="s3",
    access_key="AKIA123...",  # Never do this!
    secret_key="secret123"
)# Before: Simple file operations
import os
import shutil
def save_file(content: bytes, filename: str):
    os.makedirs("uploads", exist_ok=True)
    with open(f"uploads/{filename}", "wb") as f:
        f.write(content)
    return f"uploads/{filename}"
def load_file(filepath: str):
    with open(filepath, "rb") as f:
        return f.read()
# After: Session-based artifact storage
from chuk_artifacts import ArtifactStore
store = ArtifactStore()
async def save_file(content: bytes, filename: str, user_id: str):
    file_id = await store.store(
        data=content,
        mime="application/octet-stream",
        summary=f"Uploaded: {filename}",
        filename=filename,
        user_id=user_id  # Automatic session management
    )
    return file_id
async def load_file(file_id: str):
    return await store.retrieve(file_id)# Before: Direct S3 operations
import boto3
s3 = boto3.client("s3")
def upload_to_s3(data: bytes, key: str):
    s3.put_object(Bucket="mybucket", Key=key, Body=data)
    return key
def download_from_s3(key: str):
    response = s3.get_object(Bucket="mybucket", Key=key)
    return response["Body"].read()
# After: Managed artifact storage with metadata
from chuk_artifacts import ArtifactStore
store = ArtifactStore(storage_provider="s3")
async def upload_file(data: bytes, filename: str, user_id: str):
    file_id = await store.store(
        data=data,
        mime="application/octet-stream",
        summary=f"User upload: {filename}",
        filename=filename,
        user_id=user_id,
        meta={"original_name": filename}
    )
    return file_id
async def download_file(file_id: str):
    return await store.retrieve(file_id)
# Benefits:
# - Automatic grid organization
# - Session-based security  
# - Rich metadata support
# - Presigned URL generation
# - Multiple provider supportA: No! The default memory providers work great for development and testing. Only use Redis for production when you need persistence or multi-instance deployment.
A: Yes! Just change the ARTIFACT_PROVIDER environment variable. The API stays identical across all providers.
A: Sessions are just strings. Create any mapping that makes sense:
# User-based sessions
session_id = f"user_{user.id}"
# Organization-based sessions  
session_id = f"org_{organization.id}"
# Project-based sessions
session_id = f"project_{project.uuid}"
# Multi-tenant sessions
session_id = f"tenant_{tenant_id}_user_{user_id}"A: Files and their metadata are automatically cleaned up:
# Manual cleanup
expired_count = await store.cleanup_expired_sessions()
# Files expire based on TTL when stored
await store.store(data, mime="text/plain", ttl=3600)  # 1 hourA: Absolutely! Initialize the store at application startup:
# FastAPI example
from fastapi import FastAPI
from chuk_artifacts import ArtifactStore
app = FastAPI()
store = None
@app.on_event("startup")
async def startup():
    global store
    store = ArtifactStore()
@app.on_event("shutdown") 
async def shutdown():
    await store.close()
@app.post("/upload")
async def upload(file: UploadFile):
    file_id = await store.store(...)
    return {"file_id": file_id}A: Use presigned upload URLs for client-side uploads:
# Generate upload URL
upload_url, artifact_id = await store.presign_upload(
    session_id="user_session",
    filename="large-video.mp4", 
    mime_type="video/mp4",
    expires=1800  # 30 minutes for large files
)
# Client uploads directly to storage
# Then register the uploaded file
await store.register_uploaded_artifact(
    artifact_id,
    mime="video/mp4",
    summary="Large video file"
)A: Yes! Features for production deployment:
- High performance: 3,000+ operations/second
- Multiple storage backends: S3, IBM COS, filesystem
- Session-based security: Prevent cross-user access
- Redis support: For distributed deployments
- Grid architecture: Infinite scalability
- Comprehensive error handling: Graceful failure modes
- Monitoring: Built-in health checks and statistics
- Docker/K8s ready: Environment-based configuration
A: Grid paths provide natural organization and distribution:
grid/
├── app-prod/           # Production environment
│   ├── user-alice/     # Alice's files
│   └── user-bob/       # Bob's files  
├── app-staging/        # Staging environment
│   └── test-session/   # Test files
└── app-analytics/      # Analytics application
    └── batch-001/      # Batch processing
Benefits:
- Federation: Distribute sandboxes across regions
- Backup: Backup specific sandboxes or sessions
- Cleanup: Remove entire environments cleanly
- Monitoring: Monitor per-application usage
- Security: Clear isolation boundaries
- Quick Start: pip install chuk-artifactsand try the 30-second example
- Development: Use memory providers for fast iteration
- Testing: Switch to filesystem provider for debugging
- Production: Configure S3 + Redis for scale
- Integration: Add to your web framework of choice
Ready to build something awesome with enterprise-grade file storage? 🚀
- Documentation: Full API Reference
- Examples: ./examples/
- Tests: Run python examples/smoke_run.py
- Issues: GitHub Issues
- Discussions: GitHub Discussions