
feat: implement unified storage system with multiple backends #979


Open · wants to merge 1 commit into main

Conversation

MervinPraison
Owner

Summary

Unified Storage System Implementation

  • Created BaseStorage abstract class with async/sync support
  • Implemented 8 storage backends: SQLite, MongoDB, PostgreSQL, Redis, DynamoDB, S3, GCS, Azure
  • Enhanced Memory class with provider selection API
  • Maintained full backward compatibility

Key Features

  • Minimal code changes on client side
  • Configuration-first approach
  • Multi-storage support
  • Quality-based storage decisions
  • Connection pooling and performance optimization

New API Examples

# MongoDB storage
memory 
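The snippet above is truncated in the PR description. A hypothetical sketch of the provider-selection API it describes (parameter names are assumptions, not the PR's confirmed interface):

    from praisonaiagents.memory import Memory

    # Sketch: configuration-first provider selection, with an optional cache
    # layer in front of the primary store (field names illustrative).
    memory = Memory(config={
        "provider": "mongodb",
        "config": {
            "connection_string": "mongodb://localhost:27017",
            "database": "praison_memory",
        },
        "cache": {
            "provider": "redis",
            "config": {"host": "localhost", "port": 6379},
        },
    })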

…gents

- Add unified storage interface (BaseStorage) with async/await support
- Implement 8+ storage backends: SQLite, MongoDB, PostgreSQL, Redis, DynamoDB, S3, GCS, Azure
- Enhance Memory class with backward-compatible multi-storage support
- Add configuration-first approach for easy provider switching
- Support primary + cache storage patterns
- Maintain full backward compatibility with existing code
- Add graceful dependency handling for optional backends
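The "graceful dependency handling" above presumably follows the standard optional-import pattern; a minimal sketch (module names inferred from the files changed in this PR, not confirmed):

    # memory/__init__.py (sketch): fall back to the original Memory class
    # when the optional storage dependencies are not installed.
    try:
        from .enhanced_memory import EnhancedMemory as Memory
        STORAGE_AVAILABLE = True
    except ImportError:
        from .memory import Memory  # original implementation, no extra deps
        STORAGE_AVAILABLE = False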

Resolves #971

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Contributor

coderabbitai bot commented Jul 17, 2025

Warning

Rate limit exceeded

@MervinPraison has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 0 minutes and 28 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.


📥 Commits

Reviewing files that changed from the base of the PR and between 39fbb76 and ea5c325.

📒 Files selected for processing (11)
  • src/praisonai-agents/praisonaiagents/__init__.py (2 hunks)
  • src/praisonai-agents/praisonaiagents/memory/__init__.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/__init__.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/base.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/cloud_storage.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/redis_storage.py (1 hunks)
  • src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py (1 hunks)

Contributor

gemini-code-assist bot left a comment


Summary of Changes

Hello @MervinPraison, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the data persistence capabilities of the praisonai-agents library by implementing a robust, unified storage system. It abstracts away the complexities of different database and cloud storage technologies behind a common asynchronous interface, offering developers a wide range of scalable and performant options for managing agent memory and data. The design ensures seamless integration and future extensibility while preserving compatibility with existing codebases.

Highlights

  • Unified Storage Abstraction: I've introduced a new BaseStorage abstract class that defines a common asynchronous interface for all storage operations (read, write, delete, search, list keys, clear, and batch operations); a minimal sketch follows this list. This provides a consistent API for interacting with various data stores.
  • Expanded Storage Backend Support: This PR adds comprehensive support for 8 new storage backends: MongoDB, PostgreSQL, Redis, DynamoDB, AWS S3, Google Cloud Storage, Azure Blob Storage, and an enhanced SQLite implementation. Each backend adheres to the new BaseStorage interface, allowing for flexible configuration.
  • Enhanced Memory Management: The core Memory class has been refactored into EnhancedMemory, which now leverages the new unified storage system. It supports configuring both a primary storage and an optional cache storage, enabling more sophisticated memory strategies.
  • Backward Compatibility: Crucially, the changes maintain full backward compatibility. The memory/__init__.py now intelligently imports EnhancedMemory if the necessary storage dependencies are available, otherwise falling back to the original Memory implementation. Existing synchronous memory methods are wrapped to utilize the new asynchronous storage backends.
  • Configuration-First Approach: The new storage system is designed with a configuration-first approach, allowing users to easily select and configure their preferred storage providers with minimal code changes on the client side.
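Based on the operations listed above, the interface might look roughly like this (a sketch; only the method names mentioned in this summary are taken from the PR, everything else is assumed):

    from abc import ABC, abstractmethod
    from typing import Any, Dict, List, Optional

    class BaseStorage(ABC):
        """Sketch of a common async storage interface (operations per the summary above)."""

        @abstractmethod
        async def read(self, key: str) -> Optional[Dict[str, Any]]: ...

        @abstractmethod
        async def write(self, key: str, data: Dict[str, Any]) -> bool: ...

        @abstractmethod
        async def delete(self, key: str) -> bool: ...

        @abstractmethod
        async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]: ...

        @abstractmethod
        async def list_keys(self) -> List[str]: ...

        @abstractmethod
        async def clear(self) -> bool: ...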

Contributor

gemini-code-assist bot left a comment


Code Review

This pull request introduces a unified storage system with multiple backends, enhancing modularity and extensibility. The review focuses on improving robustness, efficiency, and maintainability, including correcting asyncio event loop management, fixing a missing import, addressing performance issues in cloud storage and Redis backends, and improving database operation robustness.

Comment on lines +511 to +516
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    return loop.run_until_complete(self.store(key, data))
finally:
    loop.close()

critical

Creating a new event loop with asyncio.new_event_loop() for each synchronous call is inefficient and can cause issues. It will fail if an event loop is already running in the thread, which is common in async frameworks or in Jupyter notebooks.

A safer approach for running an async function from a sync context is asyncio.run(). While it also has limitations (it can't be called when a loop is already running), it's an improvement over managing loops manually.

This issue is present in store_short_term, search_short_term, reset_short_term, and reset_all.

Suggested change
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    return loop.run_until_complete(self.store(key, data))
finally:
    loop.close()
return asyncio.run(self.store(key, data))
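For contexts where a loop may already be running (async frameworks, Jupyter), a more defensive bridge is possible. This is a sketch of the general pattern, not this PR's code:

    import asyncio
    import concurrent.futures

    def run_sync(coro):
        """Run a coroutine from sync code, tolerating an already-running loop."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop running in this thread: asyncio.run() is safe.
            return asyncio.run(coro)
        # A loop is already running: execute the coroutine on a separate
        # thread with its own loop, avoiding "cannot be called from a
        # running event loop" errors.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()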


try:
    import boto3
    from boto3.dynamodb.conditions import Key, Attr

critical

This line will raise a NameError at runtime because TypeDeserializer is not imported. You need to add from boto3.dynamodb.types import TypeDeserializer at the top of the file to resolve this.

Comment on lines +248 to +284
    async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Search for records matching the query."""
        await self._ensure_connection()

        loop = asyncio.get_event_loop()

        try:
            # List all objects with prefix
            response = await loop.run_in_executor(
                None,
                self.s3_client.list_objects_v2,
                {"Bucket": self.bucket, "Prefix": self.prefix}
            )

            results = []
            limit = query.get("limit", 100)

            for obj in response.get("Contents", []):
                if len(results) >= limit:
                    break

                s3_key = obj["Key"]
                key = self._strip_prefix(s3_key)

                # Read and filter object
                record = await self.read(key)
                if record and self._matches_query(record, query):
                    results.append(record)

            # Sort by updated_at descending
            results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)

            return results

        except Exception as e:
            self.logger.error(f"Failed to search: {e}")
            return []

high

The search implementation for S3 is inefficient. It lists all objects under the prefix and then reads each object's full content to perform client-side filtering. For a large number of objects, this will be slow and can incur significant costs due to the high number of GET requests.

Consider using S3 Select for server-side filtering to improve performance. At a minimum, this performance limitation should be clearly documented in the docstring to warn users.

This same issue exists for GCSStorage and AzureStorage as well.
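For reference, a minimal S3 Select call looks roughly like this (the field names are illustrative, not this PR's schema):

    # Sketch: server-side filtering of a single JSON object with S3 Select.
    response = s3_client.select_object_content(
        Bucket=bucket,
        Key=s3_key,
        ExpressionType="SQL",
        Expression="SELECT * FROM S3Object s WHERE s.run_id = 'run-123'",
        InputSerialization={"JSON": {"Type": "DOCUMENT"}},
        OutputSerialization={"JSON": {}},
    )
    for event in response["Payload"]:
        if "Records" in event:
            print(event["Records"]["Payload"].decode())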

Comment on lines +347 to +373
    async def clear(self) -> bool:
        """Clear all records from storage."""
        await self._ensure_connection()

        loop = asyncio.get_event_loop()

        try:
            # List all objects
            response = await loop.run_in_executor(
                None,
                self.s3_client.list_objects_v2,
                {"Bucket": self.bucket, "Prefix": self.prefix}
            )

            # Delete all objects
            for obj in response.get("Contents", []):
                await loop.run_in_executor(
                    None,
                    self.s3_client.delete_object,
                    {"Bucket": self.bucket, "Key": obj["Key"]}
                )

            return True

        except Exception as e:
            self.logger.error(f"Failed to clear storage: {e}")
            return False

high

The clear method deletes objects one by one in a loop, which is inefficient for a large number of objects. Cloud storage providers offer batch deletion APIs that are faster and more cost-effective.

For S3, you should use s3_client.delete_objects(). This same issue applies to GCSStorage (use bucket.delete_blobs()) and AzureStorage (use container_client.delete_blobs()).

    async def clear(self) -> bool:
        """Clear all records from storage."""
        await self._ensure_connection()
        
        loop = asyncio.get_event_loop()
        
        try:
            # Use paginator to handle large number of objects
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(Bucket=self.bucket, Prefix=self.prefix)

            for page in pages:
                if 'Contents' not in page:
                    continue
                
                objects_to_delete = [{'Key': obj['Key']} for obj in page['Contents']]
                if not objects_to_delete:
                    continue

                await loop.run_in_executor(
                    None,
                    self.s3_client.delete_objects,
                    {'Bucket': self.bucket, 'Delete': {'Objects': objects_to_delete}}
                )
            
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to clear storage: {e}")
            return False

Comment on lines +217 to +264
    async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Search for records matching the query.

        Note: Redis doesn't have native search capabilities like MongoDB/PostgreSQL.
        This implementation scans all keys and filters client-side, which may be slow
        for large datasets. Consider using RedisSearch module for production use.
        """
        await self._ensure_connection()

        try:
            # Get all keys with our prefix
            pattern = f"{self.key_prefix}*"
            keys = await self.redis.keys(pattern)

            if not keys:
                return []

            # Get all records
            raw_data = await self.redis.mget(keys)
            results = []

            # Process and filter records
            for i, data in enumerate(raw_data):
                if data:
                    try:
                        json_str = self._decompress_data(data)
                        record = json.loads(json_str)
                        record["id"] = self._strip_prefix(keys[i].decode())

                        # Apply filters
                        if self._matches_query(record, query):
                            results.append(record)

                    except (json.JSONDecodeError, UnicodeDecodeError) as e:
                        self.logger.error(f"Failed to decode record: {e}")
                        continue

            # Sort by updated_at descending
            results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)

            # Apply limit
            limit = query.get("limit", 100)
            return results[:limit]

        except RedisError as e:
            self.logger.error(f"Failed to search: {e}")
            return []

high

The search method uses the KEYS command, which is a blocking operation that can severely degrade Redis performance in production, especially with a large number of keys. It's strongly recommended to use SCAN instead, which iterates through the keyspace without blocking the server.

This same issue applies to the clear and count methods.

    async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Search for records matching the query.
        
        Note: This implementation scans keys and filters client-side, which may be slow
        for large datasets. Consider using RedisSearch module for production use.
        """
        await self._ensure_connection()
        
        try:
            results = []
            limit = query.get("limit", 100)
            
            # Use SCAN instead of KEYS to avoid blocking the server
            async for key_bytes in self.redis.scan_iter(match=f"{self.key_prefix}*"):
                data = await self.redis.get(key_bytes)
                if data:
                    try:
                        json_str = self._decompress_data(data)
                        record = json.loads(json_str)
                        record["id"] = self._strip_prefix(key_bytes.decode())
                        
                        if self._matches_query(record, query):
                            results.append(record)
                            
                    except (json.JSONDecodeError, UnicodeDecodeError) as e:
                        self.logger.error(f"Failed to decode record: {e}")
                        continue
            
            results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)
            
            return results[:limit]
            
        except RedisError as e:
            self.logger.error(f"Failed to search: {e}")
            return []

Comment on lines +210 to +216
result = await conn.execute(
    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1",
    key
)
# Extract affected rows from result string like "DELETE 1"
affected_rows = int(result.split()[-1]) if result.split() else 0
return affected_rows > 0

medium

Parsing the status string returned by conn.execute (e.g., "DELETE 1") to determine the number of affected rows is brittle, as the format is not guaranteed to be stable across asyncpg versions.

A more robust approach is to use the RETURNING clause to confirm the deletion.

Suggested change
result = await conn.execute(
    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1",
    key
)
# Extract affected rows from result string like "DELETE 1"
affected_rows = int(result.split()[-1]) if result.split() else 0
return affected_rows > 0
status = await conn.fetchval(
    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1 RETURNING id",
    key
)
return status is not None

cursor bot left a comment

Bug: Async Initialization Flaws in SQLiteStorage

The SQLiteStorage constructor's asynchronous initialization is flawed. The check asyncio.get_event_loop().is_running() is problematic: asyncio.get_event_loop() is deprecated when called with no running event loop and, depending on the Python version, can raise a RuntimeError or emit a DeprecationWarning. Additionally, the asyncio.create_task() call is neither awaited nor stored, leading to a race condition where the constructor returns before database initialization completes; an unreferenced task can also be garbage collected mid-flight. The initialization should use asyncio.get_running_loop() with proper error handling and keep a reference to the task.

src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py#L44-L46

# Initialize database
asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
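A sketch of the suggested repair, assuming the _init_db and _init_db_sync methods from this PR:

    # Inside __init__ (sketch): detect a running loop safely and keep a
    # handle to the task so it is neither lost nor garbage collected.
    try:
        loop = asyncio.get_running_loop()
        self._init_task = loop.create_task(self._init_db())
    except RuntimeError:
        # No event loop running: initialize synchronously.
        self._init_db_sync()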



Bug: Async Calls in Sync Batch Writer

The DynamoDBStorage class incorrectly wraps batch.put_item and batch.delete_item calls with await loop.run_in_executor inside the synchronous boto3.table.batch_writer() context. This misuses the batch writer, which is designed for synchronous operations, leading to potential resource leaks, inconsistent batch behavior, and thread safety concerns.

src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py#L429-L437

# Delete all items
with self.table.batch_writer() as batch:
    for item in response.get("Items", []):
        await loop.run_in_executor(
            None,
            batch.delete_item,
            {"Key": {"id": item["id"]}}
        )

src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py#L469-L474

await loop.run_in_executor(
    None,
    batch.put_item,
    {"Item": item}
)
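One possible fix (a sketch under the assumptions above): keep the batch_writer block fully synchronous and move the whole loop into the executor:

    # Sketch: boto3's batch_writer is synchronous, so run the entire block
    # in a worker thread instead of awaiting individual calls inside it.
    def _delete_all_sync(items):
        with self.table.batch_writer() as batch:
            for item in items:
                batch.delete_item(Key={"id": item["id"]})

    await loop.run_in_executor(None, _delete_all_sync, response.get("Items", []))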



Bug: Async/Sync Bridging Causes Event Loop Conflicts

The EnhancedMemory class contains an incorrect async/sync bridging pattern in methods like store_short_term, search_short_term, reset_short_term, and reset_all. These methods create a new event loop with asyncio.new_event_loop() and set it as the current loop with asyncio.set_event_loop(loop). This can interfere with existing event loops, cause a RuntimeError if called within an already running async context, and does not restore the original event loop, potentially breaking other async code. The pattern should be replaced with asyncio.run() or proper handling of existing event loops.

src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py#L510-L517

# Use async storage
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    return loop.run_until_complete(self.store(key, data))
finally:
    loop.close()
else:



Bug: S3Storage Class Fails Boto3 API Calls

The S3Storage class incorrectly calls boto3 methods via loop.run_in_executor. It passes a dictionary (e.g., {'Bucket': self.bucket}) as a single positional argument, but boto3 client methods accept keyword arguments only (e.g., Bucket=self.bucket). This error affects all boto3 API calls within the class (e.g., head_bucket, get_object, put_object, list_objects_v2), causing runtime errors.

src/praisonai-agents/praisonaiagents/storage/cloud_storage.py#L144-L149

try:
    await loop.run_in_executor(
        None,
        self.s3_client.head_bucket,
        {"Bucket": self.bucket}
    )
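A sketch of the usual fix: run_in_executor forwards only positional arguments, so bind the keyword arguments with functools.partial before crossing the executor boundary:

    import functools

    # Sketch: partial() preserves keyword arguments across the executor call.
    await loop.run_in_executor(
        None,
        functools.partial(self.s3_client.head_bucket, Bucket=self.bucket)
    )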



Bug: SQL Injection via JSONB Key Interpolation

The PostgreSQLStorage.search method risks SQL injection when searching by metadata. The current implementation attempts to parameterize the JSONB key (e.g., metadata ->> $1), which PostgreSQL cannot resolve for an untyped parameter, so the query fails. If the key is instead interpolated directly into the SQL string to work around this, then, because the key originates from user input (query["metadata"]), the interpolation creates a SQL injection vulnerability.

src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py#L246-L251

for key, value in query["metadata"].items():
    param_count += 1
    where_conditions.append(f"metadata ->> ${param_count} = ${param_count + 1}")
    params.extend([key, str(value)])
    param_count += 1



Bug: Circular Import Issue in Memory Factory

A circular import bug exists in the Memory factory function within enhanced_memory.py. When STORAGE_AVAILABLE is False, the function attempts to import Memory from .memory (i.e., memory/__init__.py), which itself imports from enhanced_memory.py, leading to a circular dependency and an ImportError at runtime.

src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py#L641-L647

"""
if STORAGE_AVAILABLE:
return EnhancedMemory(config, verbose)
else:
# Import and return original Memory class
from .memory import Memory as OriginalMemory
return OriginalMemory(config, verbose)




@MervinPraison
Owner Author

@claude review this pull request, do a detailed analysis, and fix the issues if the existing code doesn't already implement a solution. Make sure it keeps backward compatibility and that no existing features are removed. After making those changes, review the applied changes again. Use @web to search if you don't know something, or to find the latest documentation or versions. Run the code if you think you need to test it. Start with minimal code changes, if any changes are required.

Contributor

github-actions bot commented Jul 17, 2025

Claude encountered an error - View job


I'll analyze this and get back to you.
