Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chromadb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class Settings(BaseSettings): # type: ignore
is_persistent: bool = False
persist_directory: str = "./chroma"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would update the documentation.

chroma_memory_limit_bytes: int = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do I turn this capability on and off? Is 0 implicitly off?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — 0 means unlimited (the limit is disabled).

chroma_server_host: Optional[str] = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we introduce a config - called segment_manager_cache_policy and make this one of many types?

chroma_server_headers: Optional[Dict[str, str]] = None
chroma_server_http_port: Optional[str] = None
Expand Down
58 changes: 55 additions & 3 deletions chromadb/segment/impl/manager/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
VectorReader,
S,
)
import time
import os

from chromadb.config import System, get_class
from chromadb.db.system import SysDB
from overrides import override
Expand All @@ -31,14 +34,25 @@
elif platform.system() == "Windows":
import ctypes


SEGMENT_TYPE_IMPLS = {
SegmentType.SQLITE: "chromadb.segment.impl.metadata.sqlite.SqliteMetadataSegment",
SegmentType.HNSW_LOCAL_MEMORY: "chromadb.segment.impl.vector.local_hnsw.LocalHnswSegment",
SegmentType.HNSW_LOCAL_PERSISTED: "chromadb.segment.impl.vector.local_persistent_hnsw.PersistentLocalHnswSegment",
}


def get_directory_size(directory: str) -> int:
    """Return the total size in bytes of all regular files under *directory*.

    The tree is walked recursively. Symbolic links are skipped so linked
    files are not double-counted and dangling links do not raise.
    """
    return sum(
        os.path.getsize(file_path)
        for dirpath, _, filenames in os.walk(directory)
        for file_path in (os.path.join(dirpath, name) for name in filenames)
        if not os.path.islink(file_path)
    )


class LocalSegmentManager(SegmentManager):
_sysdb: SysDB
_system: System
Expand Down Expand Up @@ -72,8 +86,8 @@ def __init__(self, system: System):
else:
self._max_file_handles = ctypes.windll.msvcrt._getmaxstdio() # type: ignore
segment_limit = (
self._max_file_handles
// PersistentLocalHnswSegment.get_file_handle_count()
self._max_file_handles
// PersistentLocalHnswSegment.get_file_handle_count()
)
self._vector_instances_file_handle_cache = LRUCache(
segment_limit, callback=lambda _, v: v.close_persistent_index()
Expand Down Expand Up @@ -140,8 +154,41 @@ def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
"LocalSegmentManager.get_segment",
OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
)
def _get_segment_disk_size(self, collection_id: UUID) -> int:
segments = self._sysdb.get_segments(collection=collection_id, scope=SegmentScope.VECTOR)
if len(segments) == 0:
return 0
size = get_directory_size(
os.path.join(self._system.settings.require("persist_directory"), str(segments[0]["id"])))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: leave comment stating assumption of one vector segment for this line - otherwise hardcoded [0] is confusing

return size

def _cleanup_segment(self, collection_id: UUID, target_size: int):
segment_sizes = {id: self._get_segment_disk_size(id) for id in self._segment_cache if
SegmentScope.VECTOR in self._segment_cache[id]}
total_size = sum(segment_sizes.values())
new_segment_size = self._get_segment_disk_size(collection_id)

while total_size + new_segment_size >= target_size and self._segment_cache.keys():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this LRU logic into a cleaner abstraction with eviction handles - like https://github.com/chroma-core/chroma/blob/main/chromadb/utils/lru_cache.py

oldest_key = min(
(k for k in self._segment_cache if SegmentScope.VECTOR in self._segment_cache[k]),
key=lambda k: self._segment_cache[k][SegmentScope.VECTOR]["last_used"],
default=None
)

if oldest_key is not None:
# Stop the instance and remove from cache
instance = self._instance(self._segment_cache[oldest_key][SegmentScope.VECTOR])
instance.stop()
# Update total_size and remove the segment from cache and sizes dictionary
total_size -= segment_sizes[oldest_key]
del segment_sizes[oldest_key]
del self._segment_cache[oldest_key]
else:
break

@override
def get_segment(self, collection_id: UUID, type: Type[S]) -> S:

if type == MetadataReader:
scope = SegmentScope.METADATA
elif type == VectorReader:
Expand All @@ -150,6 +197,9 @@ def get_segment(self, collection_id: UUID, type: Type[S]) -> S:
raise ValueError(f"Invalid segment type: {type}")

if scope not in self._segment_cache[collection_id]:
memory_limit = self._system.settings.require("chroma_memory_limit_bytes")
if type == VectorReader and self._system.settings.require("is_persistent") and memory_limit > 0:
self._cleanup_segment(collection_id, memory_limit)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: cleanup_segment could be renamed a bit more explicitly / when we have a proper lru cache abstraction for this

segments = self._sysdb.get_segments(collection=collection_id, scope=scope)
known_types = set([k.value for k in SEGMENT_TYPE_IMPLS.keys()])
# Get the first segment of a known type
Expand All @@ -158,6 +208,7 @@ def get_segment(self, collection_id: UUID, type: Type[S]) -> S:

# Instances must be atomically created, so we use a lock to ensure that only one thread
# creates the instance.
self._segment_cache[collection_id][scope]["last_used"] = time.time()
with self._lock:
instance = self._instance(self._segment_cache[collection_id][scope])
return cast(S, instance)
Expand Down Expand Up @@ -209,4 +260,5 @@ def _segment(type: SegmentType, scope: SegmentScope, collection: Collection) ->
topic=collection["topic"],
collection=collection["id"],
metadata=metadata,
last_used=0
)
2 changes: 1 addition & 1 deletion chromadb/test/db/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ def test_update_segment(sysdb: SysDB) -> None:
scope=SegmentScope.VECTOR,
topic="test_topic_a",
collection=sample_collections[0]["id"],
metadata=metadata,
metadata=metadata
)

sysdb.reset_state()
Expand Down
6 changes: 5 additions & 1 deletion chromadb/test/property/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hypothesis.strategies as st
from typing import Any, Optional, List, Dict, Union, cast
from typing_extensions import TypedDict
import uuid
import numpy as np
import numpy.typing as npt
import chromadb.api.types as types
Expand Down Expand Up @@ -237,16 +238,17 @@ def embedding_function_strategy(
@dataclass
class Collection:
    # Hypothesis-generated model of a chroma collection used by property tests.
    name: str
    # Unique identifier; generated with uuid.uuid4() by the collections() strategy.
    id: uuid.UUID
    metadata: Optional[types.Metadata]
    # Embedding dimensionality and element dtype for generated vectors.
    dimension: int
    dtype: npt.DTypeLike
    topic: str
    # Metadata keys (and document keywords) the test knows it can query on.
    known_metadata_keys: types.Metadata
    known_document_keywords: List[str]
    has_documents: bool = False
    has_embeddings: bool = False
    embedding_function: Optional[types.EmbeddingFunction[Embeddable]] = None


@st.composite
def collections(
draw: st.DrawFn,
Expand Down Expand Up @@ -309,7 +311,9 @@ def collections(
embedding_function = draw(embedding_function_strategy(dimension, dtype))

return Collection(
id=uuid.uuid4(),
name=name,
topic="topic",
metadata=metadata,
dimension=dimension,
dtype=dtype,
Expand Down
125 changes: 125 additions & 0 deletions chromadb/test/property/test_segment_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import uuid

import pytest
import chromadb.test.property.strategies as strategies
from unittest.mock import patch
from dataclasses import asdict
import random
from hypothesis.stateful import (
Bundle,
RuleBasedStateMachine,
rule,
initialize,
multiple,
precondition,
invariant,
run_state_machine_as_test,
MultipleResults,
)
from typing import Dict
from chromadb.segment import (
VectorReader
)
from chromadb.segment import SegmentManager

from chromadb.segment.impl.manager.local import LocalSegmentManager
from chromadb.types import SegmentScope
from chromadb.db.system import SysDB
from chromadb.config import System, get_class

memory_limit = 100


# Helper class to keep track of the most recently used ids
class LastUse:
    """Track the ids most recently passed to :meth:`add`, oldest first.

    Holds at most ``n`` ids; re-adding an id moves it to the most-recent end
    instead of duplicating it.
    """

    def __init__(self, n: int):
        self.n = n
        self.store = []

    def add(self, id: uuid.UUID):
        """Record *id* as most recently used and return the backing list."""
        # Move-to-front semantics: drop any existing occurrence, then append.
        try:
            self.store.remove(id)
        except ValueError:
            pass
        self.store.append(id)
        # Trim from the oldest end until the capacity bound holds.
        while len(self.store) > self.n:
            self.store.pop(0)
        return self.store

    def reset(self):
        """Forget all recorded ids."""
        self.store = []


class SegmentManagerStateMachine(RuleBasedStateMachine):
    """Stateful property test driving LocalSegmentManager's LRU disk cache.

    Creates collections with randomized (mocked) segment sizes, queries their
    vector segments, and checks the cache respects ``chroma_memory_limit_bytes``
    and LRU ordering.
    """

    collections: Bundle[strategies.Collection]
    collections = Bundle("collections")
    # Class-level so the static mock_directory_size hook can reach them.
    collection_size_store: Dict[uuid.UUID, int] = {}
    segment_collection: Dict[uuid.UUID, uuid.UUID] = {}

    def __init__(self, system: System):
        super().__init__()
        self.segment_manager = system.require(SegmentManager)
        self.segment_manager.start()
        self.segment_manager.reset_state()
        self.last_use = LastUse(n=40)
        self.collection_created_counter = 0
        self.sysdb = system.require(SysDB)
        self.system = system

    @invariant()
    def last_queried_segments_should_be_in_cache(self):
        """The most recently used segments that fit in the limit stay cached."""
        cache_sum = 0
        index = 0
        for id in reversed(self.last_use.store):
            cache_sum += self.collection_size_store[id]
            # BUGFIX: was `index is not 0` — identity comparison on an int is
            # implementation-dependent (SyntaxWarning on CPython >= 3.8).
            if cache_sum >= memory_limit and index != 0:
                break
            assert self.segment_manager._segment_cache[id][SegmentScope.VECTOR] is not None
            index += 1

    @invariant()
    @precondition(lambda self: self.system.settings.is_persistent is True)
    def cache_should_not_be_bigger_than_settings(self):
        """Total cached size stays within the memory limit (a single oversized
        segment is allowed, since something must be resident to serve reads)."""
        segment_sizes = {
            id: self.collection_size_store[id]
            for id in self.segment_manager._segment_cache
        }
        total_size = sum(segment_sizes.values())
        if len(segment_sizes) != 1:
            assert total_size <= memory_limit

    @initialize()
    def initialize(self) -> None:
        self.segment_manager.reset_state()
        self.segment_manager.start()
        self.collection_created_counter = 0
        self.last_use.reset()

    @rule(target=collections, coll=strategies.collections())
    @precondition(lambda self: self.collection_created_counter <= 50)
    def create_segment(
        self, coll: strategies.Collection
    ) -> MultipleResults[strategies.Collection]:
        """Create a collection's segments and record a random fake disk size."""
        segments = self.segment_manager.create_segments(asdict(coll))
        for segment in segments:
            self.sysdb.create_segment(segment)
            self.segment_collection[segment["id"]] = coll.id
        self.collection_created_counter += 1
        self.collection_size_store[coll.id] = random.randint(0, memory_limit)
        return multiple(coll)

    @rule(coll=collections)
    def get_segment(self, coll: strategies.Collection) -> None:
        """Query the vector segment and note the collection as recently used."""
        segment = self.segment_manager.get_segment(collection_id=coll.id, type=VectorReader)
        self.last_use.add(coll.id)
        assert segment is not None

    @staticmethod
    def mock_directory_size(directory: str):
        """Stand-in for get_directory_size: the directory's final path component
        is the segment id; report the fake size recorded for its collection."""
        path_id = directory.split("/").pop()
        collection_id = SegmentManagerStateMachine.segment_collection[uuid.UUID(path_id)]
        return SegmentManagerStateMachine.collection_size_store[collection_id]


@patch(
    'chromadb.segment.impl.manager.local.get_directory_size',
    SegmentManagerStateMachine.mock_directory_size,
)
def test_segment_manager(caplog: pytest.LogCaptureFixture, system: System) -> None:
    """Run the segment-manager state machine with directory sizes mocked and
    the memory limit pinned to the test's budget."""
    system.settings.chroma_memory_limit_bytes = memory_limit

    def make_machine() -> SegmentManagerStateMachine:
        return SegmentManagerStateMachine(system=system)

    run_state_machine_as_test(make_machine)
1 change: 1 addition & 0 deletions chromadb/test/segment/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def create_random_segment_definition() -> Segment:
topic="persistent://test/test/test_topic_1",
collection=None,
metadata=test_hnsw_config,
last_used=0,
)


Expand Down
1 change: 1 addition & 0 deletions chromadb/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Segment(TypedDict):
id: UUID
type: NamespacedName
scope: SegmentScope
last_used: int
# If a segment has a topic, it implies that this segment is a consumer of the topic
# and indexes the contents of the topic.
topic: Optional[str]
Expand Down