-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Improvements] Manage segment cache and memory #1670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
dd2331d
877d712
cb15ae4
48b9cfe
db78fa1
06e7a84
1f30cbc
9e909ef
8a6f537
097cc51
42fbc6d
2386cdd
ba45af9
9724918
034940c
15cc717
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,7 @@ class Settings(BaseSettings): # type: ignore | |
| is_persistent: bool = False | ||
| persist_directory: str = "./chroma" | ||
|
|
||
| chroma_memory_limit: int = 0 | ||
|
||
| chroma_server_host: Optional[str] = None | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we introduce a config - called segment_manager_cache_policy and make this one of many types? |
||
| chroma_server_headers: Optional[Dict[str, str]] = None | ||
| chroma_server_http_port: Optional[str] = None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,9 @@ | |
| VectorReader, | ||
| S, | ||
| ) | ||
| import time | ||
| import os | ||
|
|
||
| from chromadb.config import System, get_class | ||
| from chromadb.db.system import SysDB | ||
| from overrides import override | ||
|
|
@@ -37,6 +40,16 @@ | |
| SegmentType.HNSW_LOCAL_MEMORY: "chromadb.segment.impl.vector.local_hnsw.LocalHnswSegment", | ||
| SegmentType.HNSW_LOCAL_PERSISTED: "chromadb.segment.impl.vector.local_persistent_hnsw.PersistentLocalHnswSegment", | ||
| } | ||
| def get_size(start_path: str): | ||
| total_size = 0 | ||
| for dirpath, _, filenames in os.walk(start_path): | ||
| for f in filenames: | ||
| fp = os.path.join(dirpath, f) | ||
| # skip if it is symbolic link | ||
| if not os.path.islink(fp): | ||
| total_size += os.path.getsize(fp) | ||
|
|
||
| return total_size | ||
|
|
||
|
|
||
| class LocalSegmentManager(SegmentManager): | ||
|
|
@@ -140,16 +153,53 @@ def delete_segments(self, collection_id: UUID) -> Sequence[UUID]: | |
| "LocalSegmentManager.get_segment", | ||
| OpenTelemetryGranularity.OPERATION_AND_SEGMENT, | ||
| ) | ||
| def _get_segment_disk_size(self, collection_id: UUID): | ||
| segments = self._sysdb.get_segments(collection=collection_id, scope=SegmentScope.VECTOR) | ||
| size = get_size(os.path.join(self._system.settings.require("persist_directory"), str(segments[0]["id"]))) | ||
| return size | ||
|
|
||
|
|
||
| def _cleanup_segment(self, collection_id: UUID, target_size: int): | ||
| # Dictionary to store the size of each segment | ||
| segment_sizes = {id: self._get_segment_disk_size(id) for id in self._segment_cache.keys()} | ||
| total_size = sum(segment_sizes.values()) | ||
| new_segment_size = self._get_segment_disk_size(collection_id) | ||
|
|
||
| while total_size + new_segment_size > target_size and self._segment_cache.keys(): | ||
| oldest_key = min( | ||
| (k for k in self._segment_cache if SegmentScope.VECTOR in self._segment_cache[k]), | ||
| key=lambda k: self._segment_cache[k][SegmentScope.VECTOR]["last_used"], | ||
| default=None | ||
| ) | ||
|
|
||
| if oldest_key is not None: | ||
| # Stop the instance and remove from cache | ||
| instance = self._instance(self._segment_cache[oldest_key][SegmentScope.VECTOR]) | ||
| instance.stop() | ||
| del self._instances[self._segment_cache[oldest_key][SegmentScope.VECTOR]["id"]] | ||
|
|
||
| # Update total_size and remove the segment from cache and sizes dictionary | ||
| total_size -= segment_sizes[oldest_key] | ||
| del segment_sizes[oldest_key] | ||
| del self._segment_cache[oldest_key] | ||
| else: | ||
| break | ||
|
|
||
|
|
||
| @override | ||
| def get_segment(self, collection_id: UUID, type: Type[S]) -> S: | ||
|
|
||
| if type == MetadataReader: | ||
| scope = SegmentScope.METADATA | ||
| elif type == VectorReader: | ||
| scope = SegmentScope.VECTOR | ||
| else: | ||
| raise ValueError(f"Invalid segment type: {type}") | ||
|
|
||
| if scope not in self._segment_cache[collection_id]: | ||
| memory_limit = self._system.settings.require("chroma_memory_limit") | ||
| if type == VectorReader and self._system.settings.require("is_persistent") and memory_limit > 0: | ||
| self._cleanup_segment(collection_id, memory_limit) | ||
|
||
| segments = self._sysdb.get_segments(collection=collection_id, scope=scope) | ||
| known_types = set([k.value for k in SEGMENT_TYPE_IMPLS.keys()]) | ||
| # Get the first segment of a known type | ||
|
|
@@ -158,6 +208,7 @@ def get_segment(self, collection_id: UUID, type: Type[S]) -> S: | |
|
|
||
| # Instances must be atomically created, so we use a lock to ensure that only one thread | ||
| # creates the instance. | ||
| self._segment_cache[collection_id][scope]["last_used"] = time.time() | ||
| with self._lock: | ||
| instance = self._instance(self._segment_cache[collection_id][scope]) | ||
| return cast(S, instance) | ||
|
|
@@ -209,4 +260,5 @@ def _segment(type: SegmentType, scope: SegmentScope, collection: Collection) -> | |
| topic=collection["topic"], | ||
| collection=collection["id"], | ||
| metadata=metadata, | ||
| last_used=0 | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would update the documentation.