From 3589b8edc878b29505232ee11daba7905227fe3b Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 13:35:52 +0200 Subject: [PATCH 01/22] Added total number of nodes and edges --- api/apps/kb_app.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index 43e6e4eac3e..356a9603ae2 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -349,11 +349,15 @@ def knowledge_graph(kb_id): obj[ty] = content_json if "nodes" in obj["graph"]: + total_nodes = len(obj["graph"]["nodes"]) obj["graph"]["nodes"] = sorted(obj["graph"]["nodes"], key=lambda x: x.get("pagerank", 0), reverse=True)[:256] if "edges" in obj["graph"]: + total_edges = len(obj["graph"]["edges"]) node_id_set = { o["id"] for o in obj["graph"]["nodes"] } filtered_edges = [o for o in obj["graph"]["edges"] if o["source"] != o["target"] and o["source"] in node_id_set and o["target"] in node_id_set] obj["graph"]["edges"] = sorted(filtered_edges, key=lambda x: x.get("weight", 0), reverse=True)[:128] + obj["graph"]["total_edges"] = total_edges + obj["graph"]["total_nodes"] = total_nodes return get_json_result(data=obj) @manager.route('//knowledge_graph', methods=['DELETE']) # noqa: F821 From 6afd75f2b29593c81f8b4dcfec0847765465f46a Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 14:20:40 +0200 Subject: [PATCH 02/22] Added Graph statistics to the frontend --- web/src/interfaces/database/knowledge.ts | 7 ++++- .../components/knowledge-graph/index.tsx | 27 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/web/src/interfaces/database/knowledge.ts b/web/src/interfaces/database/knowledge.ts index 011f8a9eb71..a13fa50d7c1 100644 --- a/web/src/interfaces/database/knowledge.ts +++ b/web/src/interfaces/database/knowledge.ts @@ -155,6 +155,11 @@ export interface INextTestingResult { export type IRenameTag = { fromTag: string; toTag: string }; export interface IKnowledgeGraph { - graph: Record; + graph: { + nodes?: any[]; + edges?: any[]; + total_nodes?: number; + total_edges?: number; + }; mind_map: TreeData; } diff --git a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx index 1c33438e531..6b24990d2ca 100644 --- a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx +++ b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx @@ -12,6 +12,11 @@ const KnowledgeGraph: React.FC = () => { const { t } = useTranslation(); const { handleDeleteKnowledgeGraph } = useDeleteKnowledgeGraph(); + const totalNodes = data?.graph?.total_nodes || 0; + const totalEdges = data?.graph?.total_edges || 0; + const displayedNodes = data?.graph?.nodes?.length || 0; + const displayedEdges = data?.graph?.edges?.length || 0; + return (
@@ -23,6 +28,28 @@ const KnowledgeGraph: React.FC = () => {
           {t('common.delete')}
+
+        {/* Graph Statistics */}
+        <div>
+          <div>
+            {t('knowledgeGraph.statistics', 'Graph Statistics')}
+          </div>
+          <div>
+            <div>
+              <span>{t('knowledgeGraph.nodes', 'Nodes')}: </span>
+              <span>
+                {displayedNodes.toLocaleString()} / {totalNodes.toLocaleString()}
+              </span>
+            </div>
+            <div>
+              <span>{t('knowledgeGraph.edges', 'Edges')}: </span>
+              <span>
+                {displayedEdges.toLocaleString()} / {totalEdges.toLocaleString()}
+              </span>
+            </div>
+          </div>
+        </div>
); From e61d7fc634febde39b4438c8f1b50fafd1326d98 Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 14:26:34 +0200 Subject: [PATCH 03/22] Added translations --- web/src/locales/de.ts | 5 +++++ web/src/locales/en.ts | 5 +++++ web/src/locales/es.ts | 5 +++++ web/src/locales/id.ts | 5 +++++ web/src/locales/ja.ts | 5 +++++ web/src/locales/pt-br.ts | 5 +++++ web/src/locales/vi.ts | 5 +++++ web/src/locales/zh-traditional.ts | 5 +++++ web/src/locales/zh.ts | 5 +++++ 9 files changed, 45 insertions(+) diff --git a/web/src/locales/de.ts b/web/src/locales/de.ts index 3960e430430..3741eef3f25 100644 --- a/web/src/locales/de.ts +++ b/web/src/locales/de.ts @@ -1240,5 +1240,10 @@ export default { knowledge: 'Wissen', chat: 'Chat', }, + knowledgeGraph: { + statistics: 'Graphstatistiken', + nodes: 'Knoten', + edges: 'Kanten', + }, }, }; diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index dec8c57f076..27319a4fa3a 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1303,5 +1303,10 @@ This delimiter is used to split the input text into several text pieces echo of }, }, }, + knowledgeGraph: { + statistics: 'Graph Statistics', + nodes: 'Nodes', + edges: 'Edges', + }, }, }; diff --git a/web/src/locales/es.ts b/web/src/locales/es.ts index 274dbbbb11e..503601b8e46 100644 --- a/web/src/locales/es.ts +++ b/web/src/locales/es.ts @@ -867,5 +867,10 @@ export default { knowledge: 'Conocimiento', chat: 'Chat', }, + knowledgeGraph: { + statistics: 'Estadísticas del Grafo', + nodes: 'Nodos', + edges: 'Aristas', + }, }, }; diff --git a/web/src/locales/id.ts b/web/src/locales/id.ts index 60cff62bd2e..a08ffba3308 100644 --- a/web/src/locales/id.ts +++ b/web/src/locales/id.ts @@ -1029,5 +1029,10 @@ export default { knowledge: 'pengetahuan', chat: 'obrolan', }, + knowledgeGraph: { + statistics: 'Statistik Grafik', + nodes: 'Node', + edges: 'Edge', + }, }, }; diff --git a/web/src/locales/ja.ts b/web/src/locales/ja.ts index 0a044b74217..ba274ad7e17 100644 --- a/web/src/locales/ja.ts +++ b/web/src/locales/ja.ts @@ -1098,5 +1098,10 @@ export default { knowledge: 'ナレッジ', chat: 'チャット', }, + knowledgeGraph: { + statistics: 'グラフ統計', + nodes: 'ノード', + edges: 'エッジ', + }, }, }; diff --git a/web/src/locales/pt-br.ts b/web/src/locales/pt-br.ts index c0144d2ab2e..971e9c7c166 100644 --- a/web/src/locales/pt-br.ts +++ b/web/src/locales/pt-br.ts @@ -1140,5 +1140,10 @@ export default { knowledge: 'conhecimento', chat: 'bate-papo', }, + knowledgeGraph: { + statistics: 'Estatísticas do Grafo', + nodes: 'Nós', + edges: 'Arestas', + }, }, }; diff --git a/web/src/locales/vi.ts b/web/src/locales/vi.ts index 17f66078314..c9e9b65a22c 100644 --- a/web/src/locales/vi.ts +++ b/web/src/locales/vi.ts @@ -1167,5 +1167,10 @@ export default { knowledge: 'kiến thức', chat: 'trò chuyện', }, + knowledgeGraph: { + statistics: 'Thống kê Đồ thị', + nodes: 'Nút', + edges: 'Cạnh', + }, }, }; diff --git a/web/src/locales/zh-traditional.ts b/web/src/locales/zh-traditional.ts index 3e85085fd3b..fe2ba4d6c83 100644 --- a/web/src/locales/zh-traditional.ts +++ b/web/src/locales/zh-traditional.ts @@ -1184,5 +1184,10 @@ export default { knowledge: '知識', chat: '聊天', }, + knowledgeGraph: { + statistics: '圖統計', + nodes: '節點', + edges: '邊', + }, }, }; diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index 9bb76d91372..dd042672a9c 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1263,5 +1263,10 @@ General:实体和关系提取提示来自 GitHub - microsoft/graphrag:基于 }, }, }, + knowledgeGraph: { + statistics: '图统计', + nodes: '节点', + 
edges: '边', + }, }, }; From 08f72cecd8a7623aab8ad692a9eb57aa7f8a9da3 Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 14:52:23 +0200 Subject: [PATCH 04/22] Added API Endpoints for detect communites and entity resolution --- api/apps/kb_app.py | 268 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index 356a9603ae2..f995e7c8c92 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -15,6 +15,10 @@ # import json import os +import asyncio +import logging +import networkx as nx +from typing import Dict, Any from flask import request from flask_login import login_required, current_user @@ -35,6 +39,9 @@ from api.constants import DATASET_NAME_LIMIT from rag.settings import PAGERANK_FLD from rag.utils.storage_factory import STORAGE_IMPL +from graphrag.entity_resolution import EntityResolution +from graphrag.general.community_reports_extractor import CommunityReportsExtractor +from rag.llm import LLMBundle, LLMType @manager.route('/create', methods=['post']) # noqa: F821 @@ -373,3 +380,264 @@ def delete_knowledge_graph(kb_id): settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id) return get_json_result(data=True) + + +@manager.route('//knowledge_graph/resolve_entities', methods=['POST']) # noqa: F821 +@login_required +def resolve_entities(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + try: + _, kb = KnowledgebaseService.get_by_id(kb_id) + + # Check if knowledge graph exists + if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id): + return get_json_result( + data=False, + message='Knowledge graph not found.', + code=settings.RetCode.DATA_ERROR + ) + + # Retrieve existing knowledge graph + req = { + "kb_id": [kb_id], + "knowledge_graph_kwd": ["graph"] + } + + sres = settings.retrievaler.search(req, search.index_name(kb.tenant_id), [kb_id]) + if not len(sres.ids): + return get_json_result( + data=False, + message='Knowledge graph not found.', + code=settings.RetCode.DATA_ERROR + ) + + # Get graph data + graph_data = None + for id in sres.ids[:1]: + try: + graph_data = json.loads(sres.field[id]["content_with_weight"]) + break + except Exception: + continue + + if not graph_data or "nodes" not in graph_data or "edges" not in graph_data: + return get_json_result( + data=False, + message='Invalid knowledge graph data.', + code=settings.RetCode.DATA_ERROR + ) + + # Create NetworkX graph from stored data + graph = nx.Graph() + + # Add nodes + for node in graph_data["nodes"]: + graph.add_node(node["id"], **{k: v for k, v in node.items() if k != "id"}) + + # Add edges + for edge in graph_data["edges"]: + graph.add_edge(edge["source"], edge["target"], **{k: v for k, v in edge.items() if k not in ["source", "target"]}) + + # Get all nodes as subgraph nodes for entity resolution + subgraph_nodes = set(graph.nodes()) + + # Run entity resolution asynchronously + def progress_callback(msg=""): + logging.info(f"Entity resolution progress: {msg}") + + async def run_entity_resolution(): + chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) + entity_resolver = EntityResolution(chat_model) + + result = await entity_resolver(graph, subgraph_nodes, callback=progress_callback) + return result + + # Execute async function + loop = 
asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + resolution_result = loop.run_until_complete(run_entity_resolution()) + finally: + loop.close() + + # Convert updated graph back to JSON format + updated_nodes = [] + for node_id, node_data in resolution_result.graph.nodes(data=True): + node_dict = {"id": node_id, **node_data} + updated_nodes.append(node_dict) + + updated_edges = [] + for source, target, edge_data in resolution_result.graph.edges(data=True): + edge_dict = {"source": source, "target": target, **edge_data} + updated_edges.append(edge_dict) + + updated_graph_data = { + "nodes": updated_nodes, + "edges": updated_edges + } + + # Update stored knowledge graph + settings.docStoreConn.update( + {"knowledge_graph_kwd": ["graph"], "kb_id": [kb_id]}, + {"content_with_weight": json.dumps(updated_graph_data)}, + search.index_name(kb.tenant_id), + kb_id + ) + + return get_json_result( + data=True, + message='Entity resolution completed successfully.', + code=settings.RetCode.SUCCESS + ) + + except Exception as e: + logging.error(f"Entity resolution failed: {str(e)}") + return get_json_result( + data=False, + message=f'Entity resolution failed: {str(e)}', + code=settings.RetCode.SERVER_ERROR + ) + + +@manager.route('//knowledge_graph/detect_communities', methods=['POST']) # noqa: F821 +@login_required +def detect_communities(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + try: + _, kb = KnowledgebaseService.get_by_id(kb_id) + + # Check if knowledge graph exists + if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id): + return get_json_result( + data=False, + message='Knowledge graph not found.', + code=settings.RetCode.DATA_ERROR + ) + + # Retrieve existing knowledge graph + req = { + "kb_id": [kb_id], + "knowledge_graph_kwd": ["graph"] + } + + sres = settings.retrievaler.search(req, search.index_name(kb.tenant_id), [kb_id]) + if not len(sres.ids): + return get_json_result( + data=False, + message='Knowledge graph not found.', + code=settings.RetCode.DATA_ERROR + ) + + # Get graph data + graph_data = None + for id in sres.ids[:1]: + try: + graph_data = json.loads(sres.field[id]["content_with_weight"]) + break + except Exception: + continue + + if not graph_data or "nodes" not in graph_data or "edges" not in graph_data: + return get_json_result( + data=False, + message='Invalid knowledge graph data.', + code=settings.RetCode.DATA_ERROR + ) + + # Create NetworkX graph from stored data + graph = nx.Graph() + + # Add nodes + for node in graph_data["nodes"]: + graph.add_node(node["id"], **{k: v for k, v in node.items() if k != "id"}) + + # Add edges + for edge in graph_data["edges"]: + graph.add_edge(edge["source"], edge["target"], **{k: v for k, v in edge.items() if k not in ["source", "target"]}) + + # Set node degrees for community detection + for node_degree in graph.degree: + graph.nodes[str(node_degree[0])]["rank"] = int(node_degree[1]) + + # Run community detection asynchronously + def progress_callback(msg=""): + logging.info(f"Community detection progress: {msg}") + + async def run_community_detection(): + chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) + community_extractor = CommunityReportsExtractor(chat_model) + + result = await community_extractor(graph, callback=progress_callback) + return result + + # Execute async function + loop = 
asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + community_result = loop.run_until_complete(run_community_detection()) + finally: + loop.close() + + # Convert updated graph back to JSON format + updated_nodes = [] + for node_id, node_data in graph.nodes(data=True): + node_dict = {"id": node_id, **node_data} + updated_nodes.append(node_dict) + + updated_edges = [] + for source, target, edge_data in graph.edges(data=True): + edge_dict = {"source": source, "target": target, **edge_data} + updated_edges.append(edge_dict) + + updated_graph_data = { + "nodes": updated_nodes, + "edges": updated_edges + } + + # Update stored knowledge graph + settings.docStoreConn.update( + {"knowledge_graph_kwd": ["graph"], "kb_id": [kb_id]}, + {"content_with_weight": json.dumps(updated_graph_data)}, + search.index_name(kb.tenant_id), + kb_id + ) + + # Store community reports + community_reports = { + "structured_output": community_result.structured_output, + "reports": community_result.output + } + + # Store community reports separately + settings.docStoreConn.update( + {"knowledge_graph_kwd": ["community"], "kb_id": [kb_id]}, + {"content_with_weight": json.dumps(community_reports)}, + search.index_name(kb.tenant_id), + kb_id + ) + + return get_json_result( + data=True, + message=f'Community detection completed successfully. Found {len(community_result.structured_output)} communities.', + code=settings.RetCode.SUCCESS + ) + + except Exception as e: + logging.error(f"Community detection failed: {str(e)}") + return get_json_result( + data=False, + message=f'Community detection failed: {str(e)}', + code=settings.RetCode.SERVER_ERROR + ) From 27cbb9d94ac0b84c5bdbcd98ad996cef6c328f2d Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 14:58:17 +0200 Subject: [PATCH 05/22] Added buttons and progress bars for community detection and entity resolution --- web/src/hooks/knowledge-hooks.ts | 52 +++++++++++++++++ web/src/locales/de.ts | 6 ++ web/src/locales/en.ts | 6 ++ web/src/locales/es.ts | 6 ++ web/src/locales/id.ts | 6 ++ web/src/locales/ja.ts | 6 ++ web/src/locales/pt-br.ts | 6 ++ web/src/locales/vi.ts | 6 ++ web/src/locales/zh-traditional.ts | 6 ++ web/src/locales/zh.ts | 6 ++ .../components/knowledge-graph/index.tsx | 56 +++++++++++++++++-- web/src/services/knowledge-service.ts | 8 +++ web/src/utils/api.ts | 4 ++ 13 files changed, 168 insertions(+), 6 deletions(-) diff --git a/web/src/hooks/knowledge-hooks.ts b/web/src/hooks/knowledge-hooks.ts index f4cfd125f91..300fc6126d6 100644 --- a/web/src/hooks/knowledge-hooks.ts +++ b/web/src/hooks/knowledge-hooks.ts @@ -13,6 +13,8 @@ import kbService, { listTag, removeTag, renameTag, + resolveEntities, + detectCommunities, } from '@/services/knowledge-service'; import { useInfiniteQuery, @@ -495,3 +497,53 @@ export const useRemoveKnowledgeGraph = () => { return { data, loading, removeKnowledgeGraph: mutateAsync }; }; + +export const useResolveEntities = () => { + const knowledgeBaseId = useKnowledgeBaseId(); + + const queryClient = useQueryClient(); + const { + data, + isPending: loading, + mutateAsync, + } = useMutation({ + mutationKey: ['resolveEntities'], + mutationFn: async () => { + const { data } = await resolveEntities(knowledgeBaseId); + if (data.code === 0) { + message.success(i18n.t(`knowledgeGraph.entityResolutionSuccess`, 'Entity resolution completed successfully')); + queryClient.invalidateQueries({ + queryKey: ['fetchKnowledgeGraph'], + }); + } + return data; + }, + }); + + return { data, loading, resolveEntities: mutateAsync }; +}; + 
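+// A minimal usage sketch for the mutation hook above (hypothetical component
+// code, not part of this patch; it assumes the caller renders under the
+// knowledge-base route so useKnowledgeBaseId() can resolve the id):
+//
+//   const { resolveEntities, loading } = useResolveEntities();
+//   <Button disabled={loading} onClick={() => resolveEntities()}>
+//     {t('knowledgeGraph.resolveEntities', 'Resolve Entities')}
+//   </Button>
+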
+export const useDetectCommunities = () => { + const knowledgeBaseId = useKnowledgeBaseId(); + + const queryClient = useQueryClient(); + const { + data, + isPending: loading, + mutateAsync, + } = useMutation({ + mutationKey: ['detectCommunities'], + mutationFn: async () => { + const { data } = await detectCommunities(knowledgeBaseId); + if (data.code === 0) { + message.success(i18n.t(`knowledgeGraph.communityDetectionSuccess`, 'Community detection completed successfully')); + queryClient.invalidateQueries({ + queryKey: ['fetchKnowledgeGraph'], + }); + } + return data; + }, + }); + + return { data, loading, detectCommunities: mutateAsync }; +}; diff --git a/web/src/locales/de.ts b/web/src/locales/de.ts index 3741eef3f25..498b5cd3b08 100644 --- a/web/src/locales/de.ts +++ b/web/src/locales/de.ts @@ -1244,6 +1244,12 @@ export default { statistics: 'Graphstatistiken', nodes: 'Knoten', edges: 'Kanten', + resolveEntities: 'Entitäten auflösen', + detectCommunities: 'Gemeinschaften erkennen', + resolving: 'Auflösung läuft...', + detecting: 'Erkennung läuft...', + entityResolutionSuccess: 'Entitätsauflösung erfolgreich abgeschlossen', + communityDetectionSuccess: 'Gemeinschaftserkennung erfolgreich abgeschlossen', }, }, }; diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 27319a4fa3a..746637be3a9 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1307,6 +1307,12 @@ This delimiter is used to split the input text into several text pieces echo of statistics: 'Graph Statistics', nodes: 'Nodes', edges: 'Edges', + resolveEntities: 'Resolve Entities', + detectCommunities: 'Detect Communities', + resolving: 'Resolving...', + detecting: 'Detecting...', + entityResolutionSuccess: 'Entity resolution completed successfully', + communityDetectionSuccess: 'Community detection completed successfully', }, }, }; diff --git a/web/src/locales/es.ts b/web/src/locales/es.ts index 503601b8e46..daa60d054aa 100644 --- a/web/src/locales/es.ts +++ b/web/src/locales/es.ts @@ -871,6 +871,12 @@ export default { statistics: 'Estadísticas del Grafo', nodes: 'Nodos', edges: 'Aristas', + resolveEntities: 'Resolver Entidades', + detectCommunities: 'Detectar Comunidades', + resolving: 'Resolviendo...', + detecting: 'Detectando...', + entityResolutionSuccess: 'Resolución de entidades completada exitosamente', + communityDetectionSuccess: 'Detección de comunidades completada exitosamente', }, }, }; diff --git a/web/src/locales/id.ts b/web/src/locales/id.ts index a08ffba3308..def2759f34b 100644 --- a/web/src/locales/id.ts +++ b/web/src/locales/id.ts @@ -1033,6 +1033,12 @@ export default { statistics: 'Statistik Grafik', nodes: 'Node', edges: 'Edge', + resolveEntities: 'Selesaikan Entitas', + detectCommunities: 'Deteksi Komunitas', + resolving: 'Menyelesaikan...', + detecting: 'Mendeteksi...', + entityResolutionSuccess: 'Penyelesaian entitas berhasil diselesaikan', + communityDetectionSuccess: 'Deteksi komunitas berhasil diselesaikan', }, }, }; diff --git a/web/src/locales/ja.ts b/web/src/locales/ja.ts index ba274ad7e17..08a8b8ed67b 100644 --- a/web/src/locales/ja.ts +++ b/web/src/locales/ja.ts @@ -1102,6 +1102,12 @@ export default { statistics: 'グラフ統計', nodes: 'ノード', edges: 'エッジ', + resolveEntities: 'エンティティ解決', + detectCommunities: 'コミュニティ検出', + resolving: '解決中...', + detecting: '検出中...', + entityResolutionSuccess: 'エンティティ解決が正常に完了しました', + communityDetectionSuccess: 'コミュニティ検出が正常に完了しました', }, }, }; diff --git a/web/src/locales/pt-br.ts b/web/src/locales/pt-br.ts index 971e9c7c166..ae4372ebed9 100644 --- 
a/web/src/locales/pt-br.ts +++ b/web/src/locales/pt-br.ts @@ -1144,6 +1144,12 @@ export default { statistics: 'Estatísticas do Grafo', nodes: 'Nós', edges: 'Arestas', + resolveEntities: 'Resolver Entidades', + detectCommunities: 'Detectar Comunidades', + resolving: 'Resolvendo...', + detecting: 'Detectando...', + entityResolutionSuccess: 'Resolução de entidades concluída com sucesso', + communityDetectionSuccess: 'Detecção de comunidades concluída com sucesso', }, }, }; diff --git a/web/src/locales/vi.ts b/web/src/locales/vi.ts index c9e9b65a22c..34a6260a53d 100644 --- a/web/src/locales/vi.ts +++ b/web/src/locales/vi.ts @@ -1171,6 +1171,12 @@ export default { statistics: 'Thống kê Đồ thị', nodes: 'Nút', edges: 'Cạnh', + resolveEntities: 'Giải quyết Thực thể', + detectCommunities: 'Phát hiện Cộng đồng', + resolving: 'Đang giải quyết...', + detecting: 'Đang phát hiện...', + entityResolutionSuccess: 'Giải quyết thực thể hoàn thành thành công', + communityDetectionSuccess: 'Phát hiện cộng đồng hoàn thành thành công', }, }, }; diff --git a/web/src/locales/zh-traditional.ts b/web/src/locales/zh-traditional.ts index fe2ba4d6c83..e9cd5235d37 100644 --- a/web/src/locales/zh-traditional.ts +++ b/web/src/locales/zh-traditional.ts @@ -1188,6 +1188,12 @@ export default { statistics: '圖統計', nodes: '節點', edges: '邊', + resolveEntities: '解析實體', + detectCommunities: '檢測社區', + resolving: '解析中...', + detecting: '檢測中...', + entityResolutionSuccess: '實體解析完成成功', + communityDetectionSuccess: '社區檢測完成成功', }, }, }; diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index dd042672a9c..a407ac9a3a0 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1267,6 +1267,12 @@ General:实体和关系提取提示来自 GitHub - microsoft/graphrag:基于 statistics: '图统计', nodes: '节点', edges: '边', + resolveEntities: '解析实体', + detectCommunities: '检测社区', + resolving: '解析中...', + detecting: '检测中...', + entityResolutionSuccess: '实体解析完成成功', + communityDetectionSuccess: '社区检测完成成功', }, }, }; diff --git a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx index 6b24990d2ca..208c2d018f9 100644 --- a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx +++ b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx @@ -1,7 +1,7 @@ import { ConfirmDeleteDialog } from '@/components/confirm-delete-dialog'; import { Button } from '@/components/ui/button'; -import { useFetchKnowledgeGraph } from '@/hooks/knowledge-hooks'; -import { Trash2 } from 'lucide-react'; +import { useFetchKnowledgeGraph, useResolveEntities, useDetectCommunities } from '@/hooks/knowledge-hooks'; +import { Trash2, Network, Users } from 'lucide-react'; import React from 'react'; import { useTranslation } from 'react-i18next'; import ForceGraph from './force-graph'; @@ -11,23 +11,67 @@ const KnowledgeGraph: React.FC = () => { const { data } = useFetchKnowledgeGraph(); const { t } = useTranslation(); const { handleDeleteKnowledgeGraph } = useDeleteKnowledgeGraph(); + const { resolveEntities, loading: resolvingEntities } = useResolveEntities(); + const { detectCommunities, loading: detectingCommunities } = useDetectCommunities(); const totalNodes = data?.graph?.total_nodes || 0; const totalEdges = data?.graph?.total_edges || 0; const displayedNodes = data?.graph?.nodes?.length || 0; const displayedEdges = data?.graph?.edges?.length || 0; + const handleResolveEntities = async () => { + try { + await resolveEntities(); + } catch (error) { + console.error('Entity resolution failed:', 
error); + } + }; + + const handleDetectCommunities = async () => { + try { + await detectCommunities(); + } catch (error) { + console.error('Community detection failed:', error); + } + }; + return (
-
+        {/* Action buttons */}
+        <div>
+          <Button
+            onClick={handleResolveEntities}
+            disabled={resolvingEntities}
+          >
+            <Network />
+            {resolvingEntities
+              ? t('knowledgeGraph.resolving', 'Resolving...')
+              : t('knowledgeGraph.resolveEntities', 'Resolve Entities')}
+          </Button>
-
+          <Button
+            onClick={handleDetectCommunities}
+            disabled={detectingCommunities}
+          >
+            <Users />
+            {detectingCommunities
+              ? t('knowledgeGraph.detecting', 'Detecting...')
+              : t('knowledgeGraph.detectCommunities', 'Detect Communities')}
+          </Button>
{/* Graph Statistics */}
diff --git a/web/src/services/knowledge-service.ts b/web/src/services/knowledge-service.ts index f5383b63b21..cb77f27027b 100644 --- a/web/src/services/knowledge-service.ts +++ b/web/src/services/knowledge-service.ts @@ -182,6 +182,14 @@ export function deleteKnowledgeGraph(knowledgeId: string) { return request.delete(api.getKnowledgeGraph(knowledgeId)); } +export function resolveEntities(knowledgeId: string) { + return request.post(api.resolveEntities(knowledgeId)); +} + +export function detectCommunities(knowledgeId: string) { + return request.post(api.detectCommunities(knowledgeId)); +} + export const listDataset = ( params?: IFetchKnowledgeListRequestParams, body?: IFetchKnowledgeListRequestBody, diff --git a/web/src/utils/api.ts b/web/src/utils/api.ts index 42a098f891c..f95e3904c77 100644 --- a/web/src/utils/api.ts +++ b/web/src/utils/api.ts @@ -43,6 +43,10 @@ export default { get_kb_detail: `${api_host}/kb/detail`, getKnowledgeGraph: (knowledgeId: string) => `${api_host}/kb/${knowledgeId}/knowledge_graph`, + resolveEntities: (knowledgeId: string) => + `${api_host}/kb/${knowledgeId}/knowledge_graph/resolve_entities`, + detectCommunities: (knowledgeId: string) => + `${api_host}/kb/${knowledgeId}/knowledge_graph/detect_communities`, // tags listTag: (knowledgeId: string) => `${api_host}/kb/${knowledgeId}/tags`, From 76a731d82fc6949854b2e0d47e7dcca788f6231d Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 16:13:46 +0200 Subject: [PATCH 06/22] Fixing import --- api/apps/kb_app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index f995e7c8c92..2e2b8450b9c 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -41,7 +41,8 @@ from rag.utils.storage_factory import STORAGE_IMPL from graphrag.entity_resolution import EntityResolution from graphrag.general.community_reports_extractor import CommunityReportsExtractor -from rag.llm import LLMBundle, LLMType +from api.db.services.llm_service import LLMBundle +from api.db import LLMType @manager.route('/create', methods=['post']) # noqa: F821 From 09882040b1420d00eb3aa7e78ad567a9570bb5c8 Mon Sep 17 00:00:00 2001 From: adrianad Date: Wed, 9 Jul 2025 16:40:42 +0200 Subject: [PATCH 07/22] Community detection gets started --- api/apps/kb_app.py | 95 ++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 45 deletions(-) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index 2e2b8450b9c..b0112e56a31 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -15,9 +15,9 @@ # import json import os -import asyncio import logging import networkx as nx +import trio from typing import Dict, Any from flask import request @@ -39,8 +39,7 @@ from api.constants import DATASET_NAME_LIMIT from rag.settings import PAGERANK_FLD from rag.utils.storage_factory import STORAGE_IMPL -from graphrag.entity_resolution import EntityResolution -from graphrag.general.community_reports_extractor import CommunityReportsExtractor +from graphrag.general.index import resolve_entities, extract_community from api.db.services.llm_service import LLMBundle from api.db import LLMType @@ -445,36 +444,48 @@ def resolve_entities(kb_id): for edge in graph_data["edges"]: graph.add_edge(edge["source"], edge["target"], **{k: v for k, v in edge.items() if k not in ["source", "target"]}) + # Set source_id for the graph (required by GraphRAG functions) + graph.graph["source_id"] = ["api_call"] + # Get all nodes as subgraph nodes for entity resolution subgraph_nodes = set(graph.nodes()) - # 
Run entity resolution asynchronously + # Run entity resolution using the existing GraphRAG functions def progress_callback(msg=""): logging.info(f"Entity resolution progress: {msg}") async def run_entity_resolution(): chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) - entity_resolver = EntityResolution(chat_model) + embedding_model = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=None, lang=kb.language) + + # Get all nodes as subgraph nodes for entity resolution + subgraph_nodes = set(graph.nodes()) + + # Call the existing resolve_entities function + await resolve_entities( + graph=graph, + subgraph_nodes=subgraph_nodes, + tenant_id=kb.tenant_id, + kb_id=kb_id, + doc_id="api_call", # Use placeholder since this is a manual API call + llm_bdl=chat_model, + embed_bdl=embedding_model, + callback=progress_callback + ) - result = await entity_resolver(graph, subgraph_nodes, callback=progress_callback) - return result + return graph - # Execute async function - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - resolution_result = loop.run_until_complete(run_entity_resolution()) - finally: - loop.close() + # Execute the async function using trio + updated_graph = trio.run(run_entity_resolution) # Convert updated graph back to JSON format updated_nodes = [] - for node_id, node_data in resolution_result.graph.nodes(data=True): + for node_id, node_data in updated_graph.nodes(data=True): node_dict = {"id": node_id, **node_data} updated_nodes.append(node_dict) updated_edges = [] - for source, target, edge_data in resolution_result.graph.edges(data=True): + for source, target, edge_data in updated_graph.edges(data=True): edge_dict = {"source": source, "target": target, **edge_data} updated_edges.append(edge_dict) @@ -493,7 +504,7 @@ async def run_entity_resolution(): return get_json_result( data=True, - message='Entity resolution completed successfully.', + message=f'Entity resolution completed successfully. 
Graph now has {len(updated_nodes)} nodes and {len(updated_edges)} edges.', code=settings.RetCode.SUCCESS ) @@ -568,37 +579,45 @@ def detect_communities(kb_id): for edge in graph_data["edges"]: graph.add_edge(edge["source"], edge["target"], **{k: v for k, v in edge.items() if k not in ["source", "target"]}) + # Set source_id for the graph (required by GraphRAG functions) + graph.graph["source_id"] = ["api_call"] + # Set node degrees for community detection for node_degree in graph.degree: graph.nodes[str(node_degree[0])]["rank"] = int(node_degree[1]) - # Run community detection asynchronously + # Run community detection using the existing GraphRAG functions def progress_callback(msg=""): logging.info(f"Community detection progress: {msg}") async def run_community_detection(): chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) - community_extractor = CommunityReportsExtractor(chat_model) + embedding_model = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=None, lang=kb.language) + + # Call the existing extract_community function + await extract_community( + graph=graph, + tenant_id=kb.tenant_id, + kb_id=kb_id, + doc_id="api_call", # Use placeholder since this is a manual API call + llm_bdl=chat_model, + embed_bdl=embedding_model, + callback=progress_callback + ) - result = await community_extractor(graph, callback=progress_callback) - return result + return graph - # Execute async function - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - community_result = loop.run_until_complete(run_community_detection()) - finally: - loop.close() + # Execute the async function using trio + updated_graph = trio.run(run_community_detection) # Convert updated graph back to JSON format updated_nodes = [] - for node_id, node_data in graph.nodes(data=True): + for node_id, node_data in updated_graph.nodes(data=True): node_dict = {"id": node_id, **node_data} updated_nodes.append(node_dict) updated_edges = [] - for source, target, edge_data in graph.edges(data=True): + for source, target, edge_data in updated_graph.edges(data=True): edge_dict = {"source": source, "target": target, **edge_data} updated_edges.append(edge_dict) @@ -615,23 +634,9 @@ async def run_community_detection(): kb_id ) - # Store community reports - community_reports = { - "structured_output": community_result.structured_output, - "reports": community_result.output - } - - # Store community reports separately - settings.docStoreConn.update( - {"knowledge_graph_kwd": ["community"], "kb_id": [kb_id]}, - {"content_with_weight": json.dumps(community_reports)}, - search.index_name(kb.tenant_id), - kb_id - ) - return get_json_result( data=True, - message=f'Community detection completed successfully. Found {len(community_result.structured_output)} communities.', + message=f'Community detection completed successfully. 
Graph now has {len(updated_nodes)} nodes and {len(updated_edges)} edges.', code=settings.RetCode.SUCCESS ) From 05d1d6d9c5eb87c95d1b13878ce29582bbec7e2e Mon Sep 17 00:00:00 2001 From: adrianad Date: Thu, 10 Jul 2025 09:31:17 +0200 Subject: [PATCH 08/22] Community Progress starting --- api/apps/kb_app.py | 47 ++++++++++++++++++- web/src/hooks/knowledge-hooks.ts | 24 +++++++++- web/src/locales/en.ts | 4 ++ .../components/knowledge-graph/index.tsx | 35 +++++++++++++- 4 files changed, 106 insertions(+), 4 deletions(-) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index b0112e56a31..e52f9f562ff 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -586,9 +586,40 @@ def detect_communities(kb_id): for node_degree in graph.degree: graph.nodes[str(node_degree[0])]["rank"] = int(node_degree[1]) + # Progress tracking variables + progress_data = { + "total_communities": 0, + "processed_communities": 0, + "tokens_used": 0, + "current_status": "initializing" + } + # Run community detection using the existing GraphRAG functions def progress_callback(msg=""): + import re logging.info(f"Community detection progress: {msg}") + + # Parse progress messages to extract metrics + if "communities:" in msg.lower(): + # Extract community progress (e.g., "Communities: 3/4") + match = re.search(r'communities?:\s*(\d+)/(\d+)', msg, re.IGNORECASE) + if match: + progress_data["processed_communities"] = int(match.group(1)) + progress_data["total_communities"] = int(match.group(2)) + + if "tokens:" in msg.lower() or "token" in msg.lower(): + # Extract token usage (e.g., "used tokens: 12750") + match = re.search(r'tokens?[^\d]*(\d+)', msg, re.IGNORECASE) + if match: + progress_data["tokens_used"] = int(match.group(1)) + + # Update status based on message content + if "completed" in msg.lower(): + progress_data["current_status"] = "completed" + elif "processing" in msg.lower() or "generating" in msg.lower(): + progress_data["current_status"] = "processing" + elif "detecting" in msg.lower(): + progress_data["current_status"] = "detecting" async def run_community_detection(): chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) @@ -634,9 +665,21 @@ async def run_community_detection(): kb_id ) + # Count communities in the updated graph + communities = set() + for node_data in updated_nodes: + if "community" in node_data: + communities.add(node_data["community"]) + return get_json_result( - data=True, - message=f'Community detection completed successfully. Graph now has {len(updated_nodes)} nodes and {len(updated_edges)} edges.', + data={ + "success": True, + "nodes_count": len(updated_nodes), + "edges_count": len(updated_edges), + "communities_count": len(communities), + "progress": progress_data + }, + message=f'Community detection completed successfully. Graph now has {len(updated_nodes)} nodes, {len(updated_edges)} edges, and {len(communities)} communities. 
Tokens used: {progress_data["tokens_used"]}', code=settings.RetCode.SUCCESS ) diff --git a/web/src/hooks/knowledge-hooks.ts b/web/src/hooks/knowledge-hooks.ts index 300fc6126d6..96da787bcea 100644 --- a/web/src/hooks/knowledge-hooks.ts +++ b/web/src/hooks/knowledge-hooks.ts @@ -525,6 +525,7 @@ export const useResolveEntities = () => { export const useDetectCommunities = () => { const knowledgeBaseId = useKnowledgeBaseId(); + const [progress, setProgress] = useState(null); const queryClient = useQueryClient(); const { @@ -534,16 +535,37 @@ export const useDetectCommunities = () => { } = useMutation({ mutationKey: ['detectCommunities'], mutationFn: async () => { + // Reset progress at start + setProgress({ + total_communities: 0, + processed_communities: 0, + tokens_used: 0, + current_status: 'starting' + }); + const { data } = await detectCommunities(knowledgeBaseId); if (data.code === 0) { + // Update progress with final results if available + if (data.data && data.data.progress) { + setProgress({ + ...data.data.progress, + current_status: 'completed' + }); + } + message.success(i18n.t(`knowledgeGraph.communityDetectionSuccess`, 'Community detection completed successfully')); queryClient.invalidateQueries({ queryKey: ['fetchKnowledgeGraph'], }); + + // Clear progress after a delay + setTimeout(() => setProgress(null), 3000); + } else { + setProgress(null); } return data; }, }); - return { data, loading, detectCommunities: mutateAsync }; + return { data, loading, detectCommunities: mutateAsync, progress }; }; diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 746637be3a9..d1fb8f1c363 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1313,6 +1313,10 @@ This delimiter is used to split the input text into several text pieces echo of detecting: 'Detecting...', entityResolutionSuccess: 'Entity resolution completed successfully', communityDetectionSuccess: 'Community detection completed successfully', + communityProgress: 'Community Detection', + communities: 'Communities', + tokensUsed: 'Tokens Used', + status: 'Status', }, }, }; diff --git a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx index 208c2d018f9..5695228a365 100644 --- a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx +++ b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx @@ -12,7 +12,7 @@ const KnowledgeGraph: React.FC = () => { const { t } = useTranslation(); const { handleDeleteKnowledgeGraph } = useDeleteKnowledgeGraph(); const { resolveEntities, loading: resolvingEntities } = useResolveEntities(); - const { detectCommunities, loading: detectingCommunities } = useDetectCommunities(); + const { detectCommunities, loading: detectingCommunities, progress: communityProgress } = useDetectCommunities(); const totalNodes = data?.graph?.total_nodes || 0; const totalEdges = data?.graph?.total_edges || 0; @@ -91,6 +91,39 @@ const KnowledgeGraph: React.FC = () => { {displayedEdges.toLocaleString()} / {totalEdges.toLocaleString()}
+
+          {/* Community Detection Progress */}
+          {detectingCommunities && communityProgress && (
+            <div>
+              <div>
+                {t('knowledgeGraph.communityProgress', 'Community Detection')}
+              </div>
+              <div>
+                {communityProgress.total_communities > 0 && (
+                  <div>
+                    <span>{t('knowledgeGraph.communities', 'Communities')}: </span>
+                    <span>
+                      {communityProgress.processed_communities}/{communityProgress.total_communities}
+                    </span>
+                  </div>
+                )}
+                {communityProgress.tokens_used > 0 && (
+                  <div>
+                    <span>{t('knowledgeGraph.tokensUsed', 'Tokens Used')}: </span>
+                    <span>
+                      {communityProgress.tokens_used.toLocaleString()}
+                    </span>
+                  </div>
+                )}
+                <div>
+                  <span>{t('knowledgeGraph.status', 'Status')}: </span>
+                  <span>{communityProgress.current_status}</span>
+                </div>
+              </div>
+            </div>
+ )} From 2aec2fb34be8d7176da7f8d55395cc090436944b Mon Sep 17 00:00:00 2001 From: adrianad Date: Thu, 10 Jul 2025 10:34:02 +0200 Subject: [PATCH 09/22] Community detection with updates complete --- api/apps/kb_app.py | 71 +++++++-- web/src/hooks/knowledge-hooks.ts | 135 +++++++++++++++--- .../components/knowledge-graph/index.tsx | 2 +- web/src/services/knowledge-service.ts | 4 + web/src/utils/api.ts | 2 + 5 files changed, 181 insertions(+), 33 deletions(-) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index e52f9f562ff..c158a30a11c 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -43,6 +43,8 @@ from api.db.services.llm_service import LLMBundle from api.db import LLMType +# Global progress storage for community detection +community_detection_progress = {} @manager.route('/create', methods=['post']) # noqa: F821 @login_required @@ -594,32 +596,32 @@ def detect_communities(kb_id): "current_status": "initializing" } + # Initialize progress data in global storage + community_detection_progress[kb_id] = progress_data + # Run community detection using the existing GraphRAG functions def progress_callback(msg=""): import re logging.info(f"Community detection progress: {msg}") # Parse progress messages to extract metrics + # Actual format: "Communities: 3/4, used tokens: 12750" if "communities:" in msg.lower(): # Extract community progress (e.g., "Communities: 3/4") - match = re.search(r'communities?:\s*(\d+)/(\d+)', msg, re.IGNORECASE) + match = re.search(r'Communities:\s*(\d+)/(\d+)', msg, re.IGNORECASE) if match: progress_data["processed_communities"] = int(match.group(1)) progress_data["total_communities"] = int(match.group(2)) - - if "tokens:" in msg.lower() or "token" in msg.lower(): - # Extract token usage (e.g., "used tokens: 12750") - match = re.search(r'tokens?[^\d]*(\d+)', msg, re.IGNORECASE) - if match: - progress_data["tokens_used"] = int(match.group(1)) + progress_data["current_status"] = "processing" # Update status based on message content - if "completed" in msg.lower(): + if "done" in msg.lower(): progress_data["current_status"] = "completed" - elif "processing" in msg.lower() or "generating" in msg.lower(): + elif "extracting" in msg.lower() or "extracted" in msg.lower(): progress_data["current_status"] = "processing" - elif "detecting" in msg.lower(): - progress_data["current_status"] = "detecting" + + # Update the global progress storage + community_detection_progress[kb_id] = progress_data.copy() async def run_community_detection(): chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) @@ -671,6 +673,21 @@ async def run_community_detection(): if "community" in node_data: communities.add(node_data["community"]) + # Mark operation as completed but don't delete immediately + # Frontend will handle cleanup after showing final status + progress_data["current_status"] = "completed" + community_detection_progress[kb_id] = progress_data.copy() + + # Schedule cleanup after 30 seconds to prevent memory buildup + def cleanup_progress(): + import time + time.sleep(30) + if kb_id in community_detection_progress: + del community_detection_progress[kb_id] + + import threading + threading.Thread(target=cleanup_progress, daemon=True).start() + return get_json_result( data={ "success": True, @@ -679,14 +696,44 @@ async def run_community_detection(): "communities_count": len(communities), "progress": progress_data }, - message=f'Community detection completed successfully. 
Graph now has {len(updated_nodes)} nodes, {len(updated_edges)} edges, and {len(communities)} communities. Tokens used: {progress_data["tokens_used"]}', + message=f'Community detection completed successfully. Graph now has {len(updated_nodes)} nodes, {len(updated_edges)} edges, and {len(communities)} communities.', code=settings.RetCode.SUCCESS ) except Exception as e: logging.error(f"Community detection failed: {str(e)}") + # Clean up progress data on error + if kb_id in community_detection_progress: + del community_detection_progress[kb_id] return get_json_result( data=False, message=f'Community detection failed: {str(e)}', code=settings.RetCode.SERVER_ERROR ) + + +@manager.route('//knowledge_graph/progress', methods=['GET']) # noqa: F821 +@login_required +def get_community_detection_progress(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + # Get progress data for this kb_id + progress_data = community_detection_progress.get(kb_id, None) + + if progress_data is None: + return get_json_result( + data=None, + message='No active community detection operation.', + code=settings.RetCode.SUCCESS + ) + + return get_json_result( + data=progress_data, + message='Progress retrieved successfully.', + code=settings.RetCode.SUCCESS + ) diff --git a/web/src/hooks/knowledge-hooks.ts b/web/src/hooks/knowledge-hooks.ts index 96da787bcea..aa55000d037 100644 --- a/web/src/hooks/knowledge-hooks.ts +++ b/web/src/hooks/knowledge-hooks.ts @@ -15,6 +15,7 @@ import kbService, { renameTag, resolveEntities, detectCommunities, + getCommunityDetectionProgress, } from '@/services/knowledge-service'; import { useInfiniteQuery, @@ -26,7 +27,7 @@ import { } from '@tanstack/react-query'; import { useDebounce } from 'ahooks'; import { message } from 'antd'; -import { useState } from 'react'; +import { useState, useEffect, useRef } from 'react'; import { useSearchParams } from 'umi'; import { useHandleSearchChange } from './logic-hooks'; import { useSetPaginationParams } from './route-hook'; @@ -526,6 +527,7 @@ export const useResolveEntities = () => { export const useDetectCommunities = () => { const knowledgeBaseId = useKnowledgeBaseId(); const [progress, setProgress] = useState(null); + const pollingRef = useRef(null); const queryClient = useQueryClient(); const { @@ -535,24 +537,12 @@ export const useDetectCommunities = () => { } = useMutation({ mutationKey: ['detectCommunities'], mutationFn: async () => { - // Reset progress at start - setProgress({ - total_communities: 0, - processed_communities: 0, - tokens_used: 0, - current_status: 'starting' - }); - + // Start the community detection operation const { data } = await detectCommunities(knowledgeBaseId); + return data; + }, + onSuccess: (data) => { if (data.code === 0) { - // Update progress with final results if available - if (data.data && data.data.progress) { - setProgress({ - ...data.data.progress, - current_status: 'completed' - }); - } - message.success(i18n.t(`knowledgeGraph.communityDetectionSuccess`, 'Community detection completed successfully')); queryClient.invalidateQueries({ queryKey: ['fetchKnowledgeGraph'], @@ -560,12 +550,117 @@ export const useDetectCommunities = () => { // Clear progress after a delay setTimeout(() => setProgress(null), 3000); - } else { - setProgress(null); } - return data; + }, + onError: () => { + setProgress(null); + // Clear any ongoing polling + if (pollingRef.current) { + 
clearInterval(pollingRef.current); + pollingRef.current = null; + } }, }); + // Function to start polling + const startPolling = () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + } + + pollingRef.current = setInterval(async () => { + try { + const { data: progressData } = await getCommunityDetectionProgress(knowledgeBaseId); + if (progressData.code === 0 && progressData.data) { + setProgress(progressData.data); + + // If status is completed, clear progress after a delay + if (progressData.data.current_status === 'completed') { + setTimeout(() => { + setProgress(null); + }, 3000); // Clear after 3 seconds + + // Stop polling since operation is completed + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + } else if (progressData.code === 0 && progressData.data === null) { + // Operation completed or not running, stop polling + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + } catch (error) { + console.error('Failed to fetch progress:', error); + } + }, 1000); // Poll every second + }; + + // Check for ongoing operation on component mount + useEffect(() => { + const checkInitialProgress = async () => { + try { + console.log('Checking initial progress for kb:', knowledgeBaseId); + const { data: progressData } = await getCommunityDetectionProgress(knowledgeBaseId); + console.log('Progress data received:', progressData); + + if (progressData.code === 0 && progressData.data) { + console.log('Found ongoing operation, setting progress:', progressData.data); + setProgress(progressData.data); + + // If status is completed, clear progress after a delay + if (progressData.data.current_status === 'completed') { + setTimeout(() => { + setProgress(null); + }, 3000); // Clear after 3 seconds + } else { + // Start polling since operation is still ongoing + startPolling(); + } + } else { + console.log('No ongoing operation found'); + } + } catch (error) { + console.error('Failed to check initial progress:', error); + } + }; + + if (knowledgeBaseId) { + checkInitialProgress(); + } + }, [knowledgeBaseId]); + + // Start polling when mutation starts + useEffect(() => { + if (loading) { + // Reset progress at start + setProgress({ + total_communities: 0, + processed_communities: 0, + tokens_used: 0, + current_status: 'starting' + }); + + // Start polling for progress + startPolling(); + } else { + // Stop polling when mutation completes + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + + return () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + }; + }, [loading, knowledgeBaseId]); + return { data, loading, detectCommunities: mutateAsync, progress }; }; diff --git a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx index 5695228a365..f744b463b1d 100644 --- a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx +++ b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx @@ -93,7 +93,7 @@ const KnowledgeGraph: React.FC = () => { {/* Community Detection Progress */} - {detectingCommunities && communityProgress && ( + {communityProgress && (
{t('knowledgeGraph.communityProgress', 'Community Detection')} diff --git a/web/src/services/knowledge-service.ts b/web/src/services/knowledge-service.ts index cb77f27027b..cbdb8a92c86 100644 --- a/web/src/services/knowledge-service.ts +++ b/web/src/services/knowledge-service.ts @@ -190,6 +190,10 @@ export function detectCommunities(knowledgeId: string) { return request.post(api.detectCommunities(knowledgeId)); } +export function getCommunityDetectionProgress(knowledgeId: string) { + return request.get(api.getCommunityDetectionProgress(knowledgeId)); +} + export const listDataset = ( params?: IFetchKnowledgeListRequestParams, body?: IFetchKnowledgeListRequestBody, diff --git a/web/src/utils/api.ts b/web/src/utils/api.ts index f95e3904c77..874c0f4be38 100644 --- a/web/src/utils/api.ts +++ b/web/src/utils/api.ts @@ -47,6 +47,8 @@ export default { `${api_host}/kb/${knowledgeId}/knowledge_graph/resolve_entities`, detectCommunities: (knowledgeId: string) => `${api_host}/kb/${knowledgeId}/knowledge_graph/detect_communities`, + getCommunityDetectionProgress: (knowledgeId: string) => + `${api_host}/kb/${knowledgeId}/knowledge_graph/progress`, // tags listTag: (knowledgeId: string) => `${api_host}/kb/${knowledgeId}/tags`, From 6607d3c78004342729e554c9a765bb59765b7da3 Mon Sep 17 00:00:00 2001 From: adrianad Date: Thu, 10 Jul 2025 10:50:36 +0200 Subject: [PATCH 10/22] Show communities in Statistics --- web/src/hooks/knowledge-hooks.ts | 5 ----- .../components/knowledge-graph/index.tsx | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/web/src/hooks/knowledge-hooks.ts b/web/src/hooks/knowledge-hooks.ts index aa55000d037..2c5d3bc41f1 100644 --- a/web/src/hooks/knowledge-hooks.ts +++ b/web/src/hooks/knowledge-hooks.ts @@ -603,12 +603,9 @@ export const useDetectCommunities = () => { useEffect(() => { const checkInitialProgress = async () => { try { - console.log('Checking initial progress for kb:', knowledgeBaseId); const { data: progressData } = await getCommunityDetectionProgress(knowledgeBaseId); - console.log('Progress data received:', progressData); if (progressData.code === 0 && progressData.data) { - console.log('Found ongoing operation, setting progress:', progressData.data); setProgress(progressData.data); // If status is completed, clear progress after a delay @@ -620,8 +617,6 @@ export const useDetectCommunities = () => { // Start polling since operation is still ongoing startPolling(); } - } else { - console.log('No ongoing operation found'); } } catch (error) { console.error('Failed to check initial progress:', error); diff --git a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx index f744b463b1d..57a9a50bf63 100644 --- a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx +++ b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx @@ -18,6 +18,14 @@ const KnowledgeGraph: React.FC = () => { const totalEdges = data?.graph?.total_edges || 0; const displayedNodes = data?.graph?.nodes?.length || 0; const displayedEdges = data?.graph?.edges?.length || 0; + + // Calculate community count from graph data + const communityCount = data?.graph?.nodes?.reduce((communities, node) => { + if (node.communities && Array.isArray(node.communities)) { + node.communities.forEach(community => communities.add(community)); + } + return communities; + }, new Set()).size || 0; const handleResolveEntities = async () => { try { @@ -91,6 +99,12 @@ const KnowledgeGraph: React.FC = 
() => { {displayedEdges.toLocaleString()} / {totalEdges.toLocaleString()}
+            <div>
+              <span>{t('knowledgeGraph.communities', 'Communities')}: </span>
+              <span>{communityCount.toLocaleString()}</span>
+            </div>
{/* Community Detection Progress */} {communityProgress && ( From 0a60955acb029b8143b10f6f851767c083708484 Mon Sep 17 00:00:00 2001 From: adrianad Date: Thu, 10 Jul 2025 11:02:02 +0200 Subject: [PATCH 11/22] Fixing endpoint naming --- api/apps/kb_app.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index c158a30a11c..d00907c4d31 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -39,7 +39,7 @@ from api.constants import DATASET_NAME_LIMIT from rag.settings import PAGERANK_FLD from rag.utils.storage_factory import STORAGE_IMPL -from graphrag.general.index import resolve_entities, extract_community +from graphrag.general.index import resolve_entities as graphrag_resolve_entities_impl, extract_community from api.db.services.llm_service import LLMBundle from api.db import LLMType @@ -464,15 +464,15 @@ async def run_entity_resolution(): subgraph_nodes = set(graph.nodes()) # Call the existing resolve_entities function - await resolve_entities( - graph=graph, - subgraph_nodes=subgraph_nodes, - tenant_id=kb.tenant_id, - kb_id=kb_id, - doc_id="api_call", # Use placeholder since this is a manual API call - llm_bdl=chat_model, - embed_bdl=embedding_model, - callback=progress_callback + await graphrag_resolve_entities_impl( + graph, + subgraph_nodes, + kb.tenant_id, + kb_id, + "api_call", # Use placeholder since this is a manual API call + chat_model, + embedding_model, + progress_callback ) return graph From a76eccec39848629315211c5b15e0a5271089aff Mon Sep 17 00:00:00 2001 From: adrianad Date: Thu, 10 Jul 2025 17:40:45 +0200 Subject: [PATCH 12/22] Update on the Redis Lock --- GRAPH_LOCK_ANALYSIS.md | 193 ++++++++++++++ api/apps/kb_app.py | 240 +++++++++++++++--- api/db/services/document_service.py | 11 + graphrag/entity_resolution.py | 2 +- .../general/community_reports_extractor.py | 2 +- rag/utils/redis_conn.py | 13 +- web/src/hooks/knowledge-hooks.ts | 180 ++++++++++++- web/src/locales/en.ts | 6 + .../components/knowledge-graph/index.tsx | 56 +++- web/src/services/knowledge-service.ts | 8 + web/src/utils/api.ts | 4 + 11 files changed, 658 insertions(+), 57 deletions(-) create mode 100644 GRAPH_LOCK_ANALYSIS.md diff --git a/GRAPH_LOCK_ANALYSIS.md b/GRAPH_LOCK_ANALYSIS.md new file mode 100644 index 00000000000..cb876fb81cd --- /dev/null +++ b/GRAPH_LOCK_ANALYSIS.md @@ -0,0 +1,193 @@ +# Knowledge Graph Lock Coverage Analysis + +## Problem Summary + +During document parsing, the knowledge graph was randomly resetting, causing all nodes and edges to disappear. This analysis investigated whether the Redis distributed locking mechanism was properly protecting all `set_graph()` operations. + +## Lock Coverage Analysis for `set_graph()` Operations + +### ✅ **Document Parsing Operations (PROPERLY LOCKED)** + +**In `run_graphrag()` function (`/srv/bfabriclocal/Desktop/ragflow/graphrag/general/index.py:75-120`):** + +```python +graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=1200) +await graphrag_task_lock.spin_acquire() # LOCK ACQUIRED + +try: + # 1. ✅ COVERED: merge_subgraph() calls set_graph() on line 220 + new_graph = await merge_subgraph(...) + + if with_resolution: + await graphrag_task_lock.spin_acquire() # Re-acquire (redundant but safe) + # 2. ✅ COVERED: resolve_entities() calls set_graph() on line 248 + await resolve_entities(...) + + if with_community: + await graphrag_task_lock.spin_acquire() # Re-acquire (redundant but safe) + # 3. 
✅ COVERED: extract_community() does NOT call set_graph() + # It only inserts community reports, not graph data + await extract_community(...) + +finally: + graphrag_task_lock.release() # LOCK RELEASED +``` + +### ✅ **API Operations (PROPERLY LOCKED)** + +**API endpoints also use the same lock (`/srv/bfabriclocal/Desktop/ragflow/api/apps/kb_app.py`):** + +- `resolve_entities` API: Has `graphrag_task_lock` protection +- `detect_communities` API: Has `graphrag_task_lock` protection +- **But these don't call `set_graph()`** - they update the graph via `settings.docStoreConn.update()` + +```python +graphrag_task_lock = RedisDistributedLock( + f"graphrag_task_{kb_id}", + lock_value="api_entity_resolution", # or "api_community_detection" + timeout=1200 +) + +try: + await graphrag_task_lock.spin_acquire() + # ... GraphRAG operations ... +finally: + graphrag_task_lock.release() +``` + +## Summary: All `set_graph()` Calls Are Properly Locked + +| Call Site | Function | Lock Protection | Status | +|-----------|----------|----------------|--------| +| `index.py:220` | `merge_subgraph()` | ✅ `graphrag_task_lock` | **PROTECTED** | +| `index.py:248` | `resolve_entities()` | ✅ `graphrag_task_lock` | **PROTECTED** | + +## Key Findings + +1. **✅ All `set_graph()` operations are properly covered by locks** +2. **✅ Community detection doesn't call `set_graph()`** - it only manages community reports +3. **✅ API operations use a different update mechanism** (`docStoreConn.update()`) that doesn't go through `set_graph()` +4. **✅ The lock is held for the entire duration** of each `set_graph()` operation + +## Root Cause Analysis: Why Data Was Still Being Lost + +Since the locking coverage was correct, the issue was actually caused by **bugs in the Redis lock implementation**: + +### 🐛 **Critical Bug #1: `spin_acquire()` Deleted Existing Locks** + +**File**: `/srv/bfabriclocal/Desktop/ragflow/rag/utils/redis_conn.py:353-358` + +**Original broken code:** +```python +async def spin_acquire(self): + REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) # ❌ DELETES THE LOCK! + while True: + if self.lock.acquire(token=self.lock_value): + break + await trio.sleep(10) +``` + +**Problem**: Line 354 would delete any existing lock before trying to acquire, allowing multiple processes to get the same lock simultaneously. + +**Fixed code:** +```python +async def spin_acquire(self): + # Don't delete existing locks - just try to acquire properly + while True: + if self.lock.acquire(token=self.lock_value): + break + await trio.sleep(1) # Reduced sleep time for faster acquisition +``` + +### 🐛 **Critical Bug #2: `release()` Didn't Actually Release** + +**Original broken code:** +```python +def release(self): + REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) # ❌ WRONG! +``` + +**Problem**: This deleted the Redis key but didn't properly release the underlying Redis lock. 
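+
+For reference, a minimal sketch of the semantics the fix relies on, assuming the standard `redis-py` client (the key name and token below are illustrative): `Lock.release()` runs a compare-and-delete script keyed to the token the lock was acquired with, whereas a plain key deletion performs no ownership check at all.
+
+```python
+import redis
+
+r = redis.Redis()
+lock = r.lock("demo_lock", timeout=10)
+
+if lock.acquire(token="worker-1"):
+    # Correct: deletes the key only if this token still owns it,
+    # and clears the Lock object's local token state.
+    lock.release()
+
+# By contrast, r.delete("demo_lock") would remove the key unconditionally,
+# even while another worker legitimately holds the lock.
+```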
+ +**Fixed code:** +```python +def release(self): + # Properly release the underlying Redis lock + try: + self.lock.release() + except Exception as e: + # Fallback to delete if release fails + REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) +``` + +### 🐛 **Critical Bug #3: Invalid API Constructor Parameter** + +**Original broken code:** +```python +graphrag_task_lock = RedisDistributedLock( + f"graphrag_task_{kb_id}", + lock_value="api_entity_resolution", + redis_conn=settings.redis_conn, # ❌ INVALID PARAMETER + timeout=1200 +) +``` + +**Problem**: The `RedisDistributedLock` constructor doesn't accept a `redis_conn` parameter, causing the locks to potentially fail silently. + +**Fixed code:** +```python +graphrag_task_lock = RedisDistributedLock( + f"graphrag_task_{kb_id}", + lock_value="api_entity_resolution", + timeout=1200 +) +``` + +## Additional Risk Factor: `set_graph()` Race Condition Window + +Even with perfect locking, the `set_graph()` function has an inherent risk: + +```python +async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, change: GraphChange, callback): + # ❌ DELETE ALL GRAPH DATA immediately + await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete( + {"knowledge_graph_kwd": ["graph", "subgraph"]}, + search.index_name(tenant_id), kb_id + )) + + # ... 50+ lines of processing ... + + # ✅ INSERT NEW DATA much later + await trio.to_thread.run_sync(lambda: settings.docStoreConn.insert(...)) +``` + +**Risk**: If anything fails between deletion and insertion, the graph is permanently lost. + +## Resolution Status + +| Issue | Status | Fix Applied | +|-------|--------|-------------| +| Lock coverage verification | ✅ **COMPLETE** | All `set_graph()` calls are properly locked | +| Redis lock `spin_acquire()` bug | ✅ **FIXED** | Removed lock deletion before acquisition | +| Redis lock `release()` bug | ✅ **FIXED** | Now properly releases underlying lock | +| API constructor bug | ✅ **FIXED** | Removed invalid parameter | +| Frontend query invalidation | ✅ **FIXED** | Added `knowledgeBaseId` to query keys | + +## Conclusion + +The knowledge graph reset issue was **not caused by missing lock coverage** - the locks were properly protecting all critical operations. Instead, it was caused by **fundamental bugs in the Redis distributed lock implementation** that allowed multiple processes to acquire the same lock simultaneously. + +With the Redis lock bugs fixed, the knowledge graph should now be properly protected during document parsing operations, preventing the random resets that were occurring before. + +## Testing Recommendations + +1. **Monitor graph stability** during multi-document parsing +2. **Check Redis logs** for lock acquisition/release patterns +3. **Verify no "graphrag_task_lock acquired" messages overlap** for the same `kb_id` +4. 
**Test with both single and multiple workers** to ensure lock effectiveness + +## Files Modified + +- `/srv/bfabriclocal/Desktop/ragflow/rag/utils/redis_conn.py` - Fixed Redis lock implementation +- `/srv/bfabriclocal/Desktop/ragflow/api/apps/kb_app.py` - Fixed API lock constructor +- `/srv/bfabriclocal/Desktop/ragflow/web/src/hooks/knowledge-hooks.ts` - Fixed query invalidation \ No newline at end of file diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index d00907c4d31..8a18348452f 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -20,7 +20,7 @@ import trio from typing import Dict, Any -from flask import request +from flask import request, Blueprint from flask_login import login_required, current_user from api.db.services import duplicate_name @@ -43,8 +43,12 @@ from api.db.services.llm_service import LLMBundle from api.db import LLMType -# Global progress storage for community detection +# Global progress storage for community detection and entity resolution community_detection_progress = {} +entity_resolution_progress = {} + +# Create manager blueprint if not already defined +manager = Blueprint('kb', __name__) @manager.route('/create', methods=['post']) # noqa: F821 @login_required @@ -359,12 +363,12 @@ def knowledge_graph(kb_id): if "nodes" in obj["graph"]: total_nodes = len(obj["graph"]["nodes"]) - obj["graph"]["nodes"] = sorted(obj["graph"]["nodes"], key=lambda x: x.get("pagerank", 0), reverse=True)[:256] + obj["graph"]["nodes"] = sorted(obj["graph"]["nodes"], key=lambda x: x.get("pagerank", 0), reverse=True)[:500] if "edges" in obj["graph"]: total_edges = len(obj["graph"]["edges"]) node_id_set = { o["id"] for o in obj["graph"]["nodes"] } filtered_edges = [o for o in obj["graph"]["edges"] if o["source"] != o["target"] and o["source"] in node_id_set and o["target"] in node_id_set] - obj["graph"]["edges"] = sorted(filtered_edges, key=lambda x: x.get("weight", 0), reverse=True)[:128] + obj["graph"]["edges"] = sorted(filtered_edges, key=lambda x: x.get("weight", 0), reverse=True)[:300] obj["graph"]["total_edges"] = total_edges obj["graph"]["total_nodes"] = total_nodes return get_json_result(data=obj) @@ -397,6 +401,14 @@ def resolve_entities(kb_id): try: _, kb = KnowledgebaseService.get_by_id(kb_id) + # Check if documents are currently being parsed + if DocumentService.has_documents_parsing(kb_id): + return get_json_result( + data=False, + message='Cannot perform entity resolution while documents are being parsed. 
Please wait for parsing to complete.', + code=423 # HTTP 423 Locked + ) + # Check if knowledge graph exists if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id): return get_json_result( @@ -452,30 +464,96 @@ def resolve_entities(kb_id): # Get all nodes as subgraph nodes for entity resolution subgraph_nodes = set(graph.nodes()) + # Progress tracking variables + progress_data = { + "total_pairs": 0, + "processed_pairs": 0, + "remaining_pairs": 0, + "current_status": "initializing" + } + + # Initialize progress data in global storage + entity_resolution_progress[kb_id] = progress_data + # Run entity resolution using the existing GraphRAG functions def progress_callback(msg=""): + import re logging.info(f"Entity resolution progress: {msg}") + + # Parse progress messages to extract metrics + # Format: "Identified X candidate pairs" + if "identified" in msg.lower() and "candidate pairs" in msg.lower(): + match = re.search(r'Identified (\d+) candidate pairs', msg, re.IGNORECASE) + if match: + total_pairs = int(match.group(1)) + progress_data["total_pairs"] = total_pairs + progress_data["processed_pairs"] = 0 + progress_data["remaining_pairs"] = total_pairs + progress_data["current_status"] = "processing" + + # Format: "Resolved X pairs, Y are remained to resolve" + elif "resolved" in msg.lower() and "remained to resolve" in msg.lower(): + match = re.search(r'Resolved (\d+) pairs, (\d+) are remained to resolve', msg, re.IGNORECASE) + if match: + processed_pairs = int(match.group(1)) + remaining_pairs = int(match.group(2)) + total_pairs = processed_pairs + remaining_pairs + progress_data["total_pairs"] = total_pairs + progress_data["processed_pairs"] = processed_pairs + progress_data["remaining_pairs"] = remaining_pairs + progress_data["current_status"] = "processing" + + # Format: "Resolved X candidate pairs, Y of them are selected to merge" + elif "resolved" in msg.lower() and "candidate pairs" in msg.lower() and "selected to merge" in msg.lower(): + match = re.search(r'Resolved (\d+) candidate pairs', msg, re.IGNORECASE) + if match: + total_pairs = int(match.group(1)) + progress_data["total_pairs"] = total_pairs + progress_data["processed_pairs"] = total_pairs + progress_data["remaining_pairs"] = 0 + progress_data["current_status"] = "completed" + + # Update status based on message content + if "done" in msg.lower(): + progress_data["current_status"] = "completed" + + # Update the global progress storage + entity_resolution_progress[kb_id] = progress_data.copy() async def run_entity_resolution(): - chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) - embedding_model = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=None, lang=kb.language) + from rag.utils.redis_conn import RedisDistributedLock - # Get all nodes as subgraph nodes for entity resolution - subgraph_nodes = set(graph.nodes()) - - # Call the existing resolve_entities function - await graphrag_resolve_entities_impl( - graph, - subgraph_nodes, - kb.tenant_id, - kb_id, - "api_call", # Use placeholder since this is a manual API call - chat_model, - embedding_model, - progress_callback + # Acquire the same lock used by document parsing + graphrag_task_lock = RedisDistributedLock( + f"graphrag_task_{kb_id}", + lock_value="api_entity_resolution", + timeout=1200 ) - return graph + try: + await graphrag_task_lock.spin_acquire() + + chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) + embedding_model = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, 
llm_name=None, lang=kb.language) + + # Get all nodes as subgraph nodes for entity resolution + subgraph_nodes = set(graph.nodes()) + + # Call the existing resolve_entities function + await graphrag_resolve_entities_impl( + graph, + subgraph_nodes, + kb.tenant_id, + kb_id, + "api_call", # Use placeholder since this is a manual API call + chat_model, + embedding_model, + progress_callback + ) + + return graph + finally: + graphrag_task_lock.release() # Execute the async function using trio updated_graph = trio.run(run_entity_resolution) @@ -504,14 +582,41 @@ async def run_entity_resolution(): kb_id ) + # Mark operation as completed but don't delete immediately + # Frontend will handle cleanup after showing final status + progress_data["current_status"] = "completed" + entity_resolution_progress[kb_id] = progress_data.copy() + + # Schedule cleanup after 30 seconds to prevent memory buildup + def cleanup_progress(): + import time + time.sleep(30) + if kb_id in entity_resolution_progress: + del entity_resolution_progress[kb_id] + + import threading + threading.Thread(target=cleanup_progress, daemon=True).start() + + # Extract merge information from final progress data + total_pairs = progress_data.get("total_pairs", 0) + return get_json_result( - data=True, - message=f'Entity resolution completed successfully. Graph now has {len(updated_nodes)} nodes and {len(updated_edges)} edges.', + data={ + "success": True, + "nodes_count": len(updated_nodes), + "edges_count": len(updated_edges), + "total_pairs_analyzed": total_pairs, + "progress": progress_data + }, + message=f'Entity resolution completed successfully. Analyzed {total_pairs} candidate pairs. Graph now has {len(updated_nodes)} nodes and {len(updated_edges)} edges.', code=settings.RetCode.SUCCESS ) except Exception as e: logging.error(f"Entity resolution failed: {str(e)}") + # Clean up progress data on error + if kb_id in entity_resolution_progress: + del entity_resolution_progress[kb_id] return get_json_result( data=False, message=f'Entity resolution failed: {str(e)}', @@ -532,6 +637,14 @@ def detect_communities(kb_id): try: _, kb = KnowledgebaseService.get_by_id(kb_id) + # Check if documents are currently being parsed + if DocumentService.has_documents_parsing(kb_id): + return get_json_result( + data=False, + message='Cannot perform community detection while documents are being parsed. 
Please wait for parsing to complete.', + code=423 # HTTP 423 Locked + ) + # Check if knowledge graph exists if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id): return get_json_result( @@ -624,21 +737,35 @@ def progress_callback(msg=""): community_detection_progress[kb_id] = progress_data.copy() async def run_community_detection(): - chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) - embedding_model = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=None, lang=kb.language) + from rag.utils.redis_conn import RedisDistributedLock - # Call the existing extract_community function - await extract_community( - graph=graph, - tenant_id=kb.tenant_id, - kb_id=kb_id, - doc_id="api_call", # Use placeholder since this is a manual API call - llm_bdl=chat_model, - embed_bdl=embedding_model, - callback=progress_callback + # Acquire the same lock used by document parsing + graphrag_task_lock = RedisDistributedLock( + f"graphrag_task_{kb_id}", + lock_value="api_community_detection", + timeout=1200 ) - return graph + try: + await graphrag_task_lock.spin_acquire() + + chat_model = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) + embedding_model = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=None, lang=kb.language) + + # Call the existing extract_community function + await extract_community( + graph=graph, + tenant_id=kb.tenant_id, + kb_id=kb_id, + doc_id="api_call", # Use placeholder since this is a manual API call + llm_bdl=chat_model, + embed_bdl=embedding_model, + callback=progress_callback + ) + + return graph + finally: + graphrag_task_lock.release() # Execute the async function using trio updated_graph = trio.run(run_community_detection) @@ -714,7 +841,7 @@ def cleanup_progress(): @manager.route('//knowledge_graph/progress', methods=['GET']) # noqa: F821 @login_required -def get_community_detection_progress(kb_id): +def get_progress(kb_id): if not KnowledgebaseService.accessible(kb_id, current_user.id): return get_json_result( data=False, @@ -722,13 +849,21 @@ def get_community_detection_progress(kb_id): code=settings.RetCode.AUTHENTICATION_ERROR ) - # Get progress data for this kb_id - progress_data = community_detection_progress.get(kb_id, None) + # Check for operation type parameter + operation = request.args.get('operation', 'community_detection') + + # Get progress data for this kb_id based on operation type + if operation == 'entity_resolution': + progress_data = entity_resolution_progress.get(kb_id, None) + operation_name = 'entity resolution' + else: + progress_data = community_detection_progress.get(kb_id, None) + operation_name = 'community detection' if progress_data is None: return get_json_result( data=None, - message='No active community detection operation.', + message=f'No active {operation_name} operation.', code=settings.RetCode.SUCCESS ) @@ -737,3 +872,32 @@ def get_community_detection_progress(kb_id): message='Progress retrieved successfully.', code=settings.RetCode.SUCCESS ) + + +@manager.route('//document_parsing_status', methods=['GET']) # noqa: F821 +@login_required +def check_document_parsing_status(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + try: + # Check if any documents are currently being parsed + is_parsing = DocumentService.has_documents_parsing(kb_id) + + return get_json_result( + data={"is_parsing": is_parsing}, + 
message='Document parsing status retrieved successfully.', + code=settings.RetCode.SUCCESS + ) + + except Exception as e: + logging.error(f"Failed to check document parsing status: {str(e)}") + return get_json_result( + data=False, + message=f'Failed to check document parsing status: {str(e)}', + code=settings.RetCode.SERVER_ERROR + ) diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index a9dfcb438a7..28a053ab145 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -606,6 +606,17 @@ def do_cancel(cls, doc_id): pass return False + @classmethod + @DB.connection_context() + def has_documents_parsing(cls, kb_id): + """Check if any documents in the knowledge base are currently being parsed.""" + docs = cls.model.select().where( + cls.model.kb_id == kb_id, + cls.model.run == TaskStatus.RUNNING.value, + cls.model.progress < 1 + ) + return docs.count() > 0 + def queue_raptor_o_graphrag_tasks(doc, ty, priority): chunking_config = DocumentService.get_chunking_config(doc["id"]) diff --git a/graphrag/entity_resolution.py b/graphrag/entity_resolution.py index 8d26335ca01..b48a5711e65 100644 --- a/graphrag/entity_resolution.py +++ b/graphrag/entity_resolution.py @@ -152,7 +152,7 @@ async def limited_merge_nodes(graph, nodes, change): ) async def _resolve_candidate(self, candidate_resolution_i: tuple[str, list[tuple[str, str]]], resolution_result: set[str], resolution_result_lock: trio.Lock): - gen_conf = {"temperature": 0.5} + gen_conf = {"temperature": 0.5, "max_tokens": 8000} pair_txt = [ f'When determining whether two {candidate_resolution_i[0]}s are the same, you should only focus on critical properties and overlook noisy factors.\n'] for index, candidate in enumerate(candidate_resolution_i[1]): diff --git a/graphrag/general/community_reports_extractor.py b/graphrag/general/community_reports_extractor.py index 4d8b33bfdfa..1f8a8a3c49e 100644 --- a/graphrag/general/community_reports_extractor.py +++ b/graphrag/general/community_reports_extractor.py @@ -87,7 +87,7 @@ async def extract_community_report(community): "relation_df": rela_df.to_csv(index_label="id") } text = perform_variable_replacements(self._extraction_prompt, variables=prompt_variables) - gen_conf = {"temperature": 0.3} + gen_conf = {"temperature": 0.3, "max_tokens": 8000} async with chat_limiter: try: with trio.move_on_after(120) as cancel_scope: diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index abfb26fb7da..43389c4d695 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -347,15 +347,20 @@ def __init__(self, lock_key, lock_value=None, timeout=10, blocking_timeout=1): self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout) def acquire(self): - REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) + # Don't delete existing locks - just try to acquire properly return self.lock.acquire(token=self.lock_value) async def spin_acquire(self): - REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) + # Don't delete existing locks - just try to acquire properly while True: if self.lock.acquire(token=self.lock_value): break - await trio.sleep(10) + await trio.sleep(1) # Reduced sleep time for faster acquisition def release(self): - REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) + # Properly release the underlying Redis lock + try: + self.lock.release() + except Exception as e: + # Fallback to delete if release fails + REDIS_CONN.delete_if_equal(self.lock_key, 
self.lock_value) diff --git a/web/src/hooks/knowledge-hooks.ts b/web/src/hooks/knowledge-hooks.ts index 2c5d3bc41f1..8d94a91f0cb 100644 --- a/web/src/hooks/knowledge-hooks.ts +++ b/web/src/hooks/knowledge-hooks.ts @@ -16,6 +16,8 @@ import kbService, { resolveEntities, detectCommunities, getCommunityDetectionProgress, + getEntityResolutionProgress, + checkDocumentParsing, } from '@/services/knowledge-service'; import { useInfiniteQuery, @@ -460,7 +462,7 @@ export const useFetchTagListByKnowledgeIds = () => { export function useFetchKnowledgeGraph() { const knowledgeBaseId = useKnowledgeBaseId(); - const { data, isFetching: loading } = useQuery({ + const { data, isFetching: loading, refetch } = useQuery({ queryKey: ['fetchKnowledgeGraph', knowledgeBaseId], initialData: { graph: {}, mind_map: {} } as IKnowledgeGraph, enabled: !!knowledgeBaseId, @@ -471,7 +473,7 @@ export function useFetchKnowledgeGraph() { }, }); - return { data, loading }; + return { data, loading, refetch }; } export const useRemoveKnowledgeGraph = () => { @@ -489,7 +491,7 @@ export const useRemoveKnowledgeGraph = () => { if (data.code === 0) { message.success(i18n.t(`message.deleted`)); queryClient.invalidateQueries({ - queryKey: ['fetchKnowledgeGraph'], + queryKey: ['fetchKnowledgeGraph', knowledgeBaseId], }); } return data?.code; @@ -501,6 +503,8 @@ export const useRemoveKnowledgeGraph = () => { export const useResolveEntities = () => { const knowledgeBaseId = useKnowledgeBaseId(); + const [progress, setProgress] = useState(null); + const pollingRef = useRef(null); const queryClient = useQueryClient(); const { @@ -510,18 +514,127 @@ export const useResolveEntities = () => { } = useMutation({ mutationKey: ['resolveEntities'], mutationFn: async () => { + // Start the entity resolution operation const { data } = await resolveEntities(knowledgeBaseId); + return data; + }, + onSuccess: (data) => { if (data.code === 0) { message.success(i18n.t(`knowledgeGraph.entityResolutionSuccess`, 'Entity resolution completed successfully')); queryClient.invalidateQueries({ - queryKey: ['fetchKnowledgeGraph'], + queryKey: ['fetchKnowledgeGraph', knowledgeBaseId], }); + + // Clear progress after a delay + setTimeout(() => setProgress(null), 3000); + } + }, + onError: () => { + setProgress(null); + // Clear any ongoing polling + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; } - return data; }, }); - return { data, loading, resolveEntities: mutateAsync }; + // Function to start polling + const startPolling = () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + } + + pollingRef.current = setInterval(async () => { + try { + const { data: progressData } = await getEntityResolutionProgress(knowledgeBaseId); + if (progressData.code === 0 && progressData.data) { + setProgress(progressData.data); + + // If status is completed, clear progress after a delay + if (progressData.data.current_status === 'completed') { + setTimeout(() => { + setProgress(null); + }, 10000); // Clear after 10 seconds + + // Stop polling since operation is completed + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + } else if (progressData.code === 0 && progressData.data === null) { + // Operation completed or not running, stop polling + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + } catch (error) { + console.error('Failed to fetch entity resolution progress:', error); + } + }, 1000); // Poll every 
second + }; + + // Check for ongoing operation on component mount + useEffect(() => { + const checkInitialProgress = async () => { + try { + const { data: progressData } = await getEntityResolutionProgress(knowledgeBaseId); + + if (progressData.code === 0 && progressData.data) { + setProgress(progressData.data); + + // If status is completed, clear progress after a delay + if (progressData.data.current_status === 'completed') { + setTimeout(() => { + setProgress(null); + }, 10000); // Clear after 10 seconds + } else { + // Start polling since operation is still ongoing + startPolling(); + } + } + } catch (error) { + console.error('Failed to check initial entity resolution progress:', error); + } + }; + + if (knowledgeBaseId) { + checkInitialProgress(); + } + }, [knowledgeBaseId]); + + // Start polling when mutation starts + useEffect(() => { + if (loading) { + // Reset progress at start + setProgress({ + total_pairs: 0, + processed_pairs: 0, + remaining_pairs: 0, + current_status: 'starting' + }); + + // Start polling for progress + startPolling(); + } else { + // Stop polling when mutation completes + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + } + + return () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + }; + }, [loading, knowledgeBaseId]); + + return { data, loading, resolveEntities: mutateAsync, progress }; }; export const useDetectCommunities = () => { @@ -545,7 +658,7 @@ export const useDetectCommunities = () => { if (data.code === 0) { message.success(i18n.t(`knowledgeGraph.communityDetectionSuccess`, 'Community detection completed successfully')); queryClient.invalidateQueries({ - queryKey: ['fetchKnowledgeGraph'], + queryKey: ['fetchKnowledgeGraph', knowledgeBaseId], }); // Clear progress after a delay @@ -578,7 +691,7 @@ export const useDetectCommunities = () => { if (progressData.data.current_status === 'completed') { setTimeout(() => { setProgress(null); - }, 3000); // Clear after 3 seconds + }, 10000); // Clear after 10 seconds // Stop polling since operation is completed if (pollingRef.current) { @@ -612,7 +725,7 @@ export const useDetectCommunities = () => { if (progressData.data.current_status === 'completed') { setTimeout(() => { setProgress(null); - }, 3000); // Clear after 3 seconds + }, 10000); // Clear after 10 seconds } else { // Start polling since operation is still ongoing startPolling(); @@ -659,3 +772,52 @@ export const useDetectCommunities = () => { return { data, loading, detectCommunities: mutateAsync, progress }; }; + +export const useCheckDocumentParsing = () => { + const knowledgeBaseId = useKnowledgeBaseId(); + const [isParsing, setIsParsing] = useState(false); + const pollingRef = useRef(null); + + // Function to check parsing status + const checkParsing = async () => { + try { + const { data } = await checkDocumentParsing(knowledgeBaseId); + if (data.code === 0) { + setIsParsing(data.data.is_parsing); + } + } catch (error) { + console.error('Failed to check document parsing status:', error); + } + }; + + // Start polling + const startPolling = () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + } + + pollingRef.current = setInterval(checkParsing, 5000); // Poll every 5 seconds + }; + + // Stop polling + const stopPolling = () => { + if (pollingRef.current) { + clearInterval(pollingRef.current); + pollingRef.current = null; + } + }; + + // Effect to start polling when knowledge base ID changes + useEffect(() => { + if 
(knowledgeBaseId) { + checkParsing(); // Check immediately + startPolling(); // Start polling + } + + return () => { + stopPolling(); + }; + }, [knowledgeBaseId]); + + return { isParsing, checkParsing, startPolling, stopPolling }; +}; diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index d1fb8f1c363..1b2dc8f7f3a 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1314,9 +1314,15 @@ This delimiter is used to split the input text into several text pieces echo of entityResolutionSuccess: 'Entity resolution completed successfully', communityDetectionSuccess: 'Community detection completed successfully', communityProgress: 'Community Detection', + entityProgress: 'Entity Resolution', communities: 'Communities', tokensUsed: 'Tokens Used', status: 'Status', + entityPairs: 'Entity Pairs', + remaining: 'Remaining', + documentsParsing: 'Documents are currently being parsed', + waitForParsing: 'Please wait for document parsing to complete', + operationBlocked: 'Operation temporarily unavailable during document parsing', }, }, }; diff --git a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx index 57a9a50bf63..7f8e706d89e 100644 --- a/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx +++ b/web/src/pages/add-knowledge/components/knowledge-graph/index.tsx @@ -1,6 +1,6 @@ import { ConfirmDeleteDialog } from '@/components/confirm-delete-dialog'; import { Button } from '@/components/ui/button'; -import { useFetchKnowledgeGraph, useResolveEntities, useDetectCommunities } from '@/hooks/knowledge-hooks'; +import { useFetchKnowledgeGraph, useResolveEntities, useDetectCommunities, useCheckDocumentParsing } from '@/hooks/knowledge-hooks'; import { Trash2, Network, Users } from 'lucide-react'; import React from 'react'; import { useTranslation } from 'react-i18next'; @@ -11,8 +11,9 @@ const KnowledgeGraph: React.FC = () => { const { data } = useFetchKnowledgeGraph(); const { t } = useTranslation(); const { handleDeleteKnowledgeGraph } = useDeleteKnowledgeGraph(); - const { resolveEntities, loading: resolvingEntities } = useResolveEntities(); + const { resolveEntities, loading: resolvingEntities, progress: entityProgress } = useResolveEntities(); const { detectCommunities, loading: detectingCommunities, progress: communityProgress } = useDetectCommunities(); + const { isParsing } = useCheckDocumentParsing(); const totalNodes = data?.graph?.total_nodes || 0; const totalEdges = data?.graph?.total_edges || 0; @@ -51,8 +52,9 @@ const KnowledgeGraph: React.FC = () => { variant="outline" size={'sm'} onClick={handleResolveEntities} - disabled={resolvingEntities || totalNodes === 0} + disabled={resolvingEntities || totalNodes === 0 || isParsing} className="flex items-center gap-2" + title={isParsing ? t('knowledgeGraph.waitForParsing', 'Please wait for document parsing to complete') : undefined} > {resolvingEntities ? t('knowledgeGraph.resolving', 'Resolving...') : t('knowledgeGraph.resolveEntities', 'Resolve Entities')} @@ -62,8 +64,9 @@ const KnowledgeGraph: React.FC = () => { variant="outline" size={'sm'} onClick={handleDetectCommunities} - disabled={detectingCommunities || totalNodes === 0} + disabled={detectingCommunities || totalNodes === 0 || isParsing} className="flex items-center gap-2" + title={isParsing ? t('knowledgeGraph.waitForParsing', 'Please wait for document parsing to complete') : undefined} > {detectingCommunities ? 
t('knowledgeGraph.detecting', 'Detecting...') : t('knowledgeGraph.detectCommunities', 'Detect Communities')} @@ -106,6 +109,39 @@ const KnowledgeGraph: React.FC = () => {
+          {/* Entity Resolution Progress */}
+          {entityProgress && (
+            <div>
+              <div>
+                {t('knowledgeGraph.entityProgress', 'Entity Resolution')}
+              </div>
+              <div>
+                {entityProgress.total_pairs > 0 && (
+                  <div>
+                    <span>{t('knowledgeGraph.entityPairs', 'Entity Pairs')}:</span>
+                    <span>
+                      {entityProgress.processed_pairs}/{entityProgress.total_pairs}
+                    </span>
+                  </div>
+                )}
+                {entityProgress.remaining_pairs > 0 && (
+                  <div>
+                    <span>{t('knowledgeGraph.remaining', 'Remaining')}:</span>
+                    <span>
+                      {entityProgress.remaining_pairs.toLocaleString()}
+                    </span>
+                  </div>
+                )}
+                <div>
+                  <span>{t('knowledgeGraph.status', 'Status')}:</span>
+                  <span>
+                    {entityProgress.current_status}
+                  </span>
+                </div>
+              </div>
+            </div>
+          )}
+
          {/* Community Detection Progress */}
          {communityProgress && (
@@ -138,6 +174,18 @@ const KnowledgeGraph: React.FC = () => {
          )}
+
+          {/* Document Parsing Status */}
+          {isParsing && (
+            <div>
+              <div>
+                {t('knowledgeGraph.documentsParsing', 'Documents Parsing')}
+              </div>
+              <div>
+                {t('knowledgeGraph.waitForParsing', 'Please wait for document parsing to complete')}
+              </div>
+            </div>
+ )} diff --git a/web/src/services/knowledge-service.ts b/web/src/services/knowledge-service.ts index cbdb8a92c86..032b1140b9e 100644 --- a/web/src/services/knowledge-service.ts +++ b/web/src/services/knowledge-service.ts @@ -194,6 +194,14 @@ export function getCommunityDetectionProgress(knowledgeId: string) { return request.get(api.getCommunityDetectionProgress(knowledgeId)); } +export function getEntityResolutionProgress(knowledgeId: string) { + return request.get(api.getEntityResolutionProgress(knowledgeId)); +} + +export function checkDocumentParsing(knowledgeId: string) { + return request.get(api.checkDocumentParsing(knowledgeId)); +} + export const listDataset = ( params?: IFetchKnowledgeListRequestParams, body?: IFetchKnowledgeListRequestBody, diff --git a/web/src/utils/api.ts b/web/src/utils/api.ts index 874c0f4be38..5d1a909bb0e 100644 --- a/web/src/utils/api.ts +++ b/web/src/utils/api.ts @@ -49,6 +49,10 @@ export default { `${api_host}/kb/${knowledgeId}/knowledge_graph/detect_communities`, getCommunityDetectionProgress: (knowledgeId: string) => `${api_host}/kb/${knowledgeId}/knowledge_graph/progress`, + getEntityResolutionProgress: (knowledgeId: string) => + `${api_host}/kb/${knowledgeId}/knowledge_graph/progress?operation=entity_resolution`, + checkDocumentParsing: (knowledgeId: string) => + `${api_host}/kb/${knowledgeId}/document_parsing_status`, // tags listTag: (knowledgeId: string) => `${api_host}/kb/${knowledgeId}/tags`, From db63bb50aea59946ca58df27978684548f6ad608 Mon Sep 17 00:00:00 2001 From: adrianad Date: Mon, 14 Jul 2025 10:03:17 +0200 Subject: [PATCH 13/22] Implement two-step GraphRAG workflow with manual entity extraction and graph building Add separate Extract Entities and Build Graph operations to replace automatic GraphRAG during parsing. Users can now manually control when to extract entities from documents and when to build the complete knowledge graph, with real-time progress tracking for both operations. 
--- api/apps/kb_app.py | 443 ++++++++++++++++++ api/db/services/document_service.py | 25 +- api/db/services/knowledgebase_service.py | 51 ++ api/utils/validation_utils.py | 6 +- graphrag/general/index.py | 303 ++++++++++++ graphrag/utils.py | 11 +- rag/svr/task_executor.py | 17 +- rag/utils/redis_conn.py | 17 +- .../use-default-parser-values.ts | 4 + .../graph-rag-form-fields.tsx | 58 --- .../parse-configuration/graph-rag-items.tsx | 14 - web/src/hooks/knowledge-hooks.ts | 284 +++++++++-- web/src/locales/en.ts | 12 + .../components/knowledge-graph/index.tsx | 188 +++++++- .../components/knowledge-sidebar/index.tsx | 17 +- web/src/services/knowledge-service.ts | 16 + web/src/utils/api.ts | 8 + 17 files changed, 1335 insertions(+), 139 deletions(-) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index 8a18348452f..6d34678dbd5 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -47,6 +47,10 @@ community_detection_progress = {} entity_resolution_progress = {} +# Global progress storage for two-step graph creation +entity_extraction_progress = {} +graph_building_progress = {} + # Create manager blueprint if not already defined manager = Blueprint('kb', __name__) @@ -132,6 +136,26 @@ def update(): return get_data_error_result( message="Duplicated knowledgebase name.") + # Validate GraphRAG configuration if present + if "parser_config" in req and "graphrag" in req.get("parser_config", {}): + graphrag_config = req["parser_config"]["graphrag"] + + # Validate graphrag_mode enum + if "graphrag_mode" in graphrag_config: + valid_modes = ["none", "extract_only", "full_auto"] + if graphrag_config["graphrag_mode"] not in valid_modes: + return get_data_error_result( + message=f"Invalid graphrag_mode. Must be one of: {', '.join(valid_modes)}") + + # Warn about deprecated use_graphrag + if "use_graphrag" in graphrag_config: + # Convert legacy boolean to new enum format + use_graphrag = graphrag_config["use_graphrag"] + if isinstance(use_graphrag, bool): + new_mode = "full_auto" if use_graphrag else "none" + graphrag_config["graphrag_mode"] = new_mode + graphrag_config.pop("use_graphrag", None) + del req["kb_id"] if not KnowledgebaseService.update_by_id(kb.id, req): return get_data_error_result() @@ -839,6 +863,367 @@ def cleanup_progress(): ) +@manager.route('//knowledge_graph/extract_entities', methods=['POST']) # noqa: F821 +@login_required +def extract_entities(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + try: + _, kb = KnowledgebaseService.get_by_id(kb_id) + + # Check if documents are currently being parsed + if DocumentService.has_documents_parsing(kb_id): + return get_json_result( + data=False, + message='Cannot extract entities while documents are being parsed. 
Please wait for parsing to complete.', + code=423 # HTTP 423 Locked + ) + + # Check if index exists + if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id): + return get_json_result( + data=False, + message='Knowledge base index not found.', + code=settings.RetCode.DATA_ERROR + ) + + # Check if there are chunks to process + req = {"kb_id": [kb_id]} + sres = settings.retrievaler.search(req, search.index_name(kb.tenant_id), [kb_id]) + if not len(sres.ids): + return get_json_result( + data=False, + message='No documents found to extract entities from.', + code=settings.RetCode.DATA_ERROR + ) + + # Initialize progress tracking + entity_extraction_progress[kb_id] = { + "total_documents": 0, + "processed_documents": 0, + "entities_found": 0, + "current_status": "starting" + } + + # Set up callback for progress tracking + def progress_callback(progress=None, msg=""): + import re + # Handle both callback styles: callback(msg) and callback(progress, msg="text") + if progress is None: + # Old style: callback(msg) + pass + elif isinstance(progress, str): + # Old style: callback(msg) where first arg is actually the message + msg = progress + # else: New style: callback(progress_float, msg="text") - use msg parameter + + # Parse progress messages to update tracking + if "Starting entity extraction for" in msg: + match = re.search(r"(\d+) documents", msg) + if match: + entity_extraction_progress[kb_id]["total_documents"] = int(match.group(1)) + entity_extraction_progress[kb_id]["current_status"] = "processing" + elif "Document" in msg and "extracted" in msg: + # Parse: "Document doc_id: extracted X entities, Y relations" + match = re.search(r"extracted (\d+) entities", msg) + if match: + current_entities = entity_extraction_progress[kb_id].get("entities_found", 0) + entity_extraction_progress[kb_id]["entities_found"] = current_entities + int(match.group(1)) + entity_extraction_progress[kb_id]["processed_documents"] += 1 + elif "Entity extraction completed" in msg: + entity_extraction_progress[kb_id]["current_status"] = "completed" + + # Create LLM bundle + llm_bdl = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) + embed_bdl = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=kb.embd_id, lang=kb.language) + + # Run entity extraction in background + async def run_extraction(): + logging.info(f"Starting entity extraction for kb {kb_id}") + try: + from graphrag.general.index import generate_subgraph, merge_subgraph + from graphrag.light.graph_extractor import GraphExtractor as LightKGExt + from graphrag.general.graph_extractor import GraphExtractor as GeneralKGExt + + # Get GraphRAG configuration + graphrag_config = kb.parser_config.get("graphrag", {}) + entity_types = graphrag_config.get("entity_types", ["organization", "person", "location", "event", "time"]) + method = graphrag_config.get("method", "light") + logging.info(f"GraphRAG config: method={method}, entity_types={entity_types}") + + # Lock to prevent concurrent operations + from rag.utils.redis_conn import RedisDistributedLock + extract_lock = RedisDistributedLock(f"graph_extract_{kb_id}", lock_value="api_extract_entities", timeout=1200) + logging.info(f"Acquiring lock for kb {kb_id}") + await extract_lock.spin_acquire() + + try: + # Select extractor based on method (same as parsing workflow) + extractor_class = LightKGExt if method == "light" else GeneralKGExt + logging.info(f"Using extractor class: {extractor_class.__name__}") + + # Get documents that have chunks using DocumentService + from 
api.db.services.document_service import DocumentService + docs, count = DocumentService.get_by_kb_id(kb_id, 1, 1000, 'create_time', False, '', [], [], []) + docs_with_chunks = [doc for doc in docs if doc['chunk_num'] > 0] + logging.info(f"Found {len(docs_with_chunks)} documents with chunks (out of {count} total documents)") + + # Group chunks by document + doc_chunks = {} + total_docs = 0 + for doc in docs_with_chunks: + doc_id = doc['id'] + chunks = [] + for d in settings.retrievaler.chunk_list( + doc_id, kb.tenant_id, [kb_id], fields=["content_with_weight"] + ): + chunks.append(d["content_with_weight"]) + if chunks: + doc_chunks[doc_id] = chunks + total_docs += 1 + else: + logging.warning(f"Document {doc_id} ({doc['name']}) reports {doc['chunk_num']} chunks but chunk_list returned none") + + logging.info(f"Processing {total_docs} documents with chunks") + + if total_docs == 0: + logging.warning("No documents with chunks found to process") + entity_extraction_progress[kb_id]["current_status"] = "completed" + return + + # Update progress + entity_extraction_progress[kb_id]["total_documents"] = total_docs + entity_extraction_progress[kb_id]["current_status"] = "processing" + + processed_docs = 0 + total_entities = 0 + total_relations = 0 + + # Process each document - extract entities only (no graph building) + for doc_id, chunks in doc_chunks.items(): + logging.info(f"Processing document {doc_id} with {len(chunks)} chunks") + progress_callback(f"Processing document {doc_id}") + + # Extract entities and relations using GraphRAG extractor + ext = extractor_class( + llm_bdl, + language=kb.language, + entity_types=entity_types, + ) + ents, rels = await ext(doc_id, chunks, progress_callback) + + # Store entities and relations separately (no graph building yet) + from graphrag.utils import graph_node_to_chunk, graph_edge_to_chunk + + entity_chunks = [] + for ent in ents: + ent["source_id"] = [doc_id] + await graph_node_to_chunk(kb_id, embed_bdl, ent["entity_name"], ent, entity_chunks) + + relation_chunks = [] + for rel in rels: + rel["source_id"] = [doc_id] + # Only add relation if both entities exist + entity_names = {ent["entity_name"] for ent in ents} + if rel["src_id"] in entity_names and rel["tgt_id"] in entity_names: + await graph_edge_to_chunk(kb_id, embed_bdl, rel["src_id"], rel["tgt_id"], rel, relation_chunks) + + # Bulk insert entities and relations + if entity_chunks: + await trio.to_thread.run_sync( + lambda: settings.docStoreConn.insert( + entity_chunks, search.index_name(kb.tenant_id), kb_id + ) + ) + + if relation_chunks: + await trio.to_thread.run_sync( + lambda: settings.docStoreConn.insert( + relation_chunks, search.index_name(kb.tenant_id), kb_id + ) + ) + + processed_docs += 1 + total_entities += len(ents) + total_relations += len(relation_chunks) + + logging.info(f"Document {doc_id}: extracted {len(ents)} entities, {len(relation_chunks)} relations") + + # Update progress + entity_extraction_progress[kb_id].update({ + "processed_documents": processed_docs, + "entities_found": total_entities, + "relations_found": total_relations + }) + + progress_callback(f"Document {doc_id}: extracted {len(ents)} entities, {len(relation_chunks)} relations") + + # Final status + entity_extraction_progress[kb_id]["current_status"] = "completed" + progress_callback("Entity extraction completed") + logging.info(f"Entity extraction completed for kb {kb_id}: {total_entities} entities, {total_relations} relations") + + finally: + extract_lock.release() + logging.info(f"Released lock for kb {kb_id}") 
+ + except Exception as e: + logging.exception(f"Entity extraction failed for kb {kb_id}: {str(e)}") + entity_extraction_progress[kb_id]["current_status"] = "failed" + raise + + # Start the extraction process + trio.run(run_extraction) + + return get_json_result( + data=True, + message='Entity extraction completed successfully.' + ) + + except Exception as e: + logging.exception(f"Extract entities failed for kb {kb_id}: {str(e)}") + # Clean up progress on failure + if kb_id in entity_extraction_progress: + del entity_extraction_progress[kb_id] + + return get_json_result( + data=False, + message=f'Entity extraction failed: {str(e)}', + code=settings.RetCode.SERVER_ERROR + ) + + +@manager.route('//knowledge_graph/build_graph', methods=['POST']) # noqa: F821 +@login_required +def build_graph(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + try: + _, kb = KnowledgebaseService.get_by_id(kb_id) + + # Check if documents are currently being parsed + if DocumentService.has_documents_parsing(kb_id): + return get_json_result( + data=False, + message='Cannot build graph while documents are being parsed. Please wait for parsing to complete.', + code=423 # HTTP 423 Locked + ) + + # Check if index exists + if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id): + return get_json_result( + data=False, + message='Knowledge base index not found.', + code=settings.RetCode.DATA_ERROR + ) + + # Check if entities exist + req = { + "kb_id": [kb_id], + "knowledge_graph_kwd": ["entity"] + } + sres = settings.retrievaler.search(req, search.index_name(kb.tenant_id), [kb_id]) + if not len(sres.ids): + return get_json_result( + data=False, + message='No entities found. 
Please extract entities first.', + code=settings.RetCode.DATA_ERROR + ) + + # Initialize progress tracking + graph_building_progress[kb_id] = { + "total_entities": 0, + "processed_entities": 0, + "relationships_created": 0, + "current_status": "starting" + } + + # Set up callback for progress tracking + def progress_callback(msg=""): + import re + # Parse progress messages to update tracking + if "Found" in msg and "entities and" in msg: + # Parse: "Found X entities and Y relations" + entity_match = re.search(r"Found (\d+) entities", msg) + if entity_match: + graph_building_progress[kb_id]["total_entities"] = int(entity_match.group(1)) + graph_building_progress[kb_id]["current_status"] = "processing" + elif "Built graph with" in msg: + # Parse: "Built graph with X nodes and Y edges" + edge_match = re.search(r"(\d+) edges", msg) + if edge_match: + graph_building_progress[kb_id]["relationships_created"] = int(edge_match.group(1)) + elif "Graph building completed" in msg: + graph_building_progress[kb_id]["current_status"] = "completed" + + # Create LLM bundle + llm_bdl = LLMBundle(kb.tenant_id, LLMType.CHAT, llm_name=None, lang=kb.language) + embed_bdl = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=kb.embd_id, lang=kb.language) + + # Run graph building in background + async def run_build(): + try: + from graphrag.general.index import build_graph_from_entities + + # Lock to prevent concurrent operations + from rag.utils.redis_conn import RedisDistributedLock + build_lock = RedisDistributedLock(f"graph_build_{kb_id}", lock_value="api_build_graph", timeout=1200) + await build_lock.spin_acquire() + + try: + result = await build_graph_from_entities( + tenant_id=kb.tenant_id, + kb_id=kb_id, + embed_bdl=embed_bdl, + callback=progress_callback + ) + finally: + build_lock.release() + + # Update final progress from result + graph_building_progress[kb_id].update({ + "total_entities": result["total_entities"], + "processed_entities": result["processed_entities"], + "relationships_created": result["relationships_created"], + "current_status": result["status"] + }) + + except Exception as e: + logging.exception(f"Graph building failed for kb {kb_id}: {str(e)}") + graph_building_progress[kb_id]["current_status"] = "failed" + raise + + # Start the graph building process + trio.run(run_build) + + return get_json_result( + data=True, + message='Graph building completed successfully.' 
+ ) + + except Exception as e: + logging.exception(f"Build graph failed for kb {kb_id}: {str(e)}") + # Clean up progress on failure + if kb_id in graph_building_progress: + del graph_building_progress[kb_id] + + return get_json_result( + data=False, + message=f'Graph building failed: {str(e)}', + code=settings.RetCode.SERVER_ERROR + ) + + @manager.route('//knowledge_graph/progress', methods=['GET']) # noqa: F821 @login_required def get_progress(kb_id): @@ -856,6 +1241,12 @@ def get_progress(kb_id): if operation == 'entity_resolution': progress_data = entity_resolution_progress.get(kb_id, None) operation_name = 'entity resolution' + elif operation == 'entity_extraction': + progress_data = entity_extraction_progress.get(kb_id, None) + operation_name = 'entity extraction' + elif operation == 'graph_building': + progress_data = graph_building_progress.get(kb_id, None) + operation_name = 'graph building' else: progress_data = community_detection_progress.get(kb_id, None) operation_name = 'community detection' @@ -874,6 +1265,58 @@ def get_progress(kb_id): ) +@manager.route('//knowledge_graph/extract_entities/progress', methods=['GET']) # noqa: F821 +@login_required +def get_extraction_progress(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + progress_data = entity_extraction_progress.get(kb_id, None) + + if progress_data is None: + return get_json_result( + data=None, + message='No active entity extraction operation.', + code=settings.RetCode.SUCCESS + ) + + return get_json_result( + data=progress_data, + message='Entity extraction progress retrieved successfully.', + code=settings.RetCode.SUCCESS + ) + + +@manager.route('//knowledge_graph/build_graph/progress', methods=['GET']) # noqa: F821 +@login_required +def get_build_progress(kb_id): + if not KnowledgebaseService.accessible(kb_id, current_user.id): + return get_json_result( + data=False, + message='No authorization.', + code=settings.RetCode.AUTHENTICATION_ERROR + ) + + progress_data = graph_building_progress.get(kb_id, None) + + if progress_data is None: + return get_json_result( + data=None, + message='No active graph building operation.', + code=settings.RetCode.SUCCESS + ) + + return get_json_result( + data=progress_data, + message='Graph building progress retrieved successfully.', + code=settings.RetCode.SUCCESS + ) + + @manager.route('//document_parsing_status', methods=['GET']) # noqa: F821 @login_required def check_document_parsing_status(kb_id): diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 28a053ab145..04bec1faeb8 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -567,7 +567,7 @@ def update_progress(cls): if (d["parser_config"].get("raptor") or {}).get("use_raptor") and not has_raptor: queue_raptor_o_graphrag_tasks(d, "raptor", priority) prg = 0.98 * len(tsks) / (len(tsks) + 1) - elif (d["parser_config"].get("graphrag") or {}).get("use_graphrag") and not has_graphrag: + elif _should_auto_queue_graphrag(d["parser_config"]) and not has_graphrag: queue_raptor_o_graphrag_tasks(d, "graphrag", priority) prg = 0.98 * len(tsks) / (len(tsks) + 1) else: @@ -618,6 +618,29 @@ def has_documents_parsing(cls, kb_id): return docs.count() > 0 +def _should_auto_queue_graphrag(parser_config): + """ + Determine if GraphRAG tasks should be automatically queued after parsing. 
+ + With the new UI, GraphRAG is now manual-only (extract_only mode): + - New enum graphrag_mode="full_auto" -> auto queue + - New enum graphrag_mode="extract_only" -> no auto queue + - New enum graphrag_mode="none" -> no auto queue + - Legacy boolean use_graphrag=true -> no auto queue (changed behavior) + - Default -> no auto queue + """ + graphrag_config = parser_config.get("graphrag", {}) + + # Check for new enum format first + graphrag_mode = graphrag_config.get("graphrag_mode") + if graphrag_mode is not None: + return graphrag_mode == "full_auto" + + # Legacy format no longer triggers auto-queuing since toggle was removed + # This allows existing configs to work but use manual GraphRAG workflow + return False + + def queue_raptor_o_graphrag_tasks(doc, ty, priority): chunking_config = DocumentService.get_chunking_config(doc["id"]) hasher = xxhash.xxh64() diff --git a/api/db/services/knowledgebase_service.py b/api/db/services/knowledgebase_service.py index 454bdbdc746..433e83e7fa2 100644 --- a/api/db/services/knowledgebase_service.py +++ b/api/db/services/knowledgebase_service.py @@ -436,3 +436,54 @@ def update_document_number_in_init(cls, kb_id, doc_num): else: raise e + @classmethod + @DB.connection_context() + def migrate_graphrag_config(cls): + """ + Migrate existing knowledge bases from legacy boolean use_graphrag + to new enum graphrag_mode format. + + Migration rules: + - use_graphrag: true -> graphrag_mode: "full_auto" + - use_graphrag: false -> graphrag_mode: "none" + - Missing use_graphrag -> graphrag_mode: "none" + - Already has graphrag_mode -> no change + """ + migrated_count = 0 + + # Get all knowledge bases + kbs = cls.model.select() + + for kb in kbs: + parser_config = kb.parser_config or {} + graphrag_config = parser_config.get("graphrag", {}) + + # Skip if already has new enum format + if "graphrag_mode" in graphrag_config: + continue + + # Check for legacy boolean format + use_graphrag = graphrag_config.get("use_graphrag") + + if isinstance(use_graphrag, bool): + # Migrate from boolean to enum + new_mode = "full_auto" if use_graphrag else "none" + graphrag_config["graphrag_mode"] = new_mode + + # Remove old boolean field + graphrag_config.pop("use_graphrag", None) + + # Update the knowledge base + parser_config["graphrag"] = graphrag_config + cls.update_by_id(kb.id, {"parser_config": parser_config}) + migrated_count += 1 + + elif use_graphrag is None: + # No legacy config, set default + graphrag_config["graphrag_mode"] = "none" + parser_config["graphrag"] = graphrag_config + cls.update_by_id(kb.id, {"parser_config": parser_config}) + migrated_count += 1 + + return migrated_count + diff --git a/api/utils/validation_utils.py b/api/utils/validation_utils.py index d87d8945d7a..2035d277672 100644 --- a/api/utils/validation_utils.py +++ b/api/utils/validation_utils.py @@ -353,11 +353,11 @@ class RaptorConfig(Base): class GraphragConfig(Base): - use_graphrag: bool = Field(default=False) + use_graphrag: bool = Field(default=False) # Default off - GraphRAG is manual-only via Knowledge Graph UI entity_types: list[str] = Field(default_factory=lambda: ["organization", "person", "geo", "event", "category"]) method: GraphragMethodEnum = Field(default=GraphragMethodEnum.light) - community: bool = Field(default=False) - resolution: bool = Field(default=False) + community: bool = Field(default=False) # Will be handled via Knowledge Graph UI + resolution: bool = Field(default=False) # Will be handled via Knowledge Graph UI class ParserConfig(Base): diff --git 
a/graphrag/general/index.py b/graphrag/general/index.py
index 6e107bc87a2..3b4dfc9036a 100644
--- a/graphrag/general/index.py
+++ b/graphrag/general/index.py
@@ -33,6 +33,8 @@
     does_graph_contains,
     tidy_graph,
     GraphChange,
+    graph_node_to_chunk,
+    graph_edge_to_chunk,
 )
 from rag.nlp import rag_tokenizer, search
 from rag.utils.redis_conn import RedisDistributedLock
@@ -319,3 +321,304 @@ async def extract_community(
         msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s."
     )
     return community_structure, community_reports
+
+
+async def extract_entities_only(
+    tenant_id: str,
+    kb_id: str,
+    doc_ids: list[str],  # Process specific documents or all if empty
+    language: str,
+    entity_types: list[str],
+    method: str,  # "light" or "general"
+    llm_bdl,
+    embed_bdl,
+    callback
+) -> dict:
+    """
+    Extract entities and relations from documents without building the full graph.
+    Stores entities with knowledge_graph_kwd: "entity" and relations with "relation".
+    """
+    start = trio.current_time()
+
+    # Select extractor based on method
+    extractor_class = LightKGExt if method == "light" else GeneralKGExt
+
+    # Get document chunks to process
+    if doc_ids:
+        # Process specific documents
+        chunks_data = []
+        for doc_id in doc_ids:
+            for d in settings.retrievaler.chunk_list(
+                doc_id, tenant_id, [kb_id], fields=["content_with_weight", "doc_id"]
+            ):
+                chunks_data.append((doc_id, d["content_with_weight"]))
+    else:
+        # Process all documents in knowledge base
+        chunks_data = []
+        # Get all doc_ids for this kb
+        doc_conds = {
+            "size": 10000,  # Large number to get all docs
+            "kb_id": kb_id
+        }
+        doc_res = await trio.to_thread.run_sync(
+            lambda: settings.retrievaler.search(doc_conds, search.index_name(tenant_id), [kb_id])
+        )
+        processed_docs = set()
+        for doc_id in doc_res.ids:
+            if doc_id not in processed_docs:
+                processed_docs.add(doc_id)
+                for d in settings.retrievaler.chunk_list(
+                    doc_id, tenant_id, [kb_id], fields=["content_with_weight", "doc_id"]
+                ):
+                    chunks_data.append((doc_id, d["content_with_weight"]))
+
+    if not chunks_data:
+        callback(msg="No documents found to process")
+        return {
+            "total_documents": 0,
+            "processed_documents": 0,
+            "entities_found": 0,
+            "relations_found": 0,
+            "status": "completed"
+        }
+
+    # Group chunks by document
+    doc_chunks = {}
+    for doc_id, chunk_content in chunks_data:
+        if doc_id not in doc_chunks:
+            doc_chunks[doc_id] = []
+        doc_chunks[doc_id].append(chunk_content)
+
+    total_documents = len(doc_chunks)
+    processed_documents = 0
+    total_entities = 0
+    total_relations = 0
+
+    callback(msg=f"Starting entity extraction for {total_documents} documents")
+
+    # Process each document
+    for doc_id, chunks in doc_chunks.items():
+        try:
+            # Check if entities already exist for this document
+            contains = await does_graph_contains(tenant_id, kb_id, doc_id)
+            if contains:
+                callback(msg=f"Entities already exist for document {doc_id}, skipping")
+                processed_documents += 1
+                continue
+
+            callback(msg=f"Extracting entities from document {doc_id}")
+
+            # Extract entities and relations
+            ext = extractor_class(
+                llm_bdl,
+                language=language,
+                entity_types=entity_types,
+            )
+            ents, rels = await ext(doc_id, chunks, callback)
+
+            # Store entities
+            entity_chunks = []
+            for ent in ents:
+                assert "description" in ent, f"entity {ent} does not have description"
+                ent["source_id"] = [doc_id]
+                await graph_node_to_chunk(kb_id, embed_bdl, ent["entity_name"], ent, entity_chunks)
+
+            # Store relations
+            relation_chunks = []
+            for rel in rels:
+                assert "description" in rel, f"relation {rel} does not have description"
+                rel["source_id"] = [doc_id]
+                await graph_edge_to_chunk(kb_id, embed_bdl, rel["src_id"], rel["tgt_id"], rel, relation_chunks)
+
+            # Bulk insert entities
+            if entity_chunks:
+                await trio.to_thread.run_sync(
+                    lambda: settings.docStoreConn.insert(
+                        entity_chunks, search.index_name(tenant_id), kb_id
+                    )
+                )
+
+            # Bulk insert relations
+            if relation_chunks:
+                await trio.to_thread.run_sync(
+                    lambda: settings.docStoreConn.insert(
+                        relation_chunks, search.index_name(tenant_id), kb_id
+                    )
+                )
+
+            total_entities += len(ents)
+            total_relations += len(rels)
+            processed_documents += 1
+
+            callback(msg=f"Document {doc_id}: extracted {len(ents)} entities, {len(rels)} relations")
+
+        except Exception as e:
+            callback(msg=f"Error processing document {doc_id}: {str(e)}")
+            processed_documents += 1
+            continue
+
+    now = trio.current_time()
+    callback(msg=f"Entity extraction completed in {now - start:.2f} seconds")
+
+    return {
+        "total_documents": total_documents,
+        "processed_documents": processed_documents,
+        "entities_found": total_entities,
+        "relations_found": total_relations,
+        "status": "completed"
+    }
+
+
+async def build_graph_from_entities(
+    tenant_id: str,
+    kb_id: str,
+    embed_bdl,
+    callback
+) -> dict:
+    """
+    Build complete NetworkX graph from pre-extracted entities and relations.
+    Calculates PageRank and stores the final graph.
+    """
+    start = trio.current_time()
+
+    callback(msg="Starting graph building from extracted entities")
+
+    # Query all entities
+    entity_conds = {
+        "fields": ["entity_kwd", "content_with_weight", "source_id"],
+        "size": 10000,  # Large number to get all entities
+        "knowledge_graph_kwd": ["entity"],
+        "kb_id": kb_id
+    }
+
+    entity_res = await trio.to_thread.run_sync(
+        lambda: settings.retrievaler.search(entity_conds, search.index_name(tenant_id), [kb_id])
+    )
+
+    if entity_res.total == 0:
+        callback(msg="No entities found to build graph")
+        return {
+            "total_entities": 0,
+            "processed_entities": 0,
+            "relationships_created": 0,
+            "status": "failed",
+            "error": "No entities found"
+        }
+
+    # Query all relations
+    relation_conds = {
+        "fields": ["from_entity_kwd", "to_entity_kwd", "content_with_weight", "source_id"],
+        "size": 10000,  # Large number to get all relations
+        "knowledge_graph_kwd": ["relation"],
+        "kb_id": kb_id
+    }
+
+    relation_res = await trio.to_thread.run_sync(
+        lambda: settings.retrievaler.search(relation_conds, search.index_name(tenant_id), [kb_id])
+    )
+
+    callback(msg=f"Found {entity_res.total} entities and {relation_res.total} relations")
+
+    # Build NetworkX graph
+    graph = nx.Graph()
+    processed_entities = 0
+    relationships_created = 0
+    all_source_ids = set()
+
+    # Add nodes from entities
+    for entity_id in entity_res.ids:
+        try:
+            entity_data = entity_res.field[entity_id]
+            entity_name = entity_data["entity_kwd"]
+            entity_meta = json.loads(entity_data["content_with_weight"])
+
+            # Add source_ids to track document origins
+            if "source_id" in entity_data:
+                source_ids = entity_data["source_id"]
+                if isinstance(source_ids, str):
+                    source_ids = [source_ids]
+                entity_meta["source_id"] = source_ids
+                all_source_ids.update(source_ids)
+
+            graph.add_node(entity_name, **entity_meta)
+            processed_entities += 1
+
+        except Exception as e:
+            callback(msg=f"Error processing entity {entity_id}: {str(e)}")
+            continue
+
+    # Add edges from relations
+    for relation_id in relation_res.ids:
+        try:
+            relation_data = relation_res.field[relation_id]
+            from_entity = relation_data["from_entity_kwd"]
+            to_entity = relation_data["to_entity_kwd"]
+            relation_meta = json.loads(relation_data["content_with_weight"])
+
+            # Only add edge if both nodes exist
+            if graph.has_node(from_entity) and graph.has_node(to_entity):
+                # Add source_ids to track document origins
+                if "source_id" in relation_data:
+                    source_ids = relation_data["source_id"]
+                    if isinstance(source_ids, str):
+                        source_ids = [source_ids]
+                    relation_meta["source_id"] = source_ids
+                    all_source_ids.update(source_ids)
+
+                graph.add_edge(from_entity, to_entity, **relation_meta)
+                relationships_created += 1
+
+        except Exception as e:
+            callback(msg=f"Error processing relation {relation_id}: {str(e)}")
+            continue
+
+    if len(graph.nodes) == 0:
+        callback(msg="No valid graph nodes created")
+        return {
+            "total_entities": entity_res.total,
+            "processed_entities": 0,
+            "relationships_created": 0,
+            "status": "failed",
+            "error": "No valid nodes created"
+        }
+
+    # Set graph metadata
+    graph.graph["source_id"] = sorted(list(all_source_ids))
+
+    callback(msg=f"Built graph with {len(graph.nodes)} nodes and {len(graph.edges)} edges")
+
+    # Calculate PageRank on the complete graph
+    callback(msg="Calculating PageRank...")
+    pagerank_start = trio.current_time()
+
+    pr = nx.pagerank(graph)
+    for node_name, pagerank in pr.items():
+        graph.nodes[node_name]["pagerank"] = pagerank
+
+    pagerank_end = trio.current_time()
+    callback(msg=f"PageRank calculation completed in {pagerank_end - pagerank_start:.2f} seconds")
+
+    # Store the complete graph
+    callback(msg="Storing complete graph...")
+    store_start = trio.current_time()
+
+    change = GraphChange()
+    change.added_updated_nodes = set(graph.nodes())
+    change.added_updated_edges = set(graph.edges())
+
+    await set_graph(tenant_id, kb_id, embed_bdl, graph, change, callback)
+
+    store_end = trio.current_time()
+    callback(msg=f"Graph storage completed in {store_end - store_start:.2f} seconds")
+
+    now = trio.current_time()
+    callback(msg=f"Graph building completed in {now - start:.2f} seconds")
+
+    return {
+        "total_entities": entity_res.total,
+        "processed_entities": processed_entities,
+        "relationships_created": relationships_created,
+        "total_nodes": len(graph.nodes),
+        "total_edges": len(graph.edges),
+        "status": "completed"
+    }
relation_data["to_entity_kwd"] + relation_meta = json.loads(relation_data["content_with_weight"]) + + # Only add edge if both nodes exist + if graph.has_node(from_entity) and graph.has_node(to_entity): + # Add source_ids to track document origins + if "source_id" in relation_data: + source_ids = relation_data["source_id"] + if isinstance(source_ids, str): + source_ids = [source_ids] + relation_meta["source_id"] = source_ids + all_source_ids.update(source_ids) + + graph.add_edge(from_entity, to_entity, **relation_meta) + relationships_created += 1 + + except Exception as e: + callback(msg=f"Error processing relation {relation_id}: {str(e)}") + continue + + if len(graph.nodes) == 0: + callback(msg="No valid graph nodes created") + return { + "total_entities": entity_res.total, + "processed_entities": 0, + "relationships_created": 0, + "status": "failed", + "error": "No valid nodes created" + } + + # Set graph metadata + graph.graph["source_id"] = sorted(list(all_source_ids)) + + callback(msg=f"Built graph with {len(graph.nodes)} nodes and {len(graph.edges)} edges") + + # Calculate PageRank on the complete graph + callback(msg="Calculating PageRank...") + pagerank_start = trio.current_time() + + pr = nx.pagerank(graph) + for node_name, pagerank in pr.items(): + graph.nodes[node_name]["pagerank"] = pagerank + + pagerank_end = trio.current_time() + callback(msg=f"PageRank calculation completed in {pagerank_end - pagerank_start:.2f} seconds") + + # Store the complete graph + callback(msg="Storing complete graph...") + store_start = trio.current_time() + + change = GraphChange() + change.added_updated_nodes = set(graph.nodes()) + change.added_updated_edges = set(graph.edges()) + + await set_graph(tenant_id, kb_id, embed_bdl, graph, change, callback) + + store_end = trio.current_time() + callback(msg=f"Graph storage completed in {store_end - store_start:.2f} seconds") + + now = trio.current_time() + callback(msg=f"Graph building completed in {now - start:.2f} seconds") + + return { + "total_entities": entity_res.total, + "processed_entities": processed_entities, + "relationships_created": relationships_created, + "total_nodes": len(graph.nodes), + "total_edges": len(graph.edges), + "status": "completed" + } diff --git a/graphrag/utils.py b/graphrag/utils.py index 81df2a24b4d..495293397e6 100644 --- a/graphrag/utils.py +++ b/graphrag/utils.py @@ -354,16 +354,21 @@ def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1): async def graph_edge_to_chunk(kb_id, embd_mdl, from_ent_name, to_ent_name, meta, chunks): + # Handle missing fields with defaults + description = meta.get("description", f"Relationship between {from_ent_name} and {to_ent_name}") + keywords = meta.get("keywords", []) + weight = meta.get("weight", 1.0) + chunk = { "id": get_uuid(), "from_entity_kwd": from_ent_name, "to_entity_kwd": to_ent_name, "knowledge_graph_kwd": "relation", "content_with_weight": json.dumps(meta, ensure_ascii=False), - "content_ltks": rag_tokenizer.tokenize(meta["description"]), - "important_kwd": meta["keywords"], + "content_ltks": rag_tokenizer.tokenize(description), + "important_kwd": keywords, "source_id": meta["source_id"], - "weight_int": int(meta["weight"]), + "weight_int": int(weight), "kb_id": kb_id, "available_int": 0 } diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index adcaa4c238a..0066a5f5272 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -553,9 +553,20 @@ async def do_handle_task(task): chunks, token_count = await run_raptor(task, 
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index adcaa4c238a..0066a5f5272 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -553,9 +553,20 @@ async def do_handle_task(task):
         chunks, token_count = await run_raptor(task, chat_model, embedding_model, vector_size, progress_callback)
     # Either using graphrag or Standard chunking methods
     elif task.get("task_type", "") == "graphrag":
-        if not task_parser_config.get("graphrag", {}).get("use_graphrag", False):
-            progress_callback(prog=-1.0, msg="Internal configuration error.")
-            return
+        graphrag_config = task_parser_config.get("graphrag", {})
+
+        # Check new enum format first, then fall back to legacy boolean
+        graphrag_mode = graphrag_config.get("graphrag_mode")
+        if graphrag_mode is not None:
+            # New enum format: must be "extract_only" or "full_auto" to run GraphRAG
+            if graphrag_mode not in ["extract_only", "full_auto"]:
+                progress_callback(prog=-1.0, msg="GraphRAG not enabled for this knowledge base.")
+                return
+        else:
+            # Legacy boolean format for backward compatibility
+            if not graphrag_config.get("use_graphrag", False):
+                progress_callback(prog=-1.0, msg="Internal configuration error.")
+                return
         graphrag_conf = task["kb_parser_config"].get("graphrag", {})
         start_ts = timer()
         chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
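For reference, the two parser-config shapes the executor now accepts, sketched as Python dicts; the "disabled" value is hypothetical, and the phase semantics of the enum values are inferred from the names:

    # Legacy boolean format, still honored for existing knowledge bases.
    legacy_config = {"graphrag": {"use_graphrag": True}}

    # New enum format: "extract_only" presumably runs entity extraction alone,
    # "full_auto" the full pipeline; any other value (e.g. "disabled") exits early.
    enum_config = {"graphrag": {"graphrag_mode": "full_auto"}}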
diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py
index 43389c4d695..7cdc8748d10 100644
--- a/rag/utils/redis_conn.py
+++ b/rag/utils/redis_conn.py
@@ -337,13 +337,16 @@ def delete(self, key) -> bool:
 
 
 class RedisDistributedLock:
-    def __init__(self, lock_key, lock_value=None, timeout=10, blocking_timeout=1):
+    def __init__(self, lock_key, lock_value=None, timeout=10, blocking_timeout=None):
         self.lock_key = lock_key
         if lock_value:
             self.lock_value = lock_value
         else:
             self.lock_value = str(uuid.uuid4())
         self.timeout = timeout
+        # If no blocking_timeout specified, use the same as lock timeout for proper waiting
+        if blocking_timeout is None:
+            blocking_timeout = timeout
         self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout)
 
     def acquire(self):
@@ -351,11 +354,13 @@ def acquire(self):
         return self.lock.acquire(token=self.lock_value)
 
     async def spin_acquire(self):
-        # Don't delete existing locks - just try to acquire properly
-        while True:
-            if self.lock.acquire(token=self.lock_value):
-                break
-            await trio.sleep(1)  # Reduced sleep time for faster acquisition
+        # Use blocking acquisition with configured timeout instead of manual spinning
+        acquired = await trio.to_thread.run_sync(
+            lambda: self.lock.acquire(blocking=True, token=self.lock_value)
+        )
+        if not acquired:
+            raise TimeoutError(f"Failed to acquire lock '{self.lock_key}' within {self.timeout} seconds")
+        return True
 
     def release(self):
         # Properly release the underlying Redis lock
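A usage sketch for the reworked lock; the key name and timeout are illustrative. Because acquisition now happens in a worker thread via trio.to_thread, the trio event loop stays free while the caller waits:

    # spin_acquire now blocks up to `timeout` seconds, then raises TimeoutError.
    lock = RedisDistributedLock("graphrag_build_demo", timeout=60)
    await lock.spin_acquire()  # no busy polling; waits inside redis-py's Lock
    try:
        pass  # critical section: read/modify the shared graph
    finally:
        lock.release()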
diff --git a/web/src/components/chunk-method-dialog/use-default-parser-values.ts b/web/src/components/chunk-method-dialog/use-default-parser-values.ts
index 6b52b50fd24..1b220c493cb 100644
--- a/web/src/components/chunk-method-dialog/use-default-parser-values.ts
+++ b/web/src/components/chunk-method-dialog/use-default-parser-values.ts
@@ -25,6 +25,10 @@ export function useDefaultParserValues() {
     },
     graphrag: {
       use_graphrag: false,
+      entity_types: ['organization', 'person', 'geo', 'event', 'category'],
+      method: 'light',
+      community: false,
+      resolution: false,
     },
     entity_types: [],
     pages: [],
diff --git a/web/src/components/parse-configuration/graph-rag-form-fields.tsx b/web/src/components/parse-configuration/graph-rag-form-fields.tsx
index fddc65cf614..e822b280721 100644
--- a/web/src/components/parse-configuration/graph-rag-form-fields.tsx
+++ b/web/src/components/parse-configuration/graph-rag-form-fields.tsx
@@ -152,64 +152,6 @@ const GraphRagItems = ({
             )}
           />
-
-          <FormField
-            control={form.control}
-            name="parser_config.graphrag.resolution"
-            render={({ field }) => (
-              <FormItem className="items-center space-y-0">
-                <div className="flex items-center gap-1">
-                  <FormLabel
-                    tooltip={t('resolutionTip')}
-                    className="text-sm text-muted-foreground whitespace-break-spaces w-1/4"
-                  >
-                    {t('resolution')}
-                  </FormLabel>
-                  <div className="w-3/4">
-                    <FormControl>
-                      <Switch
-                        checked={field.value}
-                        onCheckedChange={field.onChange}
-                      ></Switch>
-                    </FormControl>
-                  </div>
-                </div>
-                <div className="flex pt-1">
-                  <div className="w-1/4"></div>
-                  <FormMessage />
-                </div>
-              </FormItem>
-            )}
-          />
-
-          <FormField
-            control={form.control}
-            name="parser_config.graphrag.community"
-            render={({ field }) => (
-              <FormItem className="items-center space-y-0">
-                <div className="flex items-center gap-1">
-                  <FormLabel
-                    tooltip={t('communityTip')}
-                    className="text-sm text-muted-foreground whitespace-break-spaces w-1/4"
-                  >
-                    {t('community')}
-                  </FormLabel>
-                  <div className="w-3/4">
-                    <FormControl>
-                      <Switch
-                        checked={field.value}
-                        onCheckedChange={field.onChange}
-                      ></Switch>
-                    </FormControl>
-                  </div>
-                </div>
-                <div className="flex pt-1">
-                  <div className="w-1/4"></div>
-                  <FormMessage />
-                </div>
-              </FormItem>
-            )}
-          />
         </>
       )}
diff --git a/web/src/components/parse-configuration/graph-rag-items.tsx b/web/src/components/parse-configuration/graph-rag-items.tsx
index 04e3f015024..4ddf7e5844d 100644
--- a/web/src/components/parse-configuration/graph-rag-items.tsx
+++ b/web/src/components/parse-configuration/graph-rag-items.tsx
@@ -112,20 +112,6 @@ const GraphRagItems = ({ marginBottom = false }: GraphRagItemsProps) => {
         >