Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,15 @@ See the examples in the [clients](./engine/clients) directory.

Once all the necessary classes are implemented, you can register the engine in the [ClientFactory](./engine/clients/client_factory.py).

### Doris Vector Search Support

This repository now includes experimental support for benchmarking Apache Doris vector search via the [`doris_vector_search`](https://github.com/uchenily/doris_vector_search) SDK.

To run a Doris benchmark, note that the benchmark harness does not yet manage a Doris server container, so start your Doris cluster separately, then invoke:

```bash
python run.py --engines "doris" --datasets "dbpedia-openai-100K-1536-angular"
```

You can adjust connection parameters and table/database names through the `experiments/configurations` engine configuration files (add a section for `doris`). Table schema is inferred automatically on first upload batch; vector distance mapping uses `l2_distance` or `inner_product` depending on dataset distance (cosine mapped to inner product assuming normalized vectors).

3 changes: 3 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- [ ] Fix Doris search init when distance argument is string
- [ ] Ensure Doris searcher opens existing table or handles missing gracefully
- [ ] Address missing column detection due to newly created table
14 changes: 14 additions & 0 deletions engine/base_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ def run_experiment(
distance=dataset.config.distance, vector_size=dataset.config.vector_size
)

# Ensure Doris components (and other engines that might need it) know the vector dim
vector_dim = dataset.config.vector_size
if vector_dim is not None:
self.configurator.collection_params.setdefault("vector_dim", vector_dim)
uploader_collection = self.uploader.upload_params.setdefault(
"collection_params", {}
)
uploader_collection.setdefault("vector_dim", vector_dim)
for searcher in self.searchers:
search_collection = searcher.search_params.setdefault(
"collection_params", {}
)
search_collection.setdefault("vector_dim", vector_dim)

reader = dataset.get_reader(execution_params.get("normalize", False))

if skip_if_exists:
Expand Down
37 changes: 28 additions & 9 deletions engine/clients/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BaseSearcher,
BaseUploader,
)
from engine.clients.doris import DorisConfigurator, DorisSearcher, DorisUploader
from engine.clients.elasticsearch import (
ElasticConfigurator,
ElasticSearcher,
Expand Down Expand Up @@ -39,6 +40,7 @@
"opensearch": OpenSearchConfigurator,
"redis": RedisConfigurator,
"pgvector": PgVectorConfigurator,
"doris": DorisConfigurator,
}

ENGINE_UPLOADERS = {
Expand All @@ -49,6 +51,7 @@
"opensearch": OpenSearchUploader,
"redis": RedisUploader,
"pgvector": PgVectorUploader,
"doris": DorisUploader,
}

ENGINE_SEARCHERS = {
Expand All @@ -59,6 +62,7 @@
"opensearch": OpenSearchSearcher,
"redis": RedisSearcher,
"pgvector": PgVectorSearcher,
"doris": DorisSearcher,
}


Expand All @@ -79,26 +83,41 @@ def _create_configurator(self, experiment) -> BaseConfigurator:

def _create_uploader(self, experiment) -> BaseUploader:
engine_uploader_class = ENGINE_UPLOADERS[experiment["engine"]]
upload_params = {**experiment.get("upload_params", {})}
# Propagate collection_params for engines that need database/table info during upload (e.g., doris)
if experiment["engine"] == "doris":
merged_collection = {
**experiment.get("collection_params", {}),
**upload_params.get("collection_params", {}),
}
upload_params["collection_params"] = merged_collection
engine_uploader = engine_uploader_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
upload_params={**experiment.get("upload_params", {})},
upload_params=upload_params,
)
return engine_uploader

def _create_searchers(self, experiment) -> List[BaseSearcher]:
engine_searcher_class: Type[BaseSearcher] = ENGINE_SEARCHERS[
experiment["engine"]
]

engine_searchers = [
engine_searcher_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
search_params=search_params,
engine_searchers = []
for search_params in experiment.get("search_params", [{}]):
params = {**search_params}
if experiment["engine"] == "doris":
merged_collection = {
**experiment.get("collection_params", {}),
**params.get("collection_params", {}),
}
params["collection_params"] = merged_collection
engine_searchers.append(
engine_searcher_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
search_params=params,
)
)
for search_params in experiment.get("search_params", [{}])
]

return engine_searchers

Expand Down
9 changes: 9 additions & 0 deletions engine/clients/doris/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Doris engine client package.

Re-exports the configurator, uploader, and searcher so they can be
registered in the engine ClientFactory.
"""

from .configure import DorisConfigurator
from .search import DorisSearcher
from .upload import DorisUploader

__all__ = [
    "DorisConfigurator",
    "DorisUploader",
    "DorisSearcher",
]
10 changes: 10 additions & 0 deletions engine/clients/doris/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Fallback database/table names, used when the experiment configuration
# does not override them via collection_params.
DEFAULT_DORIS_DATABASE = "benchmark"
DEFAULT_DORIS_TABLE = "vectors"

# Mapping from internal distances to doris metric_type
DISTANCE_MAPPING = {
    "l2": "l2_distance",
    "dot": "inner_product",
    # Cosine can be approximated by inner product if vectors normalized upstream
    "cosine": "inner_product",
}
76 changes: 76 additions & 0 deletions engine/clients/doris/configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from typing import Optional

import mysql.connector
from doris_vector_search import AuthOptions, DorisVectorClient
from mysql.connector import ProgrammingError

from benchmark.dataset import Dataset
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.doris.config import (
DEFAULT_DORIS_DATABASE,
DEFAULT_DORIS_TABLE,
DISTANCE_MAPPING,
)


class DorisConfigurator(BaseConfigurator):
    """Configurator for Apache Doris vector-search benchmarks.

    Ensures the target database exists before the SDK client connects,
    maps benchmark distances to Doris metric names, and drops the
    benchmark table between runs.  Table creation itself is deliberately
    deferred to the uploader, which infers the schema from the first
    upload batch.
    """

    SPARSE_VECTOR_SUPPORT = False

    # Maps the benchmark's Distance enum onto Doris metric-type names.
    # (Shadows the string-keyed module-level DISTANCE_MAPPING inside the
    # class namespace; methods can still reach the module-level dict.)
    DISTANCE_MAPPING = {
        Distance.L2: DISTANCE_MAPPING["l2"],
        Distance.DOT: DISTANCE_MAPPING["dot"],
        Distance.COSINE: DISTANCE_MAPPING["cosine"],
    }

    def __init__(self, host, collection_params: dict, connection_params: dict):
        """Connect to Doris, creating the benchmark database if needed.

        Args:
            host: Default Doris host, used when ``connection_params``
                does not provide one.
            collection_params: May contain ``database`` and ``table_name``.
            connection_params: Ports, user and password for the cluster.
        """
        super().__init__(host, collection_params, connection_params)

        database = collection_params.get("database", DEFAULT_DORIS_DATABASE)
        auth = AuthOptions(
            host=connection_params.get("host", host),
            query_port=connection_params.get("query_port", 9030),
            http_port=connection_params.get("http_port", 8030),
            user=connection_params.get("user", "root"),
            password=connection_params.get("password", ""),
        )
        # Best-effort bootstrap: make sure the database exists before the
        # main client connects.  Catch any mysql.connector.Error (not just
        # ProgrammingError) — a connection failure here (InterfaceError)
        # must not crash this pre-step; the DorisVectorClient below will
        # raise a clearer error if the cluster is truly unreachable.
        try:
            tmp_conn = mysql.connector.connect(
                host=auth.host,
                port=auth.query_port,
                user=auth.user,
                password=auth.password,
            )
            try:
                cursor = tmp_conn.cursor()
                try:
                    cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{database}`")
                finally:
                    cursor.close()
            finally:
                tmp_conn.close()
        except mysql.connector.Error:
            # Proceed and let the actual client raise a clearer error.
            pass
        self.client = DorisVectorClient(database=database, auth_options=auth)

    def clean(self):
        """Drop the benchmark table (best effort; it may not exist yet)."""
        table_name = self.collection_params.get("table_name", DEFAULT_DORIS_TABLE)
        try:
            self.client.drop_table(table_name)
        except Exception:
            # Table may not exist, ignore
            pass

    def recreate(self, dataset: Dataset, collection_params) -> Optional[dict]:
        """No-op: the Doris table and index are created lazily on the first
        upload batch so the schema can be inferred from the data."""
        return {}

    def execution_params(self, distance, vector_size) -> dict:
        """Return execution params carrying the Doris metric for *distance*.

        Accepts either a ``Distance`` enum member or its plain string form
        (e.g. ``"cosine"``) — experiment configurations may pass the raw
        string, in which case we fall back to the string-keyed mapping
        from ``config.py``.
        """
        metric = self.DISTANCE_MAPPING.get(distance)
        if metric is None and isinstance(distance, str):
            # DISTANCE_MAPPING here resolves to the module-level,
            # string-keyed dict (class attributes are not in method scope).
            metric = DISTANCE_MAPPING.get(distance.lower())
        return {"metric_type": metric}

    def delete_client(self):
        """Close the underlying SDK client, ignoring shutdown errors."""
        try:
            self.client.close()
        except Exception:
            pass
Loading