Skip to content

Commit

Permalink
Librarian (#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
cybermaggedon authored Feb 11, 2025
1 parent e99c0ac commit a0bf236
Show file tree
Hide file tree
Showing 32 changed files with 922 additions and 66 deletions.
5 changes: 5 additions & 0 deletions templates/components/trustgraph.jsonnet
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
local base = import "base/base.jsonnet";
local images = import "values/images.jsonnet";
local url = import "values/url.jsonnet";
local minio = import "stores/minio.jsonnet";
local cassandra = import "stores/cassandra.jsonnet";

{

Expand Down Expand Up @@ -182,3 +184,6 @@ local url = import "values/url.jsonnet";

}

// Minio and Cassandra are used by the Librarian
+ minio + cassandra

5 changes: 4 additions & 1 deletion templates/generate
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ class Generator:
self.templates.joinpath(filename),
self.resources.joinpath(dir, filename),
self.resources.joinpath(filename),
pathlib.Path(dir).joinpath(filename),
]
else:
candidates = [
self.templates.joinpath(filename),
pathlib.Path(dir).joinpath(filename),
pathlib.Path(filename),
]

try:
Expand Down Expand Up @@ -86,7 +89,7 @@ class Packager:

def __init__(self):
self.templates = pathlib.Path("./templates")
self.resources = pathlib.Path("./resources")
self.resources = pathlib.Path("./")

def process(
self, config, version="0.0.0", platform="docker-compose",
Expand Down
44 changes: 2 additions & 42 deletions templates/stores/milvus.jsonnet
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
local base = import "base/base.jsonnet";
local images = import "values/images.jsonnet";
local minio = import "stores/minio.jsonnet";

{
minio {

etcd +: {

Expand Down Expand Up @@ -47,47 +48,6 @@ local images = import "values/images.jsonnet";

},

mino +: {

create:: function(engine)

local vol = engine.volume("minio-data").with_size("20G");

local container =
engine.container("minio")
.with_image(images.minio)
.with_command([
"minio",
"server",
"/minio_data",
"--console-address",
":9001",
])
.with_environment({
MINIO_ROOT_USER: "minioadmin",
MINIO_ROOT_PASSWORD: "minioadmin",
})
.with_limits("0.5", "128M")
.with_reservations("0.25", "128M")
.with_port(9001, 9001, "api")
.with_volume_mount(vol, "/minio_data");

local containerSet = engine.containers(
"etcd", [ container ]
);

local service =
engine.service(containerSet)
.with_port(9001, 9001, "api");

engine.resources([
vol,
containerSet,
service,
])

},

milvus +: {

create:: function(engine)
Expand Down
49 changes: 49 additions & 0 deletions templates/stores/minio.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
local base = import "base/base.jsonnet";
local images = import "values/images.jsonnet";

// MinIO object-store component.  Contributes a `minio` entry whose hidden
// `create` function builds the volume, container set and service for a
// single MinIO server using the supplied deployment engine.
{

    minio +: {

        // engine: deployment engine object providing volume(), container(),
        //         containers(), service() and resources().
        // Returns whatever engine.resources() produces for the volume,
        // container set and service defined below.
        create:: function(engine)

            // Data volume, mounted at /minio_data in the container.
            local vol = engine.volume("minio-data").with_size("20G");

            // NOTE(review): the root credentials below are hard-coded
            // defaults; confirm this is intended outside local development.
            local container =
                engine.container("minio")
                    .with_image(images.minio)
                    .with_command([
                        "minio",
                        "server",
                        "/minio_data",
                        "--console-address",
                        ":9001",
                    ])
                    .with_environment({
                        MINIO_ROOT_USER: "minioadmin",
                        MINIO_ROOT_PASSWORD: "minioadmin",
                    })
                    .with_limits("0.5", "128M")
                    .with_reservations("0.25", "128M")
                    .with_port(9000, 9000, "api")
                    .with_port(9001, 9001, "console")
                    .with_volume_mount(vol, "/minio_data");

            // The container set is named after the container it holds.
            // It was previously "etcd", which mislabels the MinIO
            // workload and shares a name with the etcd component.
            local containerSet = engine.containers(
                "minio", [ container ]
            );

            // Expose both the S3 API port and the web console port.
            local service =
                engine.service(containerSet)
                    .with_port(9000, 9000, "api")
                    .with_port(9001, 9001, "console");

            engine.resources([
                vol,
                containerSet,
                service,
            ])

    },

}
2 changes: 1 addition & 1 deletion templates/values/images.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ local version = import "version.jsonnet";
pulsar: "docker.io/apachepulsar/pulsar:3.3.1",
pulsar_manager: "docker.io/apachepulsar/pulsar-manager:v0.4.0",
etcd: "quay.io/coreos/etcd:v3.5.15",
minio: "docker.io/minio/minio:RELEASE.2024-08-17T01-24-54Z",
minio: "docker.io/minio/minio:RELEASE.2025-02-03T21-03-04Z",
milvus: "docker.io/milvusdb/milvus:v2.4.9",
prometheus: "docker.io/prom/prometheus:v2.53.2",
grafana: "docker.io/grafana/grafana:11.1.4",
Expand Down
2 changes: 2 additions & 0 deletions trustgraph-base/trustgraph/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
from . consumer import Consumer
from . producer import Producer
from . consumer_producer import ConsumerProducer
from . publisher import Publisher
from . subscriber import Subscriber

9 changes: 9 additions & 0 deletions trustgraph-base/trustgraph/base/base_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ def __init__(self, **params):
})

pulsar_host = params.get("pulsar_host", self.default_pulsar_host)
pulsar_listener = params.get("pulsar_listener", None)
log_level = params.get("log_level", LogLevel.INFO)

self.pulsar_host = pulsar_host

self.client = pulsar.Client(
pulsar_host,
listener_name=pulsar_listener,
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
)

self.pulsar_listener = pulsar_listener

def __del__(self):

if hasattr(self, "client"):
Expand All @@ -52,6 +56,11 @@ def add_args(parser):
help=f'Pulsar host (default: {__class__.default_pulsar_host})',
)

parser.add_argument(
'--pulsar-listener',
help=f'Pulsar listener (default: none)',
)

parser.add_argument(
'-l', '--log-level',
type=LogLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ def __init__(self, pulsar_host, topic, schema=None, max_size=10,
self.q = queue.Queue(maxsize=max_size)
self.chunking_enabled = chunking_enabled
self.listener_name = listener
self.running = True

def start(self):
    """Create the worker thread that executes run() and start it.

    The thread object is kept on self.task so join() can wait on it.
    """
    worker = threading.Thread(target=self.run)
    self.task = worker
    worker.start()

def stop(self):
    """Request shutdown of the worker loop.

    Only clears the flag tested by the `while self.running` loops in
    run(); it does not wait for the thread to finish.
    """
    self.running = False

def join(self):
    """Request shutdown via stop(), then block until the thread created
    by start() has finished."""
    self.stop()
    self.task.join()

def run(self):

while True:
while self.running:

try:

Expand All @@ -35,9 +43,12 @@ def run(self):
chunking_enabled=self.chunking_enabled,
)

while True:
while self.running:

id, item = self.q.get()
try:
id, item = self.q.get(timeout=0.5)
except queue.Empty:
continue

if id:
producer.send(item, { "id": id })
Expand All @@ -52,3 +63,5 @@ def run(self):

def send(self, id, msg):
    """Queue a message for the worker thread to publish.

    The (id, msg) pair is placed on self.q; run() takes items off that
    queue and hands them to the producer.
    """
    entry = (id, msg)
    self.q.put(entry)


Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ def __init__(self, pulsar_host, topic, subscription, consumer_name,
self.max_size = max_size
self.lock = threading.Lock()
self.listener_name = listener
self.running = True

def start(self):
    """Launch run() on a new thread and remember the thread in self.task
    so that join() can wait for it."""
    runner = threading.Thread(target=self.run)
    self.task = runner
    runner.start()

def stop(self):
    """Request shutdown of the receive loop by clearing self.running.

    Does not wait for the thread to exit.
    """
    # NOTE(review): run() calls consumer.receive() with no timeout visible
    # here, so the flag is presumably only re-checked after the next
    # message arrives -- confirm.
    self.running = False

def join(self):
    """Stop the subscriber and wait for its worker thread to exit.

    stop() is called first so that join() on its own can terminate,
    matching Publisher.join(); previously a caller that had not already
    called stop() would block here indefinitely because run() loops
    while self.running is true.
    """
    # NOTE(review): run() blocks in consumer.receive(), so the thread may
    # not notice the cleared flag until another message is received.
    self.stop()
    self.task.join()

def run(self):

while True:
while self.running:

try:

Expand All @@ -41,7 +48,7 @@ def run(self):
schema=self.schema,
)

while True:
while self.running:

msg = consumer.receive()

Expand All @@ -59,12 +66,14 @@ def run(self):

if id in self.q:
try:
# FIXME: Timeout means data goes missing
self.q[id].put(value, timeout=0.5)
except:
pass

for q in self.full.values():
try:
# FIXME: Timeout means data goes missing
q.put(value, timeout=0.5)
except:
pass
Expand Down
3 changes: 3 additions & 0 deletions trustgraph-base/trustgraph/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ class LlmError(Exception):
class ParseError(Exception):
pass

# Exception type with no behaviour beyond Exception itself.
# NOTE(review): presumably raised when a service request fails; no raise
# site is visible in this file -- confirm against callers.
class RequestError(Exception):
    pass

1 change: 1 addition & 0 deletions trustgraph-base/trustgraph/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
from . metadata import *
from . agent import *
from . lookup import *
from . library import *


56 changes: 56 additions & 0 deletions trustgraph-base/trustgraph/schema/library.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

from pulsar.schema import Record, Bytes, String, Array
from . types import Triple
from . topic import topic
from . types import Error
from . metadata import Metadata
from . documents import Document, TextDocument

# add(Metadata, Bytes) : error?
# copy(id, user, collection)
# move(id, user, collection)
# delete(id)
# get(id) : Bytes
# reindex(id)
# list(user, collection) : id[]
# info(id[]) : DocumentInfo[]
# search(<key,op,value>[]) : id[]

# A document together with its descriptive fields.  Used as the `document`
# field of both LibrarianRequest and LibrarianResponse.
class DocumentPackage(Record):
    # Metadata expressed as an array of Triple records.
    metadata = Array(Triple())
    # Document content as raw bytes.
    document = Bytes()
    # NOTE(review): presumably a content-type style tag -- confirm the
    # permitted values with the librarian service.
    kind = String()
    # User and collection the document is filed under.
    user = String()
    collection = String()

# Descriptive fields of a document without its content: the same fields as
# DocumentPackage minus `document`.  Returned as LibrarianResponse.info.
class DocumentInfo(Record):
    # Metadata expressed as an array of Triple records.
    metadata = Array(Triple())
    kind = String()
    user = String()
    collection = String()

# One search criterion; corresponds to the <key,op,value> tuple in the
# search() sketch at the top of this module.
class Criteria(Record):
    key = String()
    value = String()
    # NOTE(review): the set of supported operator strings is not defined
    # in this file -- confirm with the librarian service.
    operator = String()

# Request record sent on the librarian request queue.
class LibrarianRequest(Record):
    # NOTE(review): presumably one of the operations sketched in the
    # comment at the top of this module (add, copy, move, delete, get,
    # reindex, list, info, search) -- confirm the exact strings.
    operation = String()
    # Document identifier, for operations that act on a single document.
    id = String()
    # Document payload, e.g. for add(Metadata, Bytes).
    document = DocumentPackage()
    user = String()
    collection = String()
    # Search criteria, as an array of Criteria records.
    criteria = Array(Criteria())

# Response record returned on the librarian response queue.
class LibrarianResponse(Record):
    # Error record.
    # NOTE(review): presumably unset on success -- confirm.
    error = Error()
    # A single returned document.
    document = DocumentPackage()
    # Descriptions of multiple documents, as DocumentInfo records.
    info = Array(DocumentInfo())

# Request and response queue names for the librarian.  Both are built by
# topic() with kind='non-persistent' and differ only in their namespace.
librarian_request_queue = topic(
    'librarian', kind='non-persistent', namespace='request'
)
librarian_response_queue = topic(
    'librarian', kind='non-persistent', namespace='response',
)

6 changes: 6 additions & 0 deletions trustgraph-flow/scripts/librarian
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3

# Launcher script: hand control to the run() function exported by the
# trustgraph.librarian package.
import trustgraph.librarian

trustgraph.librarian.run()

4 changes: 3 additions & 1 deletion trustgraph-flow/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"langchain-community",
"langchain-core",
"langchain-text-splitters",
"minio",
"neo4j",
"ollama",
"openai",
Expand Down Expand Up @@ -78,8 +79,8 @@
"scripts/de-write-qdrant",
"scripts/document-embeddings",
"scripts/document-rag",
"scripts/embeddings-ollama",
"scripts/embeddings-fastembed",
"scripts/embeddings-ollama",
"scripts/ge-query-milvus",
"scripts/ge-query-pinecone",
"scripts/ge-query-qdrant",
Expand All @@ -91,6 +92,7 @@
"scripts/kg-extract-definitions",
"scripts/kg-extract-relationships",
"scripts/kg-extract-topics",
"scripts/librarian",
"scripts/metering",
"scripts/object-extract-row",
"scripts/oe-write-milvus",
Expand Down
Loading

0 comments on commit a0bf236

Please sign in to comment.