# Characters that are copied unchanged into the generated identifier.
_IDENTIFIER_CHARS = frozenset(string.ascii_letters + string.digits + "_")


def to_identifier(s: str) -> str:
    """
    Convert an arbitrary string into a valid Python identifier by
    replacing non-identifier characters with underscores.

    Rules applied:
    - ASCII letters, digits, and underscores are preserved
    - All other characters (non-ASCII letters included) are replaced with '_'
    - If the result is empty or would start with a digit, prefix it with '_'
    - If the result is a reserved Python keyword, append '_'

    The mapping is not injective: distinct inputs such as "a-b" and "a_b"
    produce the same identifier.

    Args:
        s (str): Input string.

    Returns:
        str: A valid, non-keyword Python identifier.

    Examples:
        >>> to_identifier("123invalid")
        '_123invalid'
        >>> to_identifier("class")
        'class_'
    """
    # Imported locally so the module's import block does not need to change.
    import keyword

    result = "".join(c if c in _IDENTIFIER_CHARS else "_" for c in s)

    if not result or result[0].isdigit():
        result = "_" + result

    # "class", "for", "None", ... pass the character rules above but cannot be
    # used as identifiers; a trailing underscore is the PEP 8 convention.
    if keyword.iskeyword(result):
        result += "_"

    return result
def get_query_graph(self, *args) -> nx.DiGraph:
    """
    Generate a query graph based on the provided arguments.

    The query graph is a directed graph (DiGraph) where nodes represent
    data elements and edges represent relationships between them. This graph
    can be used to generate SPARQL query text.

    For every argument, the first successor of the argument in ``self.G`` is
    taken as its data node. A blank node named after the data node is linked
    to the data node with an ``"a"`` edge and to a second blank node
    ``<data node>_value`` with an ``"rdf:value"`` edge; the value node is
    flagged with ``output=True``. Consecutive data nodes are then connected
    along the shortest path of the undirected view of ``self.G``; every hop
    is inserted in the direction in which it exists in ``self.G`` and carries
    that edge's predicate.

    Args:
        *args: A variable number of arguments representing nodes in the graph.
            ``_Node`` instances are replaced by the result of their
            ``value()`` method; any other argument is used as given.

    Returns:
        nx.DiGraph: A directed graph representing the query structure.

    Raises:
        IndexError: If an argument has no successor in ``self.G``.
    """
    query_graph = nx.DiGraph()
    data_nodes = []
    for arg in args:
        if isinstance(arg, _Node):
            arg = arg.value()
        data_node = next(iter(self.G.successors(arg)), None)
        if data_node is None:
            # Same exception type as the former ``list(...)[0]`` lookup, but
            # with a message that names the offending argument.
            raise IndexError(f"{arg!r} has no successor in the ontology graph")
        data_nodes.append(data_node)
        instance = BNode(data_node)
        value = BNode(data_node + "_value")
        query_graph.add_node(value, output=True)
        query_graph.add_edge(instance, data_node, predicate="a")
        query_graph.add_edge(instance, value, predicate="rdf:value")
    if len(data_nodes) > 1:
        # ``to_undirected()`` returns a copy of the graph, so build it once
        # instead of once per pair of data nodes.
        undirected = self.G.to_undirected()
        for u, v in zip(data_nodes[:-1], data_nodes[1:]):
            path = nx.shortest_path(undirected, u, v)
            for uu, vv in zip(path[:-1], path[1:]):
                # The path ignores direction; restore the orientation that
                # is actually stored in ``self.G``.
                if not self.G.has_edge(uu, vv):
                    uu, vv = vv, uu
                query_graph.add_edge(
                    BNode(uu),
                    BNode(vv),
                    predicate=self.G.edges[uu, vv]["predicate"],
                )
    return query_graph
def query(self, *args) -> list[list[Any]]:
    """
    Run the SPARQL query derived from the given arguments.

    The arguments are turned into a query graph by ``get_query_graph``, the
    graph is rendered to SPARQL text by ``get_query_text``, and the text is
    executed on the wrapped RDFLib graph.

    Args:
        *args: Forwarded unchanged to ``get_query_graph``.

    Returns:
        list[list[Any]]: One inner list per result row, holding the
            ``toPython()`` conversion of every item in that row.
    """
    sparql = self.get_query_text(self.get_query_graph(*args))
    rows = []
    for row in self._graph.query(sparql):
        rows.append([term.toPython() for term in row])
    return rows
def test_to_identifier(self):
    """Check ``to_identifier`` on valid names, invalid characters and edge cases."""
    expected = {
        "valid_name": "valid_name",
        "123invalid": "_123invalid",
        "invalid-name!": "invalid_name_",
        "name with spaces": "name_with_spaces",
        # Edge cases: empty input, a non-ASCII letter, a lone underscore.
        "": "_",
        "caf\u00e9": "caf_",
        "_": "_",
    }
    for raw, identifier in expected.items():
        # subTest reports every failing input instead of stopping at the first.
        with self.subTest(raw=raw):
            self.assertEqual(to_identifier(raw), identifier)