Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions semantikon/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ast
import inspect
import re
import string
import sys
import textwrap
import warnings
Expand Down Expand Up @@ -528,3 +529,28 @@ def wrapper(*args, **kwargs):
return wrapper

return decorator


def to_identifier(s: str) -> str:
"""
Convert an arbitrary string into a valid Python identifier by
replacing non-identifier characters with underscores.

Rules applied:
- ASCII letters, digits, and underscores are preserved
- All other characters are replaced with '_'
- If the identifier would start with a digit, prefix it with '_'

Args:
s (str): Input string.

Returns:
str: A valid Python identifier.
"""
allowed = string.ascii_letters + string.digits + "_"
result = "".join(c if c in allowed else "_" for c in s)

if not result or result[0].isdigit():
result = "_" + result

return result
135 changes: 105 additions & 30 deletions semantikon/ontology.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
meta_to_dict,
parse_input_args,
parse_output_args,
to_identifier,
)
from semantikon.metadata import SemantikonURI
from semantikon.qudt import UnitsDict
Expand Down Expand Up @@ -1392,11 +1393,31 @@ def query_io_completer(graph: Graph) -> Completer:


class SparqlWriter:
"""
A class for generating and executing SPARQL queries based on a graph structure.
"""

def __init__(self, graph: Graph):
self.graph = graph
"""
Initialize the SparqlWriter with a given RDFLib graph.

Args:
graph (Graph): An RDFLib graph containing the ontology or data to query.
"""
self._graph = graph

@cached_property
def G(self) -> nx.DiGraph:
"""
Construct a directed graph (DiGraph) representation of the ontology.

The graph is built by querying the RDFLib graph for subclass relationships
and OWL restrictions. Each edge in the graph represents a relationship
between a parent and child class, with the predicate stored as edge data.

Returns:
nx.DiGraph: A directed graph representing the ontology structure.
"""
query = """
SELECT ?parent ?property ?child WHERE {
?parent rdfs:subClassOf ?bnode .
Expand All @@ -1405,51 +1426,105 @@ def G(self) -> nx.DiGraph:
?bnode owl:someValuesFrom ?child .
}"""
G = nx.DiGraph()
for subj, pred, obj in self.graph.query(query):
for subj, pred, obj in self._graph.query(query):
G.add_edge(subj, obj, predicate=pred)
return G

@staticmethod
def _hash(u: str | URIRef) -> str:
return sha256(u.encode()).hexdigest()
def get_query_graph(self, *args) -> nx.DiGraph:
"""
Generate a query graph based on the provided arguments.

def to_query(self, u: URIRef, v: URIRef, predicate: URIRef) -> str:
return f"?t_{self._hash(u)} <{predicate}> ?t_{self._hash(v)} .\n"
The query graph is a directed graph (DiGraph) where nodes represent
data elements and edges represent relationships between them. This graph
can be used to generate SPARQL query text.

def _to_leaf_condition(self, c: str, uri: URIRef) -> str:
code = self._hash(uri)
return f"?t_{code} rdf:value ?{c} .\n?t_{code} a <{uri}> .\n"
Args:
*args: A variable number of arguments representing nodes in the graph.
Each argument can be an RDFLib node or a value.

def query(self, *args) -> list[list[Any]]:
Returns:
nx.DiGraph: A directed graph representing the query structure.
"""
G = nx.DiGraph()
data_nodes = []
variables = []
leaf_conditions = []
for ii, arg in enumerate(args):
if isinstance(arg, _Node):
arg = arg.value()
data_nodes.append(list(self.G.successors(arg))[0])
var = f"var_{ii}"
variables.append(f"?{var}")
leaf_conditions.append(self._to_leaf_condition(var, data_nodes[-1]))
query_text = ""
G.add_node(BNode(data_nodes[-1] + "_value"), output=True)
G.add_edge(BNode(data_nodes[-1]), data_nodes[-1], predicate="a")
G.add_edge(
BNode(data_nodes[-1]),
BNode(data_nodes[-1] + "_value"),
predicate="rdf:value",
)
if len(data_nodes) > 1:
for u, v in zip(data_nodes[:-1], data_nodes[1:]):
paths = nx.shortest_path(self.G.to_undirected(), u, v)
for uu, vv in zip(paths[:-1], paths[1:]):
if self.G.has_edge(uu, vv):
query_text += self.to_query(
uu, vv, self.G.edges[uu, vv]["predicate"]
G.add_edge(
BNode(uu),
BNode(vv),
predicate=self.G.edges[uu, vv]["predicate"],
)
else:
query_text += self.to_query(
vv, uu, self.G.edges[vv, uu]["predicate"]
G.add_edge(
BNode(vv),
BNode(uu),
predicate=self.G.edges[vv, uu]["predicate"],
)
total_query = (
"SELECT "
+ " ".join(variables)
+ " WHERE {\n"
+ query_text
+ "".join(leaf_conditions)
+ "}"
)
return [[a.toPython() for a in item] for item in self.graph.query(total_query)]
return G

@staticmethod
def get_query_text(G: nx.DiGraph) -> str:
"""
Convert a query graph into SPARQL query text.

This method takes a directed graph (DiGraph) representing a query structure
and generates the corresponding SPARQL query text.

Args:
G (nx.DiGraph): A directed graph representing the query structure.

Returns:
str: The SPARQL query text.
"""
output_args = [
f"?{to_identifier(node)}"
for node, data in G.nodes.data()
if data.get("output", False)
]
lines = ["SELECT " + " ".join(output_args) + " WHERE {"]
for subj, obj, data in G.edges.data():
subj, obj = [
f"<{e}>" if isinstance(e, URIRef) else f"?{to_identifier(e)}"
for e in [subj, obj]
]
pred = (
f"<{data['predicate']}>"
if isinstance(data["predicate"], URIRef)
else data["predicate"]
)
lines.append(f"{subj} {pred} {obj} .")
lines.append("}")
return "\n".join(lines)

def query(self, *args) -> list[list[Any]]:
"""
Execute a SPARQL query based on the provided arguments.

This method generates a query graph, converts it into SPARQL query text,
and executes the query on the RDFLib graph.

Args:
*args: A variable number of arguments representing nodes in the graph.
Each argument can be an RDFLib node or a value.

Returns:
list[list[Any]]: A list of query results, where each result is a list
of values corresponding to the query's output variables.
"""
G = self.get_query_graph(*args)
text = self.get_query_text(G)
return [[a.toPython() for a in item] for item in self._graph.query(text)]
4 changes: 2 additions & 2 deletions tests/unit/test_ontology.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ def test_function_to_knowledge_graph(self):
+ """
SELECT ?label WHERE {
?function iao:0000136 ex:get_kinetic_energy .
?function rdfs:label ?label
?function rdfs:label ?label .
}"""
)
self.assertEqual(list(g.query(query))[0][0].toPython(), "get_kinetic_energy")
Expand All @@ -818,7 +818,7 @@ def test_function_to_knowledge_graph(self):
+ """
SELECT ?label WHERE {
?function sns:has_parameter_specification ?bnode .
?bnode rdfs:label ?label
?bnode rdfs:label ?label .
}"""
)
g = onto.function_to_knowledge_graph(prepare_pizza)
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
parse_input_args,
parse_metadata,
parse_output_args,
to_identifier,
with_explicit_defaults,
)
from semantikon.metadata import meta, u
Expand Down Expand Up @@ -273,6 +274,12 @@ def g(a, x=2, y=1):
def dead_function(a, x=2, y=1):
return a + x + y

def test_to_identifier(self):
self.assertEqual(to_identifier("valid_name"), "valid_name")
self.assertEqual(to_identifier("123invalid"), "_123invalid")
self.assertEqual(to_identifier("invalid-name!"), "invalid_name_")
self.assertEqual(to_identifier("name with spaces"), "name_with_spaces")


if __name__ == "__main__":
unittest.main()
Loading