diff --git a/.pylintrc b/.pylintrc index 2c2ea8d4..e05f56c1 100644 --- a/.pylintrc +++ b/.pylintrc @@ -274,7 +274,7 @@ exclude-too-few-public-methods= ignored-parents= # Maximum number of arguments for function / method. -max-args=5 +max-args=6 # Maximum number of attributes for a class (see R0902). max-attributes=15 diff --git a/doc/source/basics/connections.rst b/doc/source/basics/connections.rst index 16b222b1..a3bfbb52 100644 --- a/doc/source/basics/connections.rst +++ b/doc/source/basics/connections.rst @@ -83,34 +83,11 @@ It is also possible that a connection requires multiple other connections (**AND :class:`.DnsConnection` requires a :class:`UdpConnection` **AND** a :class:`TcpConnection`, because DNS uses UDP per default, but it uses TCP for requests that sends data that is to much for UDP. -So we can define an AND connection simply by using tuples: +So we can define an AND connection simply with: .. code-block:: - conn = DnsConnection.based_on((UdpConnection, TcpConnection)) - -Limits of connection-relations ------------------------------- - -You can define **AND**/**OR** definitions in almost any possible variation, but there is one limit. -It is not allowed to define **AND** connections inside other **AND** connections, so for example: - -.. code-block:: python - - # ALLOWED - conn = SpecialConnection.based_on((AConnection, BConnection), CConnection) - # NOT ALLOWED - conn = SpecialConnection.based_on((AConnection, BConnection, (CConnection, DConnection))) - -The last definition is not allowed because we use an inner **AND** connection there. We can write the same logic more -easier by refactoring the both **AND** relations: - -.. code-block:: python - - # same like the NOT ALLOWED tree from above - BUT NOW IT IS ALLOWED - conn = SpecialConnection.based_on((AConnection, BConnection, CConnection, DConnection))) - -This limitation makes it easier to read the logic. + conn = DnsConnection.based_on(UdpConnection & TcpConnection) Using the base connection object ================================ @@ -139,7 +116,7 @@ connection: .. code-block:: python - conn = Connection.based_on(AConnection, BConnection) + conn = Connection.based_on(AConnection | BConnection) **A container connection always has based-on elements**. diff --git a/doc/source/basics/features.rst b/doc/source/basics/features.rst index 675bf153..9bfdc2e9 100644 --- a/doc/source/basics/features.rst +++ b/doc/source/basics/features.rst @@ -325,7 +325,7 @@ Basically our scenario level implementation looks like: from balder.connections import TcpConnection from .connections import SerialConnection - @balder.for_vdevice(with_vdevice='OtherVDevice', [TcpConnection, SerialConnection]) + @balder.for_vdevice('OtherVDevice', with_connections=TcpConnection | SerialConnection) class SendMessengerFeature(balder.Feature): class OtherVDevice(balder.VDevice): @@ -394,12 +394,12 @@ use the **Method-Based-Binding** decorator: class SetupSendMessengerFeature(MessengerFeature): - @balder.for_vdevice(MessengerFeature.OtherVDevice, with_connection=SerialConnection) + @balder.for_vdevice(MessengerFeature.OtherVDevice, with_connections=SerialConnection) def send(msg) -> None: serial = MySerial(com=..) ... - @balder.for_vdevice(MessengerFeature.OtherVDevice, with_connection=TcpConnection) + @balder.for_vdevice(MessengerFeature.OtherVDevice, with_connections=TcpConnection) def send(msg) -> None: sock = socket.socket(...) ... @@ -419,7 +419,7 @@ So take a look at the following :class:`Setup`, that matches our :class:`Scenari class MySetup(balder.Setup): - @balder.connect(SlaveDevice, over_connection=balder.Connection.based_on(SerialConnection, TcpConnection)) + @balder.connect(SlaveDevice, over_connection=SerialConnection | TcpConnection) class MainDevice(balder.Device): msg = SetupSendMessengerFeature() @@ -429,7 +429,7 @@ So take a look at the following :class:`Setup`, that matches our :class:`Scenari This example connects the two relevant devices over a :class:`TcpConnection` with each other, because the scenario defines, that the devices should be connected over an TcpConnection. If the test now uses on of our methods ``MyMessengerFeature.send(..)``, the variation with the decorator -``@balder.for_vdevice(..., over_connection=[TcpConnection])`` will be used. +``@balder.for_vdevice(..., over_connection=TcpConnection)`` will be used. If one would exchange the connection with the ``SerialConnection``, Balder would select the method variation with the decorator ``@balder.for_vdevice(MessengerFeature.OtherVDevice, with_connection=SerialConnection)``. diff --git a/doc/source/basics/scenarios.rst b/doc/source/basics/scenarios.rst index b58c47d8..7be27b11 100644 --- a/doc/source/basics/scenarios.rst +++ b/doc/source/basics/scenarios.rst @@ -79,9 +79,9 @@ You can also define some by your own. In addition to define single connections, you can also select a part of the global connection tree or combine some connections with an OR or an AND relationship. So for example you could connect our devices and allow an Ethernet as well as a Serial connection, by defining -``@balder.connect(SendDevice, over_connection=Connection.based_on(MyEthernet, MySerial))``. Of course you could also -define, that you need both, the Serial and the Ethernet connection. This can be done by using tuples: -``@balder.connect(SendDevice, over_connection=Connection.based_on((MyEthernet, MySerial)))`` +``@balder.connect(SendDevice, over_connection=MyEthernet | MySerial)``. Of course you could also +define, that you need both, the Serial and the Ethernet connection. This can be done with: +``@balder.connect(SendDevice, over_connection=MyEthernet & MySerial`` In our example we only define that we want a universal :class:`Connection` between our devices ``SendDevice`` and ``RecvDevice``. With this the connection type doesn't matter and every connection works here. diff --git a/doc/source/basics/vdevices.rst b/doc/source/basics/vdevices.rst index 7e9a80dc..7a845ded 100644 --- a/doc/source/basics/vdevices.rst +++ b/doc/source/basics/vdevices.rst @@ -316,7 +316,7 @@ Let's go back to an easy scenario which only has one single vDevice: class Receiver(balder.Device): recv = RecvFeature() - @balder.connect(with_device=Receiver, over_connection=balder.Connection.based_on(SmsConnection, EMailConnection)) + @balder.connect(with_device=Receiver, over_connection=SmsConnection | EMailConnection) class Sender(balder.Device): send = SendFeature(receiver=ScenarioSendMessage.Receiver) @@ -405,11 +405,11 @@ all of that with our two features ``SendFeature`` and ``RecvFeature``: send_to_recv1 = SendFeature(receiver='Receiver1') send_to_recv2 = SendFeature(receiver='Receiver2') - @balder.connect(with_device=Sender, over_connection=balder.Connection.based_on(SmsConnection, EMailConnection)) + @balder.connect(with_device=Sender, over_connection=SmsConnection | EMailConnection) class Receiver1(balder.Device): recv = RecvFeature() - @balder.connect(with_device=Sender, over_connection=balder.Connection.based_on(SmsConnection, EMailConnection)) + @balder.connect(with_device=Sender, over_connection=SmsConnection | EMailConnection) class Receiver2(balder.Device): recv = RecvFeature() diff --git a/src/_balder/cnnrelations/and_connection_relation.py b/src/_balder/cnnrelations/and_connection_relation.py index fc58f1e8..410ed056 100644 --- a/src/_balder/cnnrelations/and_connection_relation.py +++ b/src/_balder/cnnrelations/and_connection_relation.py @@ -3,10 +3,10 @@ import itertools -from .base_connection_relation import BaseConnectionRelation +from .base_connection_relation import BaseConnectionRelation, BaseConnectionRelationT if TYPE_CHECKING: - from ..connection import Connection + from ..connection import List, Union, Connection from .or_connection_relation import OrConnectionRelation @@ -62,3 +62,88 @@ def get_simplified_relation(self) -> OrConnectionRelation: if not self_simplified_or_relations: all_new_ands.append(and_template) return OrConnectionRelation(*all_new_ands) + + def is_single(self) -> bool: + + if len(self._connections) == 0: + return True + + return min(cnn.is_single() for cnn in self._connections) + + def get_singles(self) -> List[Connection]: + from ..connection import Connection # pylint: disable=import-outside-toplevel + + singles_and_relations = () + for cur_elem in self._connections: + # get all singles of this AND relation element + singles_and_relations += (cur_elem.get_singles(),) + # now get the variations and add them to our results + return [ + Connection.based_on(AndConnectionRelation(*cur_tuple)) + for cur_tuple in itertools.product(*singles_and_relations) + ] + + def cut_into_all_possible_subtree_branches(self) -> List[AndConnectionRelation]: + if not self.is_single(): + raise ValueError('can not execute method, because relation is not single') + + tuple_with_all_possibilities = ( + tuple(cur_tuple_item.cut_into_all_possible_subtree_branches() for cur_tuple_item in self._connections)) + + cloned_tuple_list = [] + for cur_tuple in list(itertools.product(*tuple_with_all_possibilities)): + cloned_tuple = AndConnectionRelation(*[cur_tuple_item.clone() for cur_tuple_item in cur_tuple]) + cloned_tuple_list.append(cloned_tuple) + return cloned_tuple_list + + def contained_in(self, other_conn: Union[Connection, BaseConnectionRelationT], ignore_metadata=False) -> bool: + # This method checks if the AND relation is contained in the `other_conn`. To ensure that an AND relation is + # contained in a connection tree, there has to be another AND relation into the `other_conn`, that has the same + # length or is bigger. In addition, there has to exist an order combination where every element of the this AND + # relation is contained in the found AND relation of the `other_cnn`. In this case it doesn't matter where the + # AND relation is in `other_elem` (will be converted to single, and AND relation will be searched in all + # BASED_ON elements). If the AND relation of `other_conn` has fewer items than this AND relation, it will be + # ignored. The method only search for a valid existing item in the `other_conn` AND relation for every item of + # this AND relation. + from ..connection import Connection # pylint: disable=import-outside-toplevel + + if not self.is_resolved(): + raise ValueError('can not execute method, because connection relation is not resolved') + if not other_conn.is_resolved(): + raise ValueError('can not execute method, because other connection relation is not resolved') + + if isinstance(other_conn, BaseConnectionRelation): + other_conn = Connection.based_on(other_conn) + + self_singles = self.get_singles() + other_singles = other_conn.get_singles() + + for cur_self_single, cur_other_single in itertools.product(self_singles, other_singles): + # check if we can find an AND relation in the other object -> go the single connection upwards and + # search for a `AndConnectionRelation` + + # self is a container connection -> use raw inner AND list + cur_self_single_and_relation = cur_self_single.based_on_elements.connections[0] + + cur_sub_other_single = cur_other_single + while cur_sub_other_single is not None: + if isinstance(cur_sub_other_single, AndConnectionRelation): + # found an AND relation -> check if length does match + if len(cur_sub_other_single) < len(cur_self_single_and_relation): + # this complete element is not possible - skip this single! + break + # length is okay, no check if every single element is contained in one of this tuple + for cur_inner_self_elem in cur_self_single_and_relation.connections: + + if not cur_inner_self_elem.contained_in(cur_sub_other_single, + ignore_metadata=ignore_metadata): + # at least one element is not contained in other AND relation - this complete element + # is not possible - skip this single! + break + # all items are contained in the current other AND relation -> match + return True + # go further up, if this element is no AND relation + cur_sub_other_single = cur_sub_other_single.based_on_elements[0] \ + if cur_sub_other_single.based_on_elements else None + + return False diff --git a/src/_balder/cnnrelations/base_connection_relation.py b/src/_balder/cnnrelations/base_connection_relation.py index 187499ce..4fdcf3ce 100644 --- a/src/_balder/cnnrelations/base_connection_relation.py +++ b/src/_balder/cnnrelations/base_connection_relation.py @@ -1,10 +1,12 @@ from __future__ import annotations -from typing import List, Union, Type, TypeVar, TYPE_CHECKING +from typing import List, Union, Type, Dict, TypeVar, TYPE_CHECKING from abc import ABC, abstractmethod from ..utils import cnn_type_check_and_convert if TYPE_CHECKING: from ..connection import Connection + from ..connection import ConnectionMetadata + from ..device import Device from .and_connection_relation import AndConnectionRelation from .or_connection_relation import OrConnectionRelation @@ -31,6 +33,12 @@ def __getitem__(self, item): def __len__(self): return len(self._connections) + def __hash__(self): + all_hashes = 0 + for cur_elem in self.connections: + all_hashes += hash(cur_elem) + hash(self.__class__.__name__) + return all_hashes + def __and__( self, other: Union[Connection, Type[Connection], AndConnectionRelation, OrConnectionRelation] @@ -62,6 +70,20 @@ def connections(self) -> List[Union[Connection, BaseConnectionRelationT]]: """ return self._connections.copy() + @property + def metadata(self) -> ConnectionMetadata | None: + """ + returns the metadata of this connection relation + """ + if not self.connections: + return None + + # get all unique metadata objects + existing_metadata = list({elem.metadata for elem in self.connections if elem}) + if len(existing_metadata) > 1: + raise ValueError(f'different metadata detected: `{existing_metadata}`') + return existing_metadata[0] + @abstractmethod def get_simplified_relation(self) -> OrConnectionRelation: """ @@ -106,8 +128,143 @@ def extend(self, relation: BaseConnectionRelationT): for cur_elem in relation.connections: self.append(cur_elem) + def set_metadata_for_all_subitems(self, metadata: Union[Dict[str, Union[Device, str]], None]): + """ + This method sets the metadata for all existing Connection items in this element. + + :param metadata: the metadata that should be set (if the value is explicitly set to None, it removes the + metadata from every item) + """ + for cur_cnn in self._connections: + cur_cnn.set_metadata_for_all_subitems(metadata) + + def get_all_used_connection_types(self) -> List[Type[Connection]]: + """ + This method returns all available connection types, that are used within this connection relation. + """ + from ..connection import Connection # pylint: disable=import-outside-toplevel + + result = [] + for cur_inner_elem in self._connections: + if isinstance(cur_inner_elem, Connection): + result.append(cur_inner_elem.__class__) + elif isinstance(cur_inner_elem, BaseConnectionRelation): + result.extend(cur_inner_elem.get_all_used_connection_types()) + else: + raise TypeError(f'unexpected type for inner item `{cur_inner_elem.__class__.__name__}`') + return list(set(result)) + @abstractmethod def get_tree_str(self) -> str: """ returns the tree string for this part of the connection tree """ + + def is_resolved(self) -> bool: + """ + returns whether this connection relation is part of a single connection + """ + if len(self._connections) == 0: + return True + return min(cnn.is_resolved() for cnn in self._connections) + + def get_resolved(self) -> BaseConnectionRelationT: + """ + This method returns a resolved Connection Tree. This means that it convert the based_on references so that + every based on connection is a direct parent of the current element. It secures that there are no undefined + connection layers between an object and the given parent. + """ + return self.__class__(*[cnn.get_resolved() for cnn in self._connections]) + + @abstractmethod + def is_single(self) -> bool: + """ + returns whether this connection relation is part of a single connection + """ + + @abstractmethod + def get_singles(self) -> List[Connection]: + """ + returns the single connections of all components of this connection relation + """ + + def cnn_are_in_other(self, other: BaseConnectionRelationT, ignore_metadata: bool=False) -> bool: + """ + This method validates that the elements from this relation are contained in the other relation. Elements matches + with each other, as soon as they are equal. + + .. note:: + This method only checks that every single connections from the first element is contained in the second too. + It does not check the other direction. If you want to validate this, you need to call this method with both + possibilities. + + :param other: the first list of connections + :param other: the other relation object + :param ignore_metadata: True, if the metadata of the single connections should be ignored + :return: True in case that every connection of the first list is equal with one in the second list, otherwise + False + """ + for cur_cnn in self._connections: + found_equal = False + for cur_other_cnn in other.connections: + if cur_cnn.equal_with(cur_other_cnn, ignore_metadata=ignore_metadata): + found_equal = True + break + if not found_equal: + return False + return True + + @abstractmethod + def cut_into_all_possible_subtree_branches(self): + """ + This method returns a list of all possible connection tree branches. A branch is a single connection, while + this method returns a list of all possible singles where every single connection has this connection as head. + """ + + def equal_with(self, other_relation: BaseConnectionRelationT, ignore_metadata=False) -> bool: + """ + This method returns True if the current relation matches with the other relation object. It always converts the + elements to a resolved version and checks if both of them are exactly the same. + + .. note:: + Note that both connection relations need to be resolved. + + .. note:: + Note that both Connection objects have to have the same ending parent elements. Only the order of the inner + connection elements are irrelevant. + + :param other_relation: the other connection relation (needs to be the same type) + + :param ignore_metadata: if this value is true the method ignores the metadata + + :return: returns True if both elements are same + """ + if self.__class__ != other_relation.__class__: + return False + + if not self.is_resolved(): + raise ValueError('can not execute method, because connection relation is not resolved') + if not other_relation.is_resolved(): + raise ValueError('can not execute method, because other connection relation is not resolved') + + # check inner connection elements (if they match all in both directions) + return (self.cnn_are_in_other(other_relation, ignore_metadata=ignore_metadata) + and other_relation.cnn_are_in_other(self, ignore_metadata=ignore_metadata)) + + @abstractmethod + def contained_in(self, other_conn: Union[Connection, BaseConnectionRelationT], ignore_metadata=False) -> bool: + """ + This method helps to find out whether this connection relation fits within a given connection tree. A connection + object is a certain part of the large connection tree that Balder has at its disposal. This method checks + whether a possibility of this connection tree fits in one possibility of the given connection tree. + + .. note:: + The method returns true if one single connection of this object fits in another single connection that is + given by `other_conn`. + + :param other_conn: the other connection + + :param ignore_metadata: if this value is true the method ignores the metadata + + :return: true if the self object is contained in the `other_conn`, otherwise false + """ diff --git a/src/_balder/cnnrelations/or_connection_relation.py b/src/_balder/cnnrelations/or_connection_relation.py index f34c9cca..d7303240 100644 --- a/src/_balder/cnnrelations/or_connection_relation.py +++ b/src/_balder/cnnrelations/or_connection_relation.py @@ -1,7 +1,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import List, Union, TYPE_CHECKING -from .base_connection_relation import BaseConnectionRelation +from .base_connection_relation import BaseConnectionRelation, BaseConnectionRelationT if TYPE_CHECKING: from ..connection import Connection @@ -31,3 +31,35 @@ def get_simplified_relation(self) -> OrConnectionRelation: else: raise TypeError(f'detect unexpected element type `{cur_inner_elem.__class__}` in inner elements') return result + + def is_single(self) -> bool: + if len(self.connections) == 0: + return True + if len(self.connections) == 1: + return self.connections[0].is_single() + return False + + def get_singles(self) -> List[Connection]: + result = [] + for elem in self.connections: + result.extend(elem.get_singles()) + return result + + def cut_into_all_possible_subtree_branches(self) -> List[OrConnectionRelation]: + if not self.is_single(): + raise ValueError('can not execute method, because relation is not single') + result = [OrConnectionRelation()] + if len(self.connections) == 0: + return result + result.extend(self.connections[0].cut_into_all_possible_subtree_branches()) + return result + + def contained_in(self, other_conn: Union[Connection, BaseConnectionRelationT], ignore_metadata=False) -> bool: + if not self.is_resolved(): + raise ValueError('can not execute method, because connection relation is not resolved') + if not other_conn.is_resolved(): + raise ValueError('can not execute method, because other connection relation is not resolved') + for cur_inner_cnn in self._connections: + if cur_inner_cnn.contained_in(other_conn): + return True + return False diff --git a/src/_balder/collector.py b/src/_balder/collector.py index 034b5350..dbf31d39 100644 --- a/src/_balder/collector.py +++ b/src/_balder/collector.py @@ -27,7 +27,6 @@ if TYPE_CHECKING: from _balder.plugin_manager import PluginManager - ConnectionType = Union[Type[Connection], Connection, Tuple[Union[Type[Connection], Connection]]] logger = logging.getLogger(__file__) @@ -45,7 +44,7 @@ class Collector: # with the method `rework_method_variation_decorators()` _possible_method_variations: Dict[ Callable, - List[Tuple[Union[Type[VDevice], str], Union[ConnectionType, List[ConnectionType]]]] + List[Tuple[Union[Type[VDevice], str], Connection]] ] = {} # this static attribute will be managed by the decorator `@parametrize(..)`. It holds all functions/methods that @@ -85,7 +84,7 @@ def register_raw_fixture(fixture: Callable, level: str): def register_possible_method_variation( meth: Callable, vdevice: Union[Type[VDevice], str], - with_connections: Union[ConnectionType, List[ConnectionType]]): + with_connections: Connection): """ allows to register a new method variation - used by decorator `@balder.for_vdevice()` @@ -509,11 +508,6 @@ def rework_method_variation_decorators(): if name not in owner_for_vdevice.keys(): owner_for_vdevice[name] = {} - cur_decorator_cleaned_cnns = [] - for cur_cnn in cur_decorator_with_connections: - cur_cnn = cur_cnn() if isinstance(cur_cnn, type) and issubclass(cur_cnn, Connection) else cur_cnn - cur_decorator_cleaned_cnns.append(cur_cnn) - if cur_fn in owner_for_vdevice[name].keys(): old_dict = owner_for_vdevice[name][cur_fn] if cur_decorator_vdevice in old_dict.keys(): @@ -521,10 +515,10 @@ def rework_method_variation_decorators(): f'`{cur_decorator_vdevice}` at method `{name}` of class ' f'`{owner.__name__}` ') - old_dict[cur_decorator_vdevice] = cur_decorator_cleaned_cnns + old_dict[cur_decorator_vdevice] = cur_decorator_with_connections owner_for_vdevice[name][cur_fn] = old_dict else: - new_dict = {cur_decorator_vdevice: cur_decorator_cleaned_cnns} + new_dict = {cur_decorator_vdevice: cur_decorator_with_connections} owner_for_vdevice[name][cur_fn] = new_dict def owner_wrapper(the_owner_of_this_method, the_name, wrap_fn): diff --git a/src/_balder/connection.py b/src/_balder/connection.py index eb34ed6a..4c751f36 100644 --- a/src/_balder/connection.py +++ b/src/_balder/connection.py @@ -3,6 +3,8 @@ import copy import itertools + +from _balder.connection_metadata import ConnectionMetadata from _balder.device import Device from _balder.exceptions import IllegalConnectionTypeError from _balder.cnnrelations import AndConnectionRelation, OrConnectionRelation @@ -37,7 +39,7 @@ def __init__(self, from_device: Union[Type[Device], None] = None, to_device: Uni .. note:: With a direct instance of a :class:`Connection` object you can create an own Connection-Tree. You can use this container object, if you need a container for a list of :class:`Connection`-Trees that are combined - with an AND (tuple) or/and an OR (list). + with an AND (``&``) or/and an OR (``|``). :param from_device: the device this connection starts from (default: None) @@ -47,47 +49,19 @@ def __init__(self, from_device: Union[Type[Device], None] = None, to_device: Uni :param to_device_node_name: the node name of the device the connection ends """ + # note: currently every connection is bidirectional (we want to add support for this later) # contains all metadata of this connection object - self._metadata = { - "from_device": None, "from_device_node_name": None, "to_device": None, "to_device_node_name": None - } + self._metadata = ConnectionMetadata( + from_device=from_device, + to_device=to_device, + from_device_node_name=from_device_node_name, + to_device_node_name=to_device_node_name, + bidirectional=True + ) - self._bidirectional = None # contains all sub connection objects, this connection tree is based on - self._based_on_connections = [] - - if from_device is not None and not issubclass(from_device, Device): - raise TypeError(f"detect illegal argument element {str(from_device)} for given attribute " - f"`from_device` - should be a subclasses of `balder.Device`") - self._metadata["from_device"] = from_device - - if from_device_node_name is not None and not isinstance(from_device_node_name, str): - raise TypeError(f"detect illegal argument type {type(from_device_node_name)} for given attribute " - f"`from_device_node_name` - should be a string value") - self._metadata["from_device_node_name"] = from_device_node_name - - if to_device is not None and not issubclass(to_device, Device): - raise TypeError(f"detect illegal argument element {str(to_device)} for given attribute " - f"`to_device` - should be a subclasses of `balder.Device`") - self._metadata["to_device"] = to_device - - if to_device_node_name is not None and not isinstance(to_device_node_name, str): - raise TypeError(f"detect illegal argument type {type(to_device_node_name)} for given attribute " - f"`to_device_node_name` - should be a string value") - self._metadata["to_device_node_name"] = to_device_node_name - - # describes if the connection is uni or bidirectional - # note: currently every connection is bidirectional (we want to add support for this later) - self._bidirectional = True - - if not ((from_device is None and to_device is None and from_device_node_name is None - and to_device_node_name is None) or ( - from_device is not None and to_device is not None and from_device_node_name is not None and - to_device_node_name is not None)): - raise ValueError( - "you have to provide all or none of the following items: `from_device`, `from_device_node_name`, " - "`to_device` or `to_device_node_name`") + self._based_on_connections = OrConnectionRelation() def __and__(self, other: Union[Connection, AndConnectionRelation, OrConnectionRelation]) -> AndConnectionRelation: new_list = AndConnectionRelation(self) @@ -112,202 +86,12 @@ def __eq__(self, other): return False def __hash__(self): - all_hashes = hash(self.from_device) + hash(self.to_device) + hash(self.from_node_name) + \ - hash(self.to_node_name) + hash(str(self)) - for cur_child in self.based_on_elements: - all_hashes += hash(cur_child) + all_hashes = hash(self.metadata) + hash(self.based_on_elements) return hash(all_hashes) - def clone_without_based_on_elements(self) -> Connection: - """ - This method returns a copied version of this element, while all `_based_on_connections` are removed (the copied - element has an empty list here). - - :return: a python copied object of this item - """ - self_copy = copy.copy(self) - self_copy._based_on_connections = [] # pylint: disable=protected-access - return self_copy - - def clone(self) -> Connection: - """ - This method returns an exact clone of this connection. For this clone every inner connection object will be - newly instantiated, but all internal references (like the devices and so on) will not be copied (objects are the - same for this object and the clone). The method will make a normal copy for every connection object in the - `_based_on_elements` list. - """ - self_copy = self.clone_without_based_on_elements() - - for cur_based_on in self._based_on_connections: - if isinstance(cur_based_on, tuple): - cloned_tuple = tuple([cur_tuple_element.clone() for cur_tuple_element in cur_based_on]) - self_copy.append_to_based_on(cloned_tuple) - elif isinstance(cur_based_on, Connection): - cloned_cur_based_on = cur_based_on.clone() - self_copy.append_to_based_on(cloned_cur_based_on) - else: - raise TypeError('based on element is not from valid type') - return self_copy - - @staticmethod - def __cut_conn_from_only_parent_to_child(elem: Connection) -> List[Connection]: - """ - This helper method returns all possible pieces while the base element will remain intact - so every returned - element is from the same type as `elem` (only the related `based_on_elements` are changed). - - .. note:: - The given element itself will also be added here! - - .. note:: - The given element has to be single! - """ - all_pieces = [elem.clone()] - if len(elem.based_on_elements) == 0: - return all_pieces - # if the next element is a tuple -> call procedure for tuples and add a copy of this object as child - if isinstance(elem.based_on_elements[0], tuple): - # return all possibilities of the tuple while adding the current object as child of the tuple - for cur_tuple in Connection.__cut_tuple_from_only_parent_to_child(elem.based_on_elements[0]): - # for this we do not have to use `clone`, because we copy the base object with `copy.copy` and - # completely replace the `_based_on_connections` by our own - copied_conn = elem.clone_without_based_on_elements() - copied_conn.append_to_based_on(cur_tuple) - all_pieces.append(copied_conn) - return all_pieces - # if this element is the last element with a parent -> copy it and remove the parent, return it - if len(elem.based_on_elements[0].based_on_elements) == 0: - new_elem = elem.clone_without_based_on_elements() - all_pieces.append(new_elem) - return all_pieces - # otherwise, the current item has grandparents, so call the method recursively for parents and add a copy of - # this object as child - all_possible_parents = Connection.__cut_conn_from_only_parent_to_child(elem.based_on_elements[0]) - for cur_parent in all_possible_parents: - copied_conn = elem.clone_without_based_on_elements() - copied_conn.append_to_based_on(cur_parent) - all_pieces.append(copied_conn) - return all_pieces - - @staticmethod - def __cut_tuple_from_only_parent_to_child(elem: Tuple[Connection]) -> List[Tuple[Connection]]: - """ - This helper method returns all possible pieces while the base elements of the tuple will remain intact - so - every returned tuple element is from same type like the related elements in the tuple `elem` (only the related - `based_on_elements` are changed). - - .. note:: - The given element has to be single! - - .. note:: - Note that this method also returns every possible ordering - """ - - tuple_with_all_possibilities = ( - tuple([Connection.__cut_conn_from_only_parent_to_child(cur_tuple_item) for cur_tuple_item in elem])) - - cloned_tuple_list = [] - for cur_tuple in list(itertools.product(*tuple_with_all_possibilities)): - cloned_tuple = tuple([cur_tuple_item.clone() for cur_tuple_item in cur_tuple]) - cloned_tuple_list.append(cloned_tuple) - return cloned_tuple_list - - @staticmethod - def check_if_tuple_contained_in_connection(tuple_elem: Tuple[Connection], other_elem: Connection) -> bool: - """ - This method checks if the tuple given by `tuple_elem` is contained in the `other_elem`. To ensure that a tuple - element is contained in a connection tree, there has to be another tuple into the `other_elem`, that has the - same length or is bigger. In addition, there has to exist an order combination where every element of the - `tuple_elem` is contained in the found tuple in `other_elem`. In this case it doesn't matter where the tuple is - in `other_elem` (will be converted to single, and tuple will be searched in all BASED_ON elements). If the tuple - element of `other_elem` has fewer items than our `tuple_elem`, it will be ignored. The method only search for - a valid existing item in the `other_elem` tuple for every item of the `tuple_elem`. - - :param tuple_elem: the tuple element that should be contained in the `other_elem` - - :param other_elem: the connection object, the given tuple should be contained in - """ - def tuple_is_contained_in_other(inner_tuple, contained_in_tuple): - # check if every tuple elem fits in one of `contained_in_tuple` (allow to use a position in - # `contained_in_tuple` multiple times) - for cur_tuple_element in inner_tuple: - found_match_for_this_elem = False - for cur_contained_in_elem in contained_in_tuple: - if cur_tuple_element.contained_in(cur_contained_in_elem, ignore_metadata=True): - found_match_for_this_elem = True - break - if not found_match_for_this_elem: - return False - return True - - other_singles = other_elem.get_singles() - tuple_singles = Connection.convert_tuple_to_singles(tuple_elem) - for cur_tuple_single in tuple_singles: - for cur_other_single in other_singles: - # check if we found a tuple element in the other object -> go the single connection upwards and search - # for a tuple - cur_sub_other_single: List[Union[Connection, Tuple[Connection]]] = [cur_other_single] - while len(cur_sub_other_single) > 0: - if isinstance(cur_sub_other_single[0], tuple): - # found a tuple -> check if length does match - if len(cur_sub_other_single[0]) >= len(cur_tuple_single) and tuple_is_contained_in_other( - cur_tuple_single, contained_in_tuple=cur_sub_other_single[0]): - return True - # otherwise, this complete element is not possible - skip this single! - break - cur_sub_other_single = cur_sub_other_single[0].based_on_elements - return False - - @staticmethod - def convert_tuple_to_singles(tuple_elem: Tuple[Connection]) -> List[Union[Connection, Tuple[Connection]]]: - """ - This method converts the given `tuple_elem` to single items and return these. - - :param tuple_elem: the tuple element out of which the single items are being created - - :returns: a list of new connection objects that are single items - """ - singles_tuple = () - tuple_idx = 0 - for cur_tuple_elem in tuple_elem: - if not isinstance(cur_tuple_elem, Connection): - raise TypeError(f"the tuple element at index {tuple_idx} of element from `other_conn` is not " - "from type `Connection`") - # get all singles of this tuple element - singles_tuple += (cur_tuple_elem.get_singles(),) - tuple_idx += 1 - # now get the variations and add them to our results - return list(itertools.product(*singles_tuple)) - - @staticmethod - def cleanup_connection_list(full_list: List[Union[Connection, Tuple[Connection]]]) \ - -> List[Union[Connection, Tuple[Connection]]]: - """ - This method cleanup a connection list while removing items that are direct duplicates and by removing duplicates - that are fully contained_in other items. - - :param full_list: the full list of connections and tuples of connections that should be cleaned-up - - :returns: returns the cleaned up list - """ - result = full_list.copy() - next_loop = True - while not next_loop: - next_loop = False - for cur_elem in result: - all_other_elems = [cur_item for cur_item in result if cur_item != cur_elem] - if isinstance(cur_elem, Connection): - for cur_other_elem in all_other_elems: - # check if it contained in or the same - if cur_elem.contained_in(cur_other_elem, ignore_metadata=True): - # we can remove it from result list - result.remove(cur_elem) - next_loop = True - break - return result - # ---------------------------------- STATIC METHODS ---------------------------------------------------------------- - # ---------------------------------- CLASS METHODS ---------------------------------------------------------------- + # ---------------------------------- CLASS METHODS ----------------------------------------------------------------- @classmethod def get_parents(cls, tree_name: Union[str, None] = None) -> List[Type[Connection]]: @@ -383,347 +167,190 @@ def based_on( """ this_instance = cls() - # TODO temp solution for conversion -> will be replaced shortly - args = [] if isinstance(connection, type) and issubclass(connection, Connection): - args.append(connection()) - elif isinstance(connection, Connection): - args.append(connection) - elif isinstance(connection, OrConnectionRelation): - for cur_connection in connection.connections: - if isinstance(cur_connection, Connection): - args.append(cur_connection) - elif isinstance(cur_connection, AndConnectionRelation): - args.append(tuple(cur_connection.connections)) + connection = connection() + new_items = [] + if isinstance(connection, Connection): + new_items.append(connection) elif isinstance(connection, AndConnectionRelation): - args.append(tuple(connection.connections)) + new_items.append(connection) + elif isinstance(connection, OrConnectionRelation): + for cur_inner_elem in connection.connections: + new_items.append(cur_inner_elem) else: raise TypeError(f'can not use object from type {connection}') - new_items = [] - for cur_item in args: - if isinstance(cur_item, Connection): - if cur_item.__class__ == Connection: - # it is a container -> add elements - new_items += cur_item.based_on_elements - else: - new_items.append(cur_item) - elif isinstance(cur_item, type) and issubclass(cur_item, Connection): - new_items.append(cur_item()) - else: - new_items.append(cur_item) # do not create a container connection if no container is required here - if cls == Connection and len(new_items) == 1 and not isinstance(new_items[0], tuple): + if cls == Connection and len(new_items) == 1 and isinstance(new_items[0], Connection): return new_items[0] this_instance.append_to_based_on(*new_items) return this_instance @classmethod - def check_equal_connections_are_in( - cls, cnns_from: List[Connection], are_in_cnns_from: List[Connection], ignore_metadata: bool=False) -> bool: - """ - This method validates that the elements from the first list are contained (are equal) with one of the elements - in the second list. - - .. note:: - This method only checks that every single connections from the first element is contained in the second too. - It does not check the other direction. If you want to validate this, you need to call this method with both - possibilities. - - :param cnns_from: the first list of connections - :param are_in_cnns_from: the second list of connection - :param ignore_metadata: True, if the metadata of the single connections should be ignored - :return: True in case that every connection of the first list is equal with one in the second list, otherwise - False - """ - for cur_cnn in cnns_from: - found_equal = False - for cur_other_cnn in are_in_cnns_from: - if cur_cnn.equal_with(cur_other_cnn, ignore_metadata=ignore_metadata): - found_equal = True - break - if not found_equal: - return False - return True - - @classmethod - def check_equal_connection_tuples_are_in( - cls, tuples_from: List[Tuple[Connection]], are_in_tuples_from: List[Tuple[Connection]], - ignore_metadata: bool=False) -> bool: + def filter_connections_that_are_contained_in( + cls, + cnns_from: List[Union[Connection, AndConnectionRelation, OrConnectionRelation]], + are_contained_in: Connection, + ignore_metadata: bool = False + ) -> List[Connection]: """ - This method validates that the connection tuples from the first list are contained (are equal) within the tuple - elements of the second list. + This method filters the connection elements from the first list to include only those connections that are + contained within the provided connection ``are_contained_in``. - .. note:: - This method only checks that every single tuple from the first element is contained in the second list. - It does not check the other direction. If you want to validate this, you need to call this method with both - possibilities. - - :param tuples_from: the first list of connection-tuples - :param are_in_tuples_from: the second list of connection-tuples - :param ignore_metadata: True, if the metadata of the single connections should be ignored - :return: True in case that every tuple connections of the first list is equal with one tuple in the second, - otherwise False + :param cnns_from: a list of connections + :param are_contained_in: the connection, the connection elements should be contained in + :param ignore_metadata: True, if the metadata should be ignored + :return: a list with the filtered connections """ - for cur_search_tuple in tuples_from: - found_match_for_cur_search_tuple = False - # go through each unmatched other tuple - for cur_other_tuple in are_in_tuples_from: - cur_search_tuple_is_completely_in_other_tuple = True - for cur_search_tuple_elem in cur_search_tuple: - fount_it = False - for cur_other_tuple_elem in cur_other_tuple: - if cur_search_tuple_elem.equal_with(cur_other_tuple_elem, ignore_metadata=ignore_metadata): - fount_it = True - break - if not fount_it: - cur_search_tuple_is_completely_in_other_tuple = False - break - if cur_search_tuple_is_completely_in_other_tuple: - # here we have no match - found_match_for_cur_search_tuple = True - break - if not found_match_for_cur_search_tuple: - return False - return True + return [ + cnn for cnn in cnns_from + if cnn.contained_in(other_conn=are_contained_in, ignore_metadata=ignore_metadata) + ] # ---------------------------------- PROPERTIES -------------------------------------------------------------------- @property - def metadata(self) -> dict: + def metadata(self) -> ConnectionMetadata: """returns the connection metadata dictionary""" return self._metadata - @metadata.setter - def metadata(self, data): - empty_metadata = { - "from_device": None, "to_device": None, "from_device_node_name": None, "to_device_node_name": None} - - if not isinstance(data, dict): - raise ValueError("the given metadata value has to be a dictionary") - if data != {}: - if sorted(["from_device", "to_device", "from_device_node_name", "to_device_node_name"]) != \ - sorted(list(data.keys())): - raise ValueError("if you provide a metadata dictionary you have to provide all elements of it") - else: - data = empty_metadata.copy() - - # only allow to set the metadata dictionary if the old one has the same values or was empty before (no values) - if data != empty_metadata: - if self._metadata == empty_metadata: - # it is ok, because the dictionary was empty before - pass - elif self._metadata == data: - # it is ok too, because the new set data is the same as the data was before - pass - else: - raise ValueError("you can not set another metadata than the data set before - please reset it first") - - self._metadata = data - @property def from_device(self): """device from which the connection starts""" - return self._metadata["from_device"] + return self._metadata.from_device if self._metadata else None @property def to_device(self): """device at which the connection ends""" - return self._metadata["to_device"] + return self._metadata.to_device if self._metadata else None @property def from_node_name(self): """the name of the node in the `Device` from which the connection starts""" - return self._metadata["from_device_node_name"] + return self._metadata.from_node_name if self._metadata else None @property def to_node_name(self): """the name of the node in the `Device` at which the connection ends""" - return self._metadata["to_device_node_name"] + return self._metadata.to_node_name if self._metadata else None @property - def based_on_elements(self) -> List[Union[Connection, Tuple[Connection]]]: + def based_on_elements(self) -> OrConnectionRelation: """returns a copy of the internal based_on_connection""" - return self._based_on_connections.copy() + return self._based_on_connections.clone() # ---------------------------------- PROTECTED METHODS ------------------------------------------------------------- - def _metadata_contained_in(self, of_connection: Connection) -> bool: - """ - This method returns true if the metadata of the current connection is contained in the given one. - - The method returns true in the following situations: - * both connections are bidirectional and the from and to elements (device and node name) are the same - * both connections are unidirectional and have the same from and to elements - * both connections are bidirectional and the from is the to and the to is the from - * one connection is unidirectional and the other is bidirectional and the from and to elements are the same - * one connection is unidirectional and the other is bidirectional and the from is the to and the to is the from - - :return: true if the metadata of the current connection is contained in the metadata of the given one - """ - check_same = \ - self.from_device == of_connection.from_device and self.from_node_name == of_connection.from_node_name and \ - self.to_device == of_connection.to_device and self.to_node_name == of_connection.to_node_name - check_opposite = \ - self.from_device == of_connection.to_device and self.from_node_name == of_connection.to_node_name and \ - self.to_device == of_connection.from_device and self.to_node_name == of_connection.from_node_name - if self.is_bidirectional() and of_connection.is_bidirectional(): - return check_same or check_opposite - - if self.is_bidirectional() and not of_connection.is_bidirectional() or \ - not self.is_bidirectional() and of_connection.is_bidirectional(): - return check_same or check_opposite - - return check_same - - def _metadata_equal_with(self, of_connection: Connection) -> bool: - """ - This method returns true if the metadata of the current connection is equal with the metadata of the given - connection. - - The method returns true in the following situations: - * both connections are bidirectional and the from and to elements (device and node name) are the same - * both connections are unidirectional and have the same from and to elements - * both connections are bidirectional and the from is the to and the to is the from - - :return: true if the metadata of the current connection is contained in the metadata of the given one - """ - check_same = \ - self.from_device == of_connection.from_device and self.from_node_name == of_connection.from_node_name and \ - self.to_device == of_connection.to_device and self.to_node_name == of_connection.to_node_name - check_opposite = \ - self.from_device == of_connection.to_device and self.from_node_name == of_connection.to_node_name and \ - self.to_device == of_connection.from_device and self.to_node_name == of_connection.from_node_name - if self.is_bidirectional() and of_connection.is_bidirectional() or \ - not self.is_bidirectional() and not of_connection.is_bidirectional(): - return check_same or check_opposite - - return False + # ---------------------------------- METHODS ----------------------------------------------------------------------- - def get_intersection_with_other_single(self, other_conn: Union[Connection, Tuple[Connection]]) \ - -> List[Connection, Tuple[Connection]]: + def get_intersection_with_other_single( + self, + other_conn: Union[Connection, AndConnectionRelation, OrConnectionRelation] + ) -> List[Connection]: """ A helper method that returns an intersection between the two connections (self and the given one). - :param other_conn: the other **single** connection object (could be a tuple too, but note that this has to have - only **single** elements!) + :param other_conn: the other **single** connection object (could be a relation too, but note that them needs to + be **single** elements!) """ if not self.is_single(): raise ValueError( "the current connection object is not single -> method only possible for single connections") - if isinstance(other_conn, Connection): - if other_conn.__class__ == Connection: - raise ValueError("a container object from direct class `Connection` is not allowed here - please " - "provide a single connection or a single tuple") - if not other_conn.is_single(): - raise ValueError( - "the connection object given by `other_conn` is not single -> method only possible for single " - "connections") - elif isinstance(other_conn, tuple): - # has to be a tuple - cur_idx = 0 - for cur_tuple_element in other_conn: - if not cur_tuple_element.is_single(): - raise ValueError( - f"the connection object given by tuple element at index {cur_idx} in `other_conn` is not " - f"single -> method only possible for single connections") - cur_idx += 1 - other_conn = Connection.based_on(AndConnectionRelation(*other_conn)) - else: - raise TypeError("the object given by `other_conn` has to be from type `Connection` or has to be a tuple " - "of `Connection` objects") + if not other_conn.is_single(): + raise ValueError( + "the other connection object is not single -> method only possible for single connections") intersection = [] - def determine_for(pieces: List[Union[Connection, Tuple[Connection]]], in_other_cnn: Connection): - for cur_piece in pieces: - if isinstance(cur_piece, Connection): - if cur_piece.contained_in(in_other_cnn, ignore_metadata=True): - intersection.append(cur_piece) - else: - # isinstance of tuple - if Connection.check_if_tuple_contained_in_connection(cur_piece, in_other_cnn): - intersection.append(cur_piece) - #: check if some sub elements of self connection are contained in `other_conn` - self_pieces = self.cut_into_all_possible_subtrees() - determine_for(pieces=self_pieces, in_other_cnn=other_conn) + self_pieces = self.cut_into_all_possible_pieces() + intersection.extend(self.__class__.filter_connections_that_are_contained_in( + cnns_from=self_pieces, + are_contained_in=other_conn, + ignore_metadata=True + )) #: check if some sub elements of `other_conn` are contained in self connection - other_pieces = other_conn.cut_into_all_possible_subtrees() - determine_for(pieces=other_pieces, in_other_cnn=self) + other_pieces = other_conn.cut_into_all_possible_pieces() + intersection.extend(self.__class__.filter_connections_that_are_contained_in( + cnns_from=other_pieces, + are_contained_in=self, + ignore_metadata=True + )) #: filter all duplicated (and contained in each other) connections intersection_without_duplicates = [] for cur_conn in intersection: - checkable_cur_conn = Connection.based_on(AndConnectionRelation(*cur_conn)) \ - if isinstance(cur_conn, tuple) else cur_conn found_it = False for cur_existing_conn in intersection_without_duplicates: - checkable_cur_existing_conn = Connection.based_on(AndConnectionRelation(*cur_existing_conn)) \ - if isinstance(cur_existing_conn, tuple) else cur_existing_conn - if checkable_cur_conn.equal_with(checkable_cur_existing_conn, ignore_metadata=True): + if cur_conn.equal_with(cur_existing_conn, ignore_metadata=True): found_it = True break if not found_it: intersection_without_duplicates.append(cur_conn) + intersection_filtered = [] #: filter all *contained in each other* connections for cur_conn in intersection_without_duplicates: - usable_cur_conn = Connection.based_on(AndConnectionRelation(*cur_conn)) \ - if isinstance(cur_conn, tuple) else cur_conn is_contained_in_another = False for cur_validate_cnn in intersection_without_duplicates: - - cur_usable_validate_cnn = ( - Connection.based_on(AndConnectionRelation(*cur_validate_cnn))) \ - if isinstance(cur_validate_cnn, tuple) \ - else cur_validate_cnn - if cur_validate_cnn == cur_conn: # skip the same element continue - if usable_cur_conn.contained_in(cur_usable_validate_cnn, ignore_metadata=True): + if cur_conn.contained_in(cur_validate_cnn, ignore_metadata=True): is_contained_in_another = True break if not is_contained_in_another: + if isinstance(cur_conn, (AndConnectionRelation,OrConnectionRelation)): + cur_conn = Connection.based_on(cur_conn) intersection_filtered.append(cur_conn) # only return unique objects return intersection_filtered - # ---------------------------------- METHODS ----------------------------------------------------------------------- - - def set_devices(self, from_device: Type[Device], to_device: Type[Device]): + def clone_without_based_on_elements(self) -> Connection: """ - Method for setting the devices of the connection if this has not yet been done during instantiation - - .. note:: - Note that you can not change the devices after they were set (only possible internally) + This method returns a copied version of this element, while all `_based_on_connections` are removed (the copied + element has an empty list here). - :param from_device: device from which the connection starts + :return: a python copied object of this item + """ + self_copy = copy.copy(self) + self_copy._based_on_connections = OrConnectionRelation() # pylint: disable=protected-access + return self_copy - :param to_device: device at which the connection ends + def clone(self) -> Connection: """ - if self.from_device is not None or self.to_device is not None: - raise ValueError("devices already set") - self._metadata["from_device"] = from_device - self._metadata["to_device"] = to_device + This method returns an exact clone of this connection. For this clone every inner connection object will be + newly instantiated, but all internal references (like the devices and so on) will not be copied (objects are the + same for this object and the clone). The method will make a normal copy for every connection object in the + `_based_on_elements` list. + """ + self_copy = self.clone_without_based_on_elements() + self_copy.append_to_based_on(self._based_on_connections.clone()) # pylint: disable=protected-access + return self_copy - def update_node_names(self, from_device_node_name: str, to_device_node_name: str) -> None: + def cut_into_all_possible_subtree_branches(self) -> List[Connection]: + """ + This method returns a list of all possible connection tree branches. A branch is a single connection, while + this method returns a list of all possible singles where every single connection has this connection as head. """ - This method dates the names of the nodes from which the connection in the `from_device` originates and finally - arrives in the` to_device`. Please provide the node name of your ``from_device`` in ``from_device_node_name`` - and the node name of your ``to_device`` in ``to_device_node_name``. + all_pieces = [self.clone()] + if len(self.based_on_elements) == 0: + return all_pieces - :param from_device_node_name: specifies the node in the ``from_device`` + if not self.is_single(): + raise ValueError("the current connection object is not single -> method only possible for single " + "connections") - :param to_device_node_name: specifies the node in the ``to_device`` - """ - self._metadata["from_device_node_name"] = from_device_node_name - self._metadata["to_device_node_name"] = to_device_node_name + # return all possibilities of the relation while remain the current object as head + for sub_branch in self.based_on_elements.cut_into_all_possible_subtree_branches(): + copied_conn = self.clone_without_based_on_elements() + copied_conn.append_to_based_on(sub_branch) + all_pieces.append(copied_conn) + return all_pieces - def cut_into_all_possible_subtrees(self) -> List[Union[Connection, Tuple[Connection]]]: + def cut_into_all_possible_pieces(self) -> List[Connection]: """ This method cuts the resolved connection tree in all possible pieces by removing elements that change the existing tree - thereby the method returns a list with all possibilities (a copy of this object with all @@ -735,7 +362,7 @@ def cut_into_all_possible_subtrees(self) -> List[Union[Connection, Tuple[Connect """ if self.__class__ == Connection: # this is only a container, execute process for every item of this one - child_elems = self.based_on_elements + child_elems = self.based_on_elements.connections else: child_elems = [self] @@ -743,32 +370,31 @@ def cut_into_all_possible_subtrees(self) -> List[Union[Connection, Tuple[Connect for cur_item in child_elems: - if isinstance(cur_item, Connection): - if not cur_item.is_single(): - raise ValueError("one of the given element is not single -> method only works with single items") - elif isinstance(cur_item, tuple): - tuple_idx = 0 - for cur_tuple_elem in cur_item: - if not cur_tuple_elem.is_single(): - raise ValueError( - f"one of the given tuple element has a item at index {tuple_idx}, that is not single -> " - f"method only works with single items or tuple of single items") - tuple_idx += 1 - - cur_element = [cur_item] - while len(cur_element) > 0: - if isinstance(cur_element[0], Connection): - all_pieces += Connection.__cut_conn_from_only_parent_to_child(cur_element[0]) - elif isinstance(cur_element[0], tuple): - all_pieces += Connection.__cut_tuple_from_only_parent_to_child(cur_element[0]) + if not cur_item.is_single(): + raise ValueError("one of the given element is not single -> method only works with single items") + + cur_element = cur_item + while cur_element: + + if isinstance(cur_element, AndConnectionRelation): # now we can not go deeper, because we need this current AND connection + all_pieces += [Connection.based_on(and_relation) + for and_relation in cur_element.cut_into_all_possible_subtree_branches()] break - cur_element = cur_element[0].based_on_elements + + all_pieces += cur_element.cut_into_all_possible_subtree_branches() + + if len(cur_element.based_on_elements.connections) > 1: + # should never be fulfilled because we have a single connection + raise ValueError('unexpected size of inner connections') + + cur_element = cur_element.based_on_elements.connections[0] \ + if cur_element.based_on_elements.connections else None # filter all duplicates return list(set(all_pieces)) - def set_metadata_for_all_subitems(self, metadata: Union[Dict[str, Union[Device, str]], None]): + def set_metadata_for_all_subitems(self, metadata: Union[ConnectionMetadata, None]): """ This method sets the metadata for all existing Connection items in this element. @@ -776,19 +402,14 @@ def set_metadata_for_all_subitems(self, metadata: Union[Dict[str, Union[Device, metadata from every item) """ if metadata is None: - metadata = { - "from_device": None, "to_device": None, "from_device_node_name": None, "to_device_node_name": None} + metadata = ConnectionMetadata() + + if not isinstance(metadata, ConnectionMetadata): + raise TypeError('metadata must be an instance of `ConnectionMetadata`') for cur_base_elem in self._based_on_connections: - if isinstance(cur_base_elem, Connection): - cur_base_elem.set_metadata_for_all_subitems(metadata=metadata) - elif isinstance(cur_base_elem, tuple): - for cur_tuple_elem in cur_base_elem: - cur_tuple_elem.set_metadata_for_all_subitems(metadata=metadata) - else: - raise TypeError( - f"found illegal type {cur_base_elem.__class__} for based_on item at index {cur_base_elem}") - self.metadata = metadata + cur_base_elem.set_metadata_for_all_subitems(metadata=metadata) + self._metadata = metadata def get_tree_str(self) -> str: """ @@ -809,13 +430,13 @@ def get_tree_str(self) -> str: return f"{self.__class__.__name__}.based_on({'| '.join(based_on_strings)})" - def is_bidirectional(self): + def is_bidirectional(self) -> Union[bool, None]: """ Provides the information in which direction the connection is supported (if the property returns true, the connection must work in both directions, otherwise it is a unidirectional connection from the `from_device` to the` to_device` """ - return self._bidirectional + return self._metadata.bidirectional if self._metadata else None def is_universal(self): """ @@ -830,36 +451,27 @@ def is_resolved(self): Connection object of the tree - there are no undefined connection layers between an object and the given parent). """ - for cur_based_on in self.based_on_elements: - if isinstance(cur_based_on, tuple): - for cur_tuple_elem in cur_based_on: - # check the next parent class type only if the main self type is not a container - # (base `balder.Connection` object) - if self.__class__ != Connection: - if cur_tuple_elem not in self.__class__.get_parents(): - # only one element is not directly parent -> return false - return False - if not cur_tuple_elem.is_resolved(): - # only one subelement is not resolved -> return false - return False - else: - # no tuple, single element + if self.__class__ == Connection: + return self.based_on_elements.is_resolved() - # check the next parent class type only if the main self type is not a container - # (base `balder.Connection` object) - if self.__class__ != Connection: - if cur_based_on.__class__ not in self.__class__.get_parents(): - # only one element is not directly parent -> return false - return False - if not cur_based_on.is_resolved(): - # only one subelement is not resolved -> return false + self_parents = self.__class__.get_parents() + + for cur_based_on in self.based_on_elements.connections: + if isinstance(cur_based_on, Connection): + if cur_based_on.__class__ not in self_parents: return False + elif isinstance(cur_based_on, (AndConnectionRelation, OrConnectionRelation)): + for cur_type in cur_based_on.get_all_used_connection_types(): + if cur_type not in self_parents: + return False + if not cur_based_on.is_resolved(): + return False return True def is_single(self): """ - This method returns true if there exists no logical **OR** in the based on connection(s). One **AND** tuple is - allowed. + This method returns true if there exists no logical **OR** in the based on connection(s). One **AND** relation + is allowed. .. note:: Note that this method also returns False, if the connection is not completely resolved! @@ -875,14 +487,6 @@ def is_single(self): if not self.is_resolved(): return False - if isinstance(self.based_on_elements[0], tuple): - for cur_tuple_elem in self.based_on_elements[0]: - # if only one element of tuple is not single -> return False - if not cur_tuple_elem.is_single(): - return False - # all elements are single - return True - return self.based_on_elements[0].is_single() def get_resolved(self) -> Connection: @@ -893,48 +497,49 @@ def get_resolved(self) -> Connection: .. note:: This method returns the connection without a container :class:`Connection`, if the container - :class:`Connection` would only consist of one single connection (which is no tuple!). In this case the - method returns this child connection directly without any :class:`Connection` container otherwise the + :class:`Connection` would only consist of one single connection (which is no AND relation!). In that case + the method returns this child connection directly without any :class:`Connection` container otherwise the :class:`Connection` container class with all resolved child classes will be returned. """ if self.is_resolved(): - copied_base = self.clone() + copied_base = self.clone_without_based_on_elements() + copied_base.append_to_based_on(self.based_on_elements.get_simplified_relation()) else: copied_base = self.clone_without_based_on_elements() if self.__class__ == Connection: # the base object is a container Connection - iterate over the items and determine the values for them - for cur_item in self.based_on_elements.copy(): - if isinstance(cur_item, tuple): - new_tuple = tuple([cur_tuple_item.get_resolved() for cur_tuple_item in cur_item]) - copied_base.append_to_based_on(new_tuple) - else: - copied_base.append_to_based_on(cur_item.get_resolved()) + copied_base.append_to_based_on(self.based_on_elements.get_simplified_relation().get_resolved()) else: - for next_higher_parent in self.based_on_elements: - if isinstance(next_higher_parent, tuple): + # independent which based-on elements we have, we need to determine all elements between this connection + # and the elements of the relation + simplified_based_on = self.based_on_elements.get_simplified_relation() + + for next_higher_parent in simplified_based_on: + if isinstance(next_higher_parent, AndConnectionRelation): # determine all possibilities - direct_ancestors_tuple = () - for cur_tuple_elem in next_higher_parent: - if cur_tuple_elem.__class__ in self.__class__.get_parents(): + direct_ancestors_relations = () + for cur_and_elem in next_higher_parent: + # `cur_and_elem` needs to be a connection, because we are using simplified which has only + # `OR[AND[Cnn, ...], Cnn, ..]` + if cur_and_elem.__class__ in self.__class__.get_parents(): # element already is a direct ancestor - direct_ancestors_tuple += (cur_tuple_elem, ) + direct_ancestors_relations += (cur_and_elem, ) else: all_pos_possibilities = [] # add all possible direct parents to the possibilities list for cur_direct_parent in self.__class__.get_parents(): - if cur_direct_parent.is_parent_of(cur_tuple_elem.__class__): - all_pos_possibilities.append( - cur_direct_parent.based_on(AndConnectionRelation(*cur_tuple_elem)) - ) - direct_ancestors_tuple += (all_pos_possibilities, ) - # resolve the opportunities and create multiple possible tuples where all elements are direct - # parents - for cur_possibility in itertools.product(*direct_ancestors_tuple): - new_child_tuple = ( - tuple([cur_tuple_element.get_resolved() for cur_tuple_element in cur_possibility])) - copied_base.append_to_based_on(new_child_tuple) + if cur_direct_parent.is_parent_of(cur_and_elem.__class__): + all_pos_possibilities.append(cur_direct_parent.based_on(cur_and_elem)) + direct_ancestors_relations += (all_pos_possibilities, ) + # resolve the opportunities and create multiple possible AND relations where all elements are + # direct parents + for cur_possibility in itertools.product(*direct_ancestors_relations): + new_child_relation = AndConnectionRelation(*cur_possibility).get_resolved() + copied_base.append_to_based_on(new_child_relation) else: + # `next_higher_parent` needs to be a connection, because we are using simplified which has only + # `OR[AND[Cnn, ...], Cnn, ..]` if next_higher_parent.__class__ in self.__class__.get_parents(): # is already a direct parent copied_base.append_to_based_on(next_higher_parent.get_resolved()) @@ -945,10 +550,11 @@ def get_resolved(self) -> Connection: if next_higher_parent.__class__.is_parent_of(cur_self_direct_parent): new_child = cur_self_direct_parent.based_on(next_higher_parent) copied_base.append_to_based_on(new_child.get_resolved()) - # if it is a connection container, where only one element exists that is no tuple -> return this directly + + # if it is a connection container, where only one element exists that is no AND relation -> return this directly # instead of the container if copied_base.__class__ == Connection and len(copied_base.based_on_elements) == 1 and not \ - isinstance(copied_base.based_on_elements[0], tuple): + isinstance(copied_base.based_on_elements[0], AndConnectionRelation): return copied_base.based_on_elements[0] return copied_base @@ -963,42 +569,29 @@ def get_singles(self) -> List[Connection]: If the current object is a container :class:`Connection` object (direct instance of :class:`Connection`), this method returns the single elements without a container class! """ - all_singles = [] + # note: This method always work with the resolved and simplified version of the object because it is using + # `get_resolved()`. + if self.is_universal(): + return [self] - all_self_items = [self] - if self.__class__ == Connection and len(self.based_on_elements) == 0: + if self.is_single(): return [self] - if self.__class__ == Connection: - all_self_items = self.based_on_elements - for cur_item in all_self_items: - if isinstance(cur_item, tuple): - all_singles += Connection.convert_tuple_to_singles(cur_item) - elif cur_item.is_single(): - all_singles.append(copy.copy(cur_item)) - else: - # do this for resolved version of this object - resolved_obj = cur_item.get_resolved() - - if len(resolved_obj.based_on_elements) == 0: - # element has no children -> return only this - return [resolved_obj] - for cur_child in resolved_obj.based_on_elements: - for cur_single_child in cur_child.get_singles(): - copied_base = resolved_obj.clone_without_based_on_elements() - copied_base.append_to_based_on(cur_single_child) - all_singles.append(copied_base) - # convert all tuple objects in a connection object and set metadata of self object - cleaned_singles = [] - - for cur_single in all_singles: - if isinstance(cur_single, Connection): - new_cnn = cur_single - else: - new_cnn = Connection.based_on(AndConnectionRelation(*cur_single)) - new_cnn.set_metadata_for_all_subitems(self.metadata) - cleaned_singles.append(new_cnn) - return cleaned_singles + all_singles = [] + + resolved_self = self.get_resolved() + + for cur_child in resolved_self.based_on_elements: + for cur_single_child in cur_child.get_singles(): + cur_single_child = cur_single_child.clone() + if self.__class__ == Connection and isinstance(cur_single_child, Connection): + all_singles.append(cur_single_child) + else: + copied_base = resolved_self.clone_without_based_on_elements() + copied_base.append_to_based_on(cur_single_child) + all_singles.append(copied_base) + + return all_singles def get_conn_partner_of(self, device: Type[Device], node: Union[str, None] = None) -> Tuple[Type[Device], str]: """ @@ -1009,30 +602,9 @@ def get_conn_partner_of(self, device: Type[Device], node: Union[str, None] = Non :param node: the node name of the device itself (only required if the connection starts and ends with the same device) """ - if device not in (self.from_device, self.to_device): - raise ValueError(f"the given device `{device.__qualname__}` is no component of this connection") - if node is None: - # check that the from_device and to_device are not the same - if self.from_device == self.to_device: - raise ValueError("the connection is a inner-device connection (start and end is the same device) - you " - "have to provide the `node` string too") - if device == self.from_device: - return self.to_device, self.to_node_name - - return self.from_device, self.from_node_name - - if node not in (self.from_node_name, self.to_node_name): - raise ValueError(f"the given node `{node}` is no component of this connection") - - if device == self.from_device and node == self.from_node_name: - return self.to_device, self.to_node_name + return self.metadata.get_conn_partner_of(device, node) - if device == self.to_device and node == self.to_node_name: - return self.from_device, self.from_node_name - - raise ValueError(f"the given node `{node}` is no component of the given device `{device.__qualname__}`") - - def has_connection_from_to(self, start_device, end_device=None): + def has_connection_from_to(self, start_device, end_device=None) -> bool: """ This method checks if there is a connection from ``start_device`` to ``end_device``. This will return true if the ``start_device`` and ``end_device`` given in this method are also the ``start_device`` and @@ -1049,18 +621,7 @@ def has_connection_from_to(self, start_device, end_device=None): :return: returns true if the given direction is possible """ - if end_device is None: - - if self.is_bidirectional(): - return start_device in (self.from_device, self.to_device) - - return start_device == self.from_device - - if self.is_bidirectional(): - return start_device == self.from_device and end_device == self.to_device or \ - start_device == self.to_device and end_device == self.from_device - - return start_device == self.from_device and end_device == self.to_device + return self.metadata.has_connection_from_to(start_device, end_device) def equal_with(self, other_conn: Connection, ignore_metadata=False) -> bool: """ @@ -1073,11 +634,7 @@ def equal_with(self, other_conn: Connection, ignore_metadata=False) -> bool: .. note:: Note that it doesn't matter if the connection is embedded in a container-Connection element (direct - instance of :class`Connection`) or not. It only checks, that the logical data of them are the same. If both - elements are a container for a list of child connections, the method secures that both has the same - children. If only one (same for both) has one child connection which is embedded in a container - :class:`Connection` class, it returns also true if the other connection is like the one child element of - the other container :class:`Connection`. + instance of :class`Connection`) or not. It only checks, that the logical data of them are the same. :param other_conn: the other connection, this connection will be compared with @@ -1085,11 +642,8 @@ def equal_with(self, other_conn: Connection, ignore_metadata=False) -> bool: :return: returns True if both elements are same """ - if self.__class__ not in (Connection, other_conn.__class__): - return False - if not ignore_metadata: - metadata_check_result = self._metadata_equal_with(of_connection=other_conn) + metadata_check_result = self.metadata.equal_with(other_conn.metadata) if metadata_check_result is False: return False @@ -1098,37 +652,18 @@ def equal_with(self, other_conn: Connection, ignore_metadata=False) -> bool: resolved_self = self.get_resolved() resolved_other = other_conn.get_resolved() - self_based_on_elems = [ - cur_elem for cur_elem in resolved_self.based_on_elements if isinstance(cur_elem, Connection)] - self_based_on_tuples = [ - cur_elem for cur_elem in resolved_self.based_on_elements if isinstance(cur_elem, tuple)] - other_based_on_elems = [ - cur_elem for cur_elem in resolved_other.based_on_elements if isinstance(cur_elem, Connection)] - other_based_on_tuples = [ - cur_elem for cur_elem in resolved_other.based_on_elements if isinstance(cur_elem, tuple)] - - # check single connection elements (if they match all in both directions) - if (not self.__class__.check_equal_connections_are_in( - cnns_from=self_based_on_elems, are_in_cnns_from=other_based_on_elems, ignore_metadata=ignore_metadata) - or not self.__class__.check_equal_connections_are_in( - cnns_from=other_based_on_elems, are_in_cnns_from=self_based_on_elems, - ignore_metadata=ignore_metadata)): - return False - - # check tuple connection elements (if they match all in both directions) - if (not self.__class__.check_equal_connection_tuples_are_in( - tuples_from=self_based_on_tuples, are_in_tuples_from=other_based_on_tuples, - ignore_metadata=ignore_metadata) - or not self.__class__.check_equal_connection_tuples_are_in( - tuples_from=other_based_on_tuples, are_in_tuples_from=self_based_on_tuples, - ignore_metadata=ignore_metadata)): + if self.__class__ != other_conn.__class__: return False - return True + return resolved_self.based_on_elements.equal_with(resolved_other.based_on_elements) - def contained_in(self, other_conn: Connection, ignore_metadata=False) -> bool: + def contained_in( + self, + other_conn: Connection | AndConnectionRelation | OrConnectionRelation, + ignore_metadata=False + ) -> bool: """ - This method helps to find out whether this connection-tree fits within a given connection tree. A connection + This method helps to find out whether this connection-tree fits within another connection tree. A connection object is a certain part of the large connection tree that Balder has at its disposal. This method checks whether a possibility of this connection tree fits in one possibility of the given connection tree. @@ -1142,8 +677,11 @@ def contained_in(self, other_conn: Connection, ignore_metadata=False) -> bool: :return: true if the self object is contained in the `other_conn`, otherwise false """ + # note: This method always work with the resolved and simplified version of the object because it is using + # `get_resolved()`. + if not ignore_metadata: - metadata_check_result = self._metadata_contained_in(of_connection=other_conn) + metadata_check_result = self.metadata.contained_in(other_conn.metadata) if metadata_check_result is False: return False @@ -1157,158 +695,111 @@ def contained_in(self, other_conn: Connection, ignore_metadata=False) -> bool: self_possibilities = [resolved_self] if resolved_self.__class__ == Connection: - self_possibilities = resolved_self.based_on_elements + self_possibilities = resolved_self.based_on_elements.connections + + # if one of the self_possibilities is contained_in the other, the method should return true for cur_self in self_possibilities: - if isinstance(cur_self, tuple): - if Connection.check_if_tuple_contained_in_connection(cur_self, resolved_other): - return True - elif not cur_self.__class__ == resolved_other.__class__: - for cur_based_on_elem in resolved_other.based_on_elements: - if isinstance(cur_based_on_elem, tuple): - # check if the current connection fits in one of the tuple items -> allowed too (like a smaller - # AND contained in a bigger AND) - for cur_other_tuple_element in cur_based_on_elem: - if cur_self.contained_in(cur_other_tuple_element, ignore_metadata=ignore_metadata): - return True - else: - if cur_self.contained_in(cur_based_on_elem, ignore_metadata=ignore_metadata): - # element was found in this branch - return True - else: - # The element itself has already matched, now we still have to check whether at least one tuple or a - # connection (i.e. one element of the list) is matched with one of the other + if isinstance(cur_self, OrConnectionRelation): + # should not happen, because we are using resolved elements (using simplified version) + raise ValueError(f'unexpected type `{OrConnectionRelation.__name__}`') + + if cur_self.__class__ == resolved_other.__class__: + # The element itself has already matched, now we still have to check whether at least one inner element + # of this type is contained in the minimum one element of the other singles_self = cur_self.get_singles() singles_other = resolved_other.get_singles() # check that for one single_self element all hierarchical based_on elements are in one of the single # other element - for cur_single_self in singles_self: - for cur_single_other in singles_other: - # check if both consists of only one element - if len(cur_single_self.based_on_elements) == 0: - # the cur self single is only one element -> this is contained in the other + for cur_single_self, cur_single_other in itertools.product(singles_self, singles_other): + # check if both consists of only one element + if len(cur_single_self.based_on_elements) == 0: + # the cur self single is only one element -> this is contained in the other + return True + + if len(cur_single_other.based_on_elements) == 0: + # the other element is only one element, but the self element not -> contained_in + # for this single definitely false + continue + + # note: for both only one `based_on_elements` is possible, because they are singles + self_first_based_on = cur_single_self.based_on_elements[0] + other_first_based_on = cur_single_other.based_on_elements[0] + + self_is_and = isinstance(self_first_based_on, AndConnectionRelation) + self_is_cnn = isinstance(self_first_based_on, Connection) + other_is_and = isinstance(other_first_based_on, AndConnectionRelation) + other_is_cnn = isinstance(other_first_based_on, Connection) + + if self_is_and and (other_is_and or other_is_cnn) or self_is_cnn and other_is_cnn: + # find a complete valid match + if self_first_based_on.contained_in(other_first_based_on, ignore_metadata=ignore_metadata): + return True + # skip all others possibilities + elif isinstance(cur_self, AndConnectionRelation): + if cur_self.contained_in(resolved_other): + return True + else: + # the elements itself do not match -> go deeper within the other connection + if isinstance(resolved_other, AndConnectionRelation): + # check if the current connection fits in one of the AND relation items -> allowed too (f.e. a + # smaller AND contained in a bigger AND) + for cur_other_and_element in resolved_other.connections: + if cur_self.contained_in(cur_other_and_element, ignore_metadata=ignore_metadata): return True - if len(cur_single_other.based_on_elements) == 0: - # the other element is only one element, but the self element not -> contained_in - # for this single definitely false - continue - - # note: for both only one `based_on_elements` is possible, because they are singles - if isinstance(cur_single_self.based_on_elements[0], tuple) and \ - isinstance(cur_single_other.based_on_elements[0], tuple): - # both are tuples -> check that there exist a matching where all are contained_in - tuple_is_complete_contained_in = True - # check that every tuple element of self is contained in minimum one tuple elements of each - # other - for cur_self_tuple_element in cur_single_self.based_on_elements[0]: - find_some_match_for_cur_self_tuple_element = False - for cur_other_tuple_element in cur_single_other.based_on_elements[0]: - if cur_self_tuple_element.contained_in( - cur_other_tuple_element, ignore_metadata=ignore_metadata): - # find a match, where the current tuple element is contained in one tuple - # element of the other - find_some_match_for_cur_self_tuple_element = True - break - if not find_some_match_for_cur_self_tuple_element: - tuple_is_complete_contained_in = False - break - if tuple_is_complete_contained_in: - # find a complete valid match - return True - elif isinstance(cur_single_self.based_on_elements[0], Connection) and \ - isinstance(cur_single_other.based_on_elements[0], Connection): - # both are connection trees -> check if the subtrees are contained in - if cur_single_self.based_on_elements[0].contained_in( - cur_single_other.based_on_elements[0], ignore_metadata=ignore_metadata): - # find a complete valid match + resolved_other_relation = resolved_other.based_on_elements \ + if isinstance(resolved_other, Connection) else resolved_other + + for cur_other_based_on in resolved_other_relation.connections: + if isinstance(cur_other_based_on, AndConnectionRelation): + # check if the current connection fits in one of the AND relation items -> allowed too (f.e. a + # smaller AND contained in a bigger AND) + for cur_other_and_element in cur_other_based_on.connections: + if cur_self.contained_in(cur_other_and_element, ignore_metadata=ignore_metadata): return True - elif isinstance(cur_single_self.based_on_elements[0], tuple) and \ - isinstance(cur_single_other.based_on_elements[0], Connection): - # this is allowed too, if every tuple item is contained_in the relevant connection - # check that every tuple element of self is contained in the other connection - for cur_self_tuple_element in cur_single_self.based_on_elements[0]: - if not cur_self_tuple_element.contained_in( - cur_single_other, ignore_metadata=ignore_metadata): - # find a match, where the current tuple element is not contained in the other - # connection -> tuple can not be contained in - return False + else: + if cur_self.contained_in(cur_other_based_on, ignore_metadata=ignore_metadata): + # element was found in this branch return True - # skip all others possibilities return False def intersection_with( - self, other_conn: Union[Connection, Type[Connection], - List[Connection, Type[Connection], Tuple[Connection]]]) \ + self, other_conn: Union[Connection, Type[Connection], AndConnectionRelation, OrConnectionRelation]) \ -> Union[Connection, None]: """ This method returns a list of subtrees that describe the intersection of this connection subtree and the given ones. Note that this method converts all connections in **single** **resolved** connections first. - For these connections the method checks if there are common intersections between the elements of this object - and the given connection elements. + The method checks if there are common intersections between the elements of this object and the given connection + elements within the single connections. The method cleans up this list and only return unique sub-connection trees! :param other_conn: the other sub connection tree list :return: the intersection connection or none, if the method has no intersection """ - if isinstance(other_conn, type): - if not issubclass(other_conn, Connection): - raise TypeError("the given `other_conn` has to be from type `Connection`") - other_conn = other_conn() - - if isinstance(other_conn, Connection): - if other_conn.__class__ == Connection: - if len(other_conn.based_on_elements) == 0: - return self.clone() - other_conn = other_conn.based_on_elements - else: - other_conn = [other_conn] + other_conn = cnn_type_check_and_convert(other_conn) - if self.__class__ == Connection and len(self.based_on_elements) == 0: - return Connection.based_on( - OrConnectionRelation(*[ - AndConnectionRelation(*inner) if isinstance(inner, tuple) else inner - for inner in other_conn]) - ).clone() + if isinstance(other_conn, Connection) and other_conn.__class__ == Connection: + if len(other_conn.based_on_elements) == 0: + return self.clone() + other_conn = other_conn.based_on_elements - # determine all single connection of the two sides (could contain tuple, where every element is a single + if self.is_universal(): + return other_conn.clone() if isinstance(other_conn, Connection) else Connection.based_on(other_conn.clone()) + + # determine all single connection of the two sides (could contain AND relations, where every element is a single # connection too) - self_conn_singles = [] - other_conn_singles = [] - - idx = 0 - for cur_self_elem in self.get_singles(): - if isinstance(cur_self_elem, Connection): - self_conn_singles.append(cur_self_elem) - elif isinstance(cur_self_elem, type) and issubclass(cur_self_elem, Connection): - self_conn_singles.append(cur_self_elem()) - else: - raise TypeError(f"the element at index {idx} of `self_conn` is not from type `Connection` or is a " - f"tuple of that") - - idx = 0 - for cur_other_elem in other_conn: - if isinstance(cur_other_elem, tuple): - other_conn_singles += Connection.convert_tuple_to_singles(cur_other_elem) - elif isinstance(cur_other_elem, Connection): - other_conn_singles += cur_other_elem.get_singles() - elif isinstance(cur_other_elem, type) and issubclass(cur_other_elem, Connection): - other_conn_singles.append(cur_other_elem()) - else: - raise TypeError(f"the element at index {idx} of `other_conn` is not from type `Connection` or is a " - f"tuple of that") - idx += 1 + self_conn_singles = self.get_singles() + other_conn_singles = other_conn.get_singles() intersections = [] # determine intersections between all of these single components - for cur_self_conn in self_conn_singles: - for cur_other_conn in other_conn_singles: - for cur_intersection in cur_self_conn.get_intersection_with_other_single(cur_other_conn): - if isinstance(cur_intersection, tuple): - cur_intersection = Connection.based_on(AndConnectionRelation(*cur_intersection)) - if cur_intersection not in intersections: - intersections.append(cur_intersection) + for cur_self_conn, cur_other_conn in itertools.product(self_conn_singles, other_conn_singles): + for cur_intersection in cur_self_conn.get_intersection_with_other_single(cur_other_conn): + intersections.append(cur_intersection) + + intersections = set(intersections) #: filter all *contained in each other* connections intersection_filtered = [] @@ -1327,79 +818,50 @@ def intersection_with( if len(intersection_filtered) == 0: # there is no intersection return None - if len(intersection_filtered) > 1 or isinstance(intersection_filtered[0], tuple): - return Connection.based_on(AndConnectionRelation(*intersection_filtered)).clone() - return intersection_filtered[0].clone() + return Connection.based_on(OrConnectionRelation(*[cnn.clone() for cnn in intersection_filtered])) def append_to_based_on( - self, *args: Union[Connection, Type[Connection], Tuple[Union[Type[Connection], Connection]]]) -> None: + self, *args: Union[Type[Connection], Connection, OrConnectionRelation, AndConnectionRelation]) -> None: """ with this method you can extend the internal based_on list with the transferred elements. Any number of - :meth:`Connection` objects or tuples with :meth:`Connection` objects can be given to this method + :meth:`Connection` objects or relations with :meth:`Connection` objects can be given to this method. They will + all be added to the internal OR relation. :param args: all connection items that should be added here """ - for cur_idx, cur_connection in enumerate(args): - if isinstance(cur_connection, type): - if not issubclass(cur_connection, Connection): - raise TypeError(f"illegal type `{cur_connection.__name__}` for parameter number {cur_idx}") - if self.__class__ != Connection: - if not cur_connection.is_parent_of(self.__class__): - raise IllegalConnectionTypeError( - f"the given connection `{cur_connection.__name__}` (parameter pos {cur_idx}) is no parent " - f"class of the `{self.__class__.__name__}`") - # this is a simple Connection type object -> simply add an instance of it to the full list - new_conn = cur_connection() - self._based_on_connections.append(new_conn) - - elif isinstance(cur_connection, Connection): - if cur_connection.__class__ == Connection: - raise ValueError(f"it is not allowed to provide a container Connection object in based_on items - " - f"found at index {cur_idx}") - # `based_on` call for this sub-connection because we get an `Connection` object - if self.__class__ != Connection: - if not cur_connection.__class__.is_parent_of(self.__class__): - raise IllegalConnectionTypeError( - f"the given connection `{cur_connection.__class__.__name__}` (parameter pos {cur_idx}) is " - f"no parent class of the `{self.__class__.__name__}`") - self._based_on_connections.append(cur_connection) - - elif isinstance(cur_connection, tuple): - result_tuple = () - for cur_tuple_idx, cur_tuple_elem in enumerate(cur_connection): - if isinstance(cur_tuple_elem, type): - if not issubclass(cur_tuple_elem, Connection): - raise TypeError(f"illegal type `{cur_tuple_elem.__name__}` for tuple element " - f"{cur_tuple_idx} for parameter number {cur_idx}") - if self.__class__ != Connection: - if not cur_tuple_elem.is_parent_of(self.__class__): - raise IllegalConnectionTypeError( - f"the given connection `{cur_tuple_elem.__name__}` (tuple element {cur_tuple_idx} " - f"for parameter at pos {cur_idx}) is no parent class of the " - f"`{self.__class__.__name__}`") - # this is a simple Connection type object -> simply add the instance of connection to result - # tuple - result_tuple += (cur_tuple_elem(), ) - elif isinstance(cur_tuple_elem, Connection): - # `based_on` call for this sub-connection because we get an `Connection` object - if self.__class__ != Connection: - if not cur_tuple_elem.__class__.is_parent_of(self.__class__): - raise IllegalConnectionTypeError( - f"the given connection `{cur_tuple_elem.__class__.__name__}` (tuple element " - f"{cur_tuple_idx} for parameter at pos {cur_idx}) is no parent class of the " - f"`{self.__class__.__name__}`") - result_tuple += (cur_tuple_elem, ) - elif isinstance(cur_tuple_elem, tuple): - raise TypeError(f"nested tuples (tuple element {cur_tuple_idx} for parameter at pos {cur_idx}) " - f"and thus nested AND operations are not possible") - elif isinstance(cur_tuple_elem, list): - raise TypeError(f"nested lists (tuple element {cur_tuple_idx} for parameter at pos {cur_idx}) " - f"and thus nested OR/AND operations are not possible") - else: - raise TypeError(f"illegal type `{cur_tuple_elem.__name__}` for tuple element {cur_tuple_idx} " - f"for parameter at pos {cur_idx}") - self._based_on_connections.append(result_tuple) + def validate_that_subconnection_is_parent(idx, connection_type: Type[Connection]): + if connection_type == Connection: + raise ValueError(f"it is not allowed to provide a container Connection object in based_on items - " + f"found at index {idx}") + # `based_on` call for this sub-connection because we get an `Connection` object + if self.__class__ != Connection: + if not connection_type.is_parent_of(self.__class__): + raise IllegalConnectionTypeError( + f"the connection `{cur_elem.__class__.__name__}` (at parameter pos {idx}) is " + f"no parent class of the `{self.__class__.__name__}`") + + for cur_idx, cur_elem in enumerate(args): + if isinstance(cur_elem, type) and issubclass(cur_elem, Connection): + cur_elem = cur_elem() + + if isinstance(cur_elem, Connection): + if cur_elem.is_universal(): + # ignore it, because it is irrelevant for this connection + continue + validate_that_subconnection_is_parent(cur_idx, cur_elem.__class__) + self._based_on_connections.append(cur_elem) + elif isinstance(cur_elem, OrConnectionRelation): + for cur_inner_elem in cur_elem.connections: + if isinstance(cur_inner_elem, Connection) and cur_inner_elem.is_universal(): + # ignore it, because it is irrelevant for this connection + continue + validate_that_subconnection_is_parent(cur_idx, cur_inner_elem.__class__) + self._based_on_connections.append(cur_inner_elem) + elif isinstance(cur_elem, AndConnectionRelation): + for cur_inner_type in cur_elem.get_all_used_connection_types(): + validate_that_subconnection_is_parent(cur_idx, cur_inner_type) + self._based_on_connections.append(cur_elem) else: - raise TypeError(f"illegal type `{cur_connection.__name__}` for parameter at pos {cur_idx}") + raise TypeError(f"illegal type `{cur_elem.__name__}` for parameter at pos {cur_idx}") diff --git a/src/_balder/connection_metadata.py b/src/_balder/connection_metadata.py new file mode 100644 index 00000000..47ae5132 --- /dev/null +++ b/src/_balder/connection_metadata.py @@ -0,0 +1,234 @@ +from __future__ import annotations +from typing import Union, Type, Tuple +from .device import Device + + +class ConnectionMetadata: + """ + Describes the metadata of a connection. + """ + + def __init__( + self, + from_device: Union[Type[Device], None] = None, + to_device: Union[Type[Device], None] = None, + from_device_node_name: Union[str, None] = None, + to_device_node_name: Union[str, None] = None, + bidirectional: bool = True, + ): + + self._from_device = None + self._from_device_node_name = None + self.set_from(from_device, from_device_node_name) + + self._to_device = None + self._to_device_node_name = None + self.set_to(to_device, to_device_node_name) + + if not ((from_device is None and to_device is None and from_device_node_name is None + and to_device_node_name is None) or ( + from_device is not None and to_device is not None and from_device_node_name is not None and + to_device_node_name is not None)): + raise ValueError( + "you have to provide all or none of the following items: `from_device`, `from_device_node_name`, " + "`to_device` or `to_device_node_name`") + + # describes if the connection is uni or bidirectional + self._bidirectional = bidirectional + + def __eq__(self, other: ConnectionMetadata): + return self.equal_with(other) + + def __hash__(self): + all_hashes = hash(self._from_device) + hash(self._to_device) + hash(self._from_device_node_name) + \ + hash(self._to_device_node_name) + hash(self._bidirectional) + return hash(all_hashes) + + def __compare_with(self, other: ConnectionMetadata, allow_single_unidirectional_for_both_directions: bool) -> bool: + """ + This method checks, if the metadata of this object is the metadata of the other object. + + The method returns true in the following situations: + * both connections are bidirectional / the FROM and TO elements (device and node name) are the same + * both connections are bidirectional / the FROM is the TO and the TO is the FROM + * both connections are unidirectional and have the same from and to elements + + If the parameter `allow_single_unidirectional_for_both_directions` is True, it additionally checks the following + situations: + * one is unidirectional / the other is bidirectional / the FROM and TO elements are the same + * one is unidirectional / the other is bidirectional / the FROM is the TO and the TO is the FROM + """ + def check_same() -> bool: + return (self.from_device == other.from_device and self.from_node_name == other.from_node_name and + self.to_device == other.to_device and self.to_node_name == other.to_node_name) + + def check_twisted() -> bool: + return (self.from_device == other.to_device and self.from_node_name == other.to_node_name and + self.to_device == other.from_device and self.to_node_name == other.from_node_name) + + # CHECK: both connections are bidirectional / the FROM and TO elements (device and node name) are the same + # CHECK: both connections are bidirectional / the FROM is the TO and the TO is the FROM + if self.bidirectional and other.bidirectional: + return check_same() or check_twisted() + # CHECK: both connections are unidirectional and have the same from and to elements + if not self.bidirectional and not other.bidirectional: + return check_same() + + if allow_single_unidirectional_for_both_directions: + # CHECK: one is unidirectional / the other is bidirectional / the FROM and TO elements are the same + # CHECK: one is unidirectional / the other is bidirectional / the FROM is the TO and the TO is the FROM + if self.bidirectional and not other.bidirectional or not self.bidirectional and other.bidirectional: + return check_same() or check_twisted() + return False + + def set_from(self, from_device: Union[Type[Device], None], from_device_node_name: Union[str, None] = None): + """ + This method sets the FROM device and node for this connection. + + :param from_device: The FROM device of this connection. + :param from_device_node_name: The FROM node of this connection (if it should be set, otherwise None). + """ + if from_device is not None and isinstance(from_device, type) and not issubclass(from_device, Device): + raise TypeError(f"detect illegal argument element {str(from_device)} for given attribute " + f"`from_device` - should be a subclasses of `balder.Device`") + self._from_device = from_device + + if from_device_node_name is not None and not isinstance(from_device_node_name, str): + raise TypeError(f"detect illegal argument type {type(from_device_node_name)} for given attribute " + f"`from_device_node_name` - should be a string value") + self._from_device_node_name = from_device_node_name + + def set_to(self, to_device: Union[Type[Device], None], to_device_node_name: Union[str, None] = None): + """ + This method sets the TO device and node of this connection. + + :param to_device: The TO device of this connection. + :param to_device_node_name: The TO node of this connection (if it should be set, otherwise None). + """ + if to_device is not None and isinstance(to_device, type) and not issubclass(to_device, Device): + raise TypeError(f"detect illegal argument element {str(to_device)} for given attribute " + f"`to_device` - should be a subclasses of `balder.Device`") + self._to_device = to_device + + if to_device_node_name is not None and not isinstance(to_device_node_name, str): + raise TypeError(f"detect illegal argument type {type(to_device_node_name)} for given attribute " + f"`to_device_node_name` - should be a string value") + self._to_device_node_name = to_device_node_name + + def get_conn_partner_of(self, device: Type[Device], node: Union[str, None] = None) -> Tuple[Type[Device], str]: + """ + This method returns the connection partner of this connection - it always returns the other not given side + + :param device: the device itself - the other will be returned + + :param node: the node name of the device itself (only required if the connection starts and ends with the same + device) + """ + if device not in (self.from_device, self.to_device): + raise ValueError(f"the given device `{device.__qualname__}` is no component of this connection") + if node is None: + # check that the from_device and to_device are not the same + if self.from_device == self.to_device: + raise ValueError("the connection is a inner-device connection (start and end is the same device) - you " + "have to provide the `node` string too") + if device == self.from_device: + return self.to_device, self.to_node_name + + return self.from_device, self.from_node_name + + if node not in (self.from_node_name, self.to_node_name): + raise ValueError(f"the given node `{node}` is no component of this connection") + + if device == self.from_device and node == self.from_node_name: + return self.to_device, self.to_node_name + + if device == self.to_device and node == self.to_node_name: + return self.from_device, self.from_node_name + + raise ValueError(f"the given node `{node}` is no component of the given device `{device.__qualname__}`") + + def has_connection_from_to(self, start_device, end_device=None) -> bool: + """ + This method checks if there is a connection from ``start_device`` to ``end_device``. This will return + true if the ``start_device`` and ``end_device`` given in this method are also the ``start_device`` and + ``end_device`` mentioned in this connection object. If this is a bidirectional connection, ``start_device`` and + ``end_device`` can switch places. + + + :param start_device: the device for which the method should check whether it is a communication partner (for + non-bidirectional connection, this has to be the start device) + + :param end_device: the other device for which the method should check whether it is a communication partner (for + non-bidirectional connection, this has to be the end device - this is optional if only the + start device should be checked) + + :return: returns true if the given direction is possible + """ + if end_device is None: + + if self.bidirectional: + return start_device in (self.from_device, self.to_device) + + return start_device == self.from_device + + if self.bidirectional: + return start_device == self.from_device and end_device == self.to_device or \ + start_device == self.to_device and end_device == self.from_device + + return start_device == self.from_device and end_device == self.to_device + + def equal_with(self, other: ConnectionMetadata) -> bool: + """ + This method returns true if the metadata of the current connection is equal with the metadata of the given + connection. + + The method returns true in the following situations: + * both connections are bidirectional and the from and to elements (device and node name) are the same + * both connections are unidirectional and have the same from and to elements + * both connections are bidirectional and the from is the to and the to is the from + + :return: true if the metadata of the current connection is contained in the metadata of the given one + """ + return self.__compare_with(other, allow_single_unidirectional_for_both_directions=False) + + def contained_in(self, other: ConnectionMetadata) -> bool: + """ + This method returns true if the metadata of the current connection is contained in the given one. + + The method returns true in the following situations: + * both connections are bidirectional and the from and to elements (device and node name) are the same + * both connections are unidirectional and have the same from and to elements + * both connections are bidirectional and the from is the to and the to is the from + * one connection is unidirectional and the other is bidirectional and the from and to elements are the same + * one connection is unidirectional and the other is bidirectional and the from is the to and the to is the from + + :return: true if the metadata of the current connection is contained in the metadata of the given one + """ + return self.__compare_with(other, allow_single_unidirectional_for_both_directions=True) + + @property + def from_device(self): + """device from which the connection starts""" + return self._from_device + + @property + def to_device(self): + """device at which the connection ends""" + return self._to_device + + @property + def from_node_name(self): + """the name of the node in the `Device` from which the connection starts""" + return self._from_device_node_name + + @property + def to_node_name(self): + """the name of the node in the `Device` at which the connection ends""" + return self._to_device_node_name + + @property + def bidirectional(self) -> bool: + """ + returns true if the connection is bidirectional (can go in both directions) otherwise false + """ + return self._bidirectional diff --git a/src/_balder/controllers/device_controller.py b/src/_balder/controllers/device_controller.py index 1da266d2..59196a16 100644 --- a/src/_balder/controllers/device_controller.py +++ b/src/_balder/controllers/device_controller.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Dict, List, Type, Tuple, Union, TYPE_CHECKING +from typing import Dict, List, Type, Union, TYPE_CHECKING import sys import logging @@ -257,7 +257,7 @@ def get_all_absolute_connections(self) -> Dict[str, List[Connection]]: # for cur_gateway in self._gateways: # cur_gateway.validate_given_node_names() - def get_node_types(self) -> Dict[str, List[Connection, Tuple[Connection]]]: + def get_node_types(self) -> Dict[str, List[Connection | None]]: """ This method returns a dictionary with the node name as key and a connection class as value. This class describes the common connection sub-tree, that all incoming and outgoing connections of the related device have @@ -342,16 +342,16 @@ def resolve_connection_device_strings(self): if cur_conn.to_device in all_inner_classes_of_outer.keys(): meta = cur_conn.metadata - meta["to_device"] = all_inner_classes_of_outer[cur_conn.to_device] - if meta["to_device_node_name"] is None: - # no unique node was given -> create one - meta["to_device_node_name"] = \ - DeviceController.get_for(meta["to_device"]).get_new_empty_auto_node() + to_device = all_inner_classes_of_outer[cur_conn.to_device] + # if there was given no unique node -> create one + to_device_node_name = DeviceController.get_for(to_device).get_new_empty_auto_node() \ + if meta.to_node_name is None else meta.to_node_name - # first reset whole metadata - cur_conn.metadata = {} - # now set metadata - cur_conn.metadata = meta + meta.set_to( + to_device=to_device, + to_device_node_name=to_device_node_name) + + cur_conn.set_metadata_for_all_subitems(meta) else: raise DeviceResolvingException( f"cannot resolve the str for the given device class `{cur_conn.to_device}` for " diff --git a/src/_balder/controllers/feature_controller.py b/src/_balder/controllers/feature_controller.py index 53fda9cc..ba5be42b 100644 --- a/src/_balder/controllers/feature_controller.py +++ b/src/_balder/controllers/feature_controller.py @@ -1,9 +1,11 @@ from __future__ import annotations + +import copy from typing import Type, Dict, Union, List, Callable, Tuple import logging import inspect -from _balder.cnnrelations import AndConnectionRelation, OrConnectionRelation +from _balder.cnnrelations import OrConnectionRelation from _balder.device import Device from _balder.vdevice import VDevice from _balder.feature import Feature @@ -44,16 +46,16 @@ def __init__(self, related_cls, _priv_instantiate_key): self._related_cls = related_cls #: holds the defined **Class-Based-Binding** for the related feature class sorted by VDevice types - self._cls_for_vdevice: Dict[Type[VDevice], List[Connection, Type[Connection]]] = {} + self._cls_for_vdevice: Dict[Type[VDevice], Connection] = {} #: holds the absolute calculated **Class-Based-Binding** for the related feature class sorted by VDevice types #: (will be calculated by :meth:`FeatureController.determine_absolute_class_based_for_vdevice`, which will be #: called during collecting) - self._abs_cls_for_vdevice: Union[Dict[Type[VDevice], List[Connection]], None] = None + self._abs_cls_for_vdevice: Union[Dict[Type[VDevice], Connection], None] = None #: contains the **Method-Based-Binding** information for the current feature type (will be automatically set by #: executor) - self._for_vdevice: Union[Dict[str, Dict[Callable, Dict[Type[VDevice], List[Connection]]]], None] = None + self._for_vdevice: Union[Dict[str, Dict[Callable, Dict[Type[VDevice], Connection]]], None] = None #: contains the original defined :class:`VDevice` objects for this feature (will be automatically set by #: :class:`Collector`) @@ -91,14 +93,13 @@ def _validate_vdevice_reference_used_in_for_vdevice_decorators(self): # now check if a definition for this class exists all_vdevices = self.get_abs_inner_vdevice_classes() # check the class based @for_vdevice and check the used vDevice classes here - if self.get_class_based_for_vdevice() is not None: - for cur_decorated_vdevice in self.get_class_based_for_vdevice().keys(): - if cur_decorated_vdevice not in all_vdevices: - raise VDeviceResolvingError( - f"you assign a vDevice to the class based decorator `@for_vdevice()` of the feature class " - f"`{self.related_cls.__name__}` which is no direct member of this feature - note that you have " - f"to define the vDevice in your feature before using it in the decorator - if necessary " - f"overwrite it") + for cur_decorated_vdevice in self.get_class_based_for_vdevice().keys(): + if cur_decorated_vdevice not in all_vdevices: + raise VDeviceResolvingError( + f"you assign a vDevice to the class based decorator `@for_vdevice()` of the feature class " + f"`{self.related_cls.__name__}` which is no direct member of this feature - note that you have " + f"to define the vDevice in your feature before using it in the decorator - if necessary " + f"overwrite it") # check the method based @for_vdevice and check the used vDevice classes here if self.get_method_based_for_vdevice() is not None: for cur_method_name, cur_method_data in self.get_method_based_for_vdevice().items(): @@ -147,9 +148,7 @@ def _determine_all_theoretically_unordered_method_variations( all_possible_method_variations = {} for cur_impl_method, cur_method_impl_dict in self.get_method_based_for_vdevice()[of_method_name].items(): if for_vdevice in cur_method_impl_dict.keys(): - cur_impl_method_cnns = [] - for cur_cnn in cur_method_impl_dict[for_vdevice]: - cur_impl_method_cnns += cur_cnn.get_singles() + cur_impl_method_cnns = cur_method_impl_dict[for_vdevice].get_singles() for cur_single_impl_method_cnn in cur_impl_method_cnns: if cur_single_impl_method_cnn.contained_in(with_connection, ignore_metadata=True): # this variation is possible @@ -163,26 +162,15 @@ def _determine_all_theoretically_unordered_method_variations( cur_single_impl_method_cnn)) return all_possible_method_variations - # ---------------------------------- METHODS ----------------------------------------------------------------------- - def get_class_based_for_vdevice(self) -> Union[Dict[Type[VDevice], List[Connection]], None]: + def get_class_based_for_vdevice(self) -> Dict[Type[VDevice], Connection]: """ - This method returns the class based data for the `@for_vdevice` decorator or None, if there is no decorator - given + This method returns the class based data for the `@for_vdevice` decorator. """ - result = {} - - for cur_device, cnn_list in self._cls_for_vdevice.items(): - result[cur_device] = [] - for cur_cnn in cnn_list: - if isinstance(cur_cnn, type) and issubclass(cur_cnn, Connection): - result[cur_device].append(cur_cnn()) - else: - result[cur_device].append(cur_cnn) - return result + return copy.copy(self._cls_for_vdevice) - def get_abs_class_based_for_vdevice(self) -> Dict[Type[VDevice], List[Union[Connection]]]: + def get_abs_class_based_for_vdevice(self) -> Dict[Type[VDevice], Connection]: """ This method returns the absolute calculated class-based-for-vdevice data for this feature. """ @@ -190,8 +178,7 @@ def get_abs_class_based_for_vdevice(self) -> Dict[Type[VDevice], List[Union[Conn raise RuntimeError('can not access the absolute class based for-vdevices because they are not set yet') return self._abs_cls_for_vdevice - def set_class_based_for_vdevice( - self, data: Union[Dict[Type[VDevice], List[Union[Connection, Type[Connection]]]], None]): + def set_class_based_for_vdevice(self, data: Dict[Type[VDevice], Connection]): """ This method allows to set the data of the class based `@for_vdevice` decorator. """ @@ -235,10 +222,9 @@ def determine_absolute_class_based_for_vdevice(self, print_warning): all_vdevices = self.get_abs_inner_vdevice_classes() cls_based_for_vdevice = self.get_class_based_for_vdevice() - cls_based_for_vdevice = {} if cls_based_for_vdevice is None else cls_based_for_vdevice for cur_vdevice in all_vdevices: # determine the class based for_vdevice value only if there is no one defined for this vDevice - if cur_vdevice in cls_based_for_vdevice.keys() and len(cls_based_for_vdevice[cur_vdevice]) > 0: + if cur_vdevice in cls_based_for_vdevice.keys(): # there already exists a definition for this vDevice -> IGNORE continue @@ -265,13 +251,11 @@ def determine_absolute_class_based_for_vdevice(self, print_warning): if vdevice_of_interest is not None and next_parent_feat is not None: next_parent_feat_cls_based_for_vdevice = \ FeatureController.get_for(next_parent_feat).get_abs_class_based_for_vdevice() - next_parent_feat_cls_based_for_vdevice = {} if next_parent_feat_cls_based_for_vdevice is None else \ - next_parent_feat_cls_based_for_vdevice if vdevice_of_interest in next_parent_feat_cls_based_for_vdevice.keys(): - for cur_cnn in next_parent_feat_cls_based_for_vdevice[vdevice_of_interest]: - # clean metadata here because this is no connection between real devices - cur_cnn.set_metadata_for_all_subitems(None) - parent_values.append(cur_cnn) + cnn = next_parent_feat_cls_based_for_vdevice[vdevice_of_interest] + # clean metadata here because this is no connection between real devices + cnn.set_metadata_for_all_subitems(None) + parent_values.append(cnn) this_vdevice_intersection = parent_values @@ -291,12 +275,12 @@ def determine_absolute_class_based_for_vdevice(self, print_warning): f'{", ".join([cur_cnn.get_tree_str() for cur_cnn in this_vdevice_intersection])})\n\n') # set the determined data into the class based `@for_vdevice` class property - cls_based_for_vdevice[cur_vdevice] = this_vdevice_intersection + cls_based_for_vdevice[cur_vdevice] = Connection.based_on(OrConnectionRelation(*this_vdevice_intersection)) self._abs_cls_for_vdevice = cls_based_for_vdevice def get_method_based_for_vdevice(self) -> \ - Union[Dict[str, Dict[Callable, Dict[Type[VDevice], List[Connection]]]], None]: + Union[Dict[str, Dict[Callable, Dict[Type[VDevice], Connection]]], None]: """ This method returns the method based data for the `@for_vdevice` decorator or None, if there is no decorator given @@ -304,16 +288,19 @@ def get_method_based_for_vdevice(self) -> \ return self._for_vdevice def set_method_based_for_vdevice( - self, data: Union[Dict[str, Dict[Callable, Dict[Type[VDevice], List[Connection]]]], None]): + self, data: Union[Dict[str, Dict[Callable, Dict[Type[VDevice], Connection]]], None]): """ This method allows to set the data for the method based `@for_vdevice` decorator. """ self._for_vdevice = data def get_method_variation( - self, of_method_name: str, for_vdevice: Type[VDevice], - with_connection: Union[Connection, Tuple[Connection]], ignore_no_findings: bool = False) \ - -> Union[Callable, None]: + self, + of_method_name: str, + for_vdevice: Type[VDevice], + with_connection: Connection, + ignore_no_findings: bool = False + ) -> Union[Callable, None]: """ This method searches for the unique possible method variation and returns it. In its search, the method also includes the parent classes of the related feature element of this controller. @@ -347,9 +334,6 @@ def get_method_variation( all_vdevice_method_variations = self.get_method_based_for_vdevice() - if isinstance(with_connection, tuple): - with_connection = Connection.based_on(AndConnectionRelation(*with_connection)) - if all_vdevice_method_variations is None: raise ValueError("the current feature has no method variations") if of_method_name not in all_vdevice_method_variations.keys(): @@ -594,26 +578,14 @@ def validate_inherited_class_based_vdevice_cnn_subset(self): FeatureController.get_for( parent_vdevice_feature).get_abs_class_based_for_vdevice()[relevant_parent_class] # check if VDevice connection elements are all contained in the parent connection - for cur_element in cur_vdevice_cls_cnn: - if isinstance(cur_element, tuple): - if not Connection.check_if_tuple_contained_in_connection( - cur_element, Connection.based_on(OrConnectionRelation(*parent_vdevice_cnn))): - raise VDeviceResolvingError( - f"the VDevice `{cur_vdevice.__name__}` is a child of the VDevice " - f"`{relevant_parent_class.__name__}`, which doesn't implements the connection of " - f"the child - the connection tuple `(" - f"{', '.join([cur_tuple_item.get_tree_str() for cur_tuple_item in cur_element])})´" - f" is not contained in the connection-tree of the parent VDevice") - else: - if not cur_element.contained_in( - Connection.based_on(OrConnectionRelation(*parent_vdevice_cnn)), ignore_metadata=True): - raise VDeviceResolvingError( - f"the VDevice `{cur_vdevice.__name__}` is a child of the VDevice " - f"`{relevant_parent_class.__name__}`, which doesn't implements the connection of " - f"the child - the connection element `{cur_element.get_tree_str()})´ is not " - f"contained in the connection-tree of the parent VDevice") - - # check all features where we have found parent VDevices as inner-classes to check next inheritance levels + if not cur_vdevice_cls_cnn.contained_in(parent_vdevice_cnn, ignore_metadata=True): + raise VDeviceResolvingError( + f"the VDevice `{cur_vdevice.__name__}` is a child of the VDevice " + f"`{relevant_parent_class.__name__}`, which doesn't implements the connection of " + f"the child - the connection element `{cur_vdevice_cls_cnn.get_tree_str()})´ is not " + f"contained in the connection-tree of the parent VDevice") + + # validate inheritance levels for all features with parent VDevices as inner-classes for cur_feature in to_checking_parent_features: FeatureController.get_for(cur_feature).validate_inherited_class_based_vdevice_cnn_subset() diff --git a/src/_balder/controllers/normal_scenario_setup_controller.py b/src/_balder/controllers/normal_scenario_setup_controller.py index 80aca310..ef1c6f2b 100644 --- a/src/_balder/controllers/normal_scenario_setup_controller.py +++ b/src/_balder/controllers/normal_scenario_setup_controller.py @@ -7,6 +7,7 @@ from _balder.setup import Setup from _balder.device import Device from _balder.scenario import Scenario +from _balder.connection_metadata import ConnectionMetadata from _balder.controllers.controller import Controller from _balder.controllers.device_controller import DeviceController from _balder.controllers.vdevice_controller import VDeviceController @@ -374,11 +375,10 @@ def determine_raw_absolute_device_connections(self): all_devices[all_devices_as_strings.index(cur_parent_cnn.from_device.__name__)] related_to_device = all_devices[all_devices_as_strings.index(cur_parent_cnn.to_device.__name__)] new_cnn = cur_parent_cnn.clone() - new_cnn.set_metadata_for_all_subitems(None) - new_cnn.set_metadata_for_all_subitems( - {"from_device": related_from_device, "to_device": related_to_device, - "from_device_node_name": cur_parent_cnn.from_node_name, - "to_device_node_name": cur_parent_cnn.to_node_name}) + new_cnn.set_metadata_for_all_subitems(ConnectionMetadata( + from_device=related_from_device, from_device_node_name=cur_parent_cnn.from_node_name, + to_device=related_to_device, to_device_node_name=cur_parent_cnn.to_node_name) + ) all_relevant_cnns.append(new_cnn) # throw warning (but only if this scenario/setup has minimum one of the parent classes has inner diff --git a/src/_balder/controllers/scenario_controller.py b/src/_balder/controllers/scenario_controller.py index a2b1d338..a1d17630 100644 --- a/src/_balder/controllers/scenario_controller.py +++ b/src/_balder/controllers/scenario_controller.py @@ -216,8 +216,8 @@ def validate_feature_clearance_for_parallel_connections(self): # now check if one or more single of the classbased connection are CONTAINED IN the possible # parallel connection (only if there exists more than one parallel) - feature_cnn = Connection.based_on(OrConnectionRelation(*FeatureController.get_for( - cur_feature.__class__).get_abs_class_based_for_vdevice()[mapped_vdevice])) + feature_cnn = FeatureController.get_for( + cur_feature.__class__).get_abs_class_based_for_vdevice()[mapped_vdevice] # search node names that is the relevant connection relevant_cnns: List[Connection] = [] @@ -339,10 +339,9 @@ def get_single_cnns_between_device_for_feature(from_device, to_device, relevant_ continue # now try to reduce the scenario connections according to the requirements of the feature class - cur_feature_class_based_for_vdevice = \ + cur_feature_cnn = \ FeatureController.get_for( cur_feature.__class__).get_abs_class_based_for_vdevice()[mapped_vdevice] - cur_feature_cnn = Connection.based_on(OrConnectionRelation(*cur_feature_class_based_for_vdevice)) device_cnn_singles = get_single_cnns_between_device_for_feature( from_device=cur_from_device, to_device=mapped_device, relevant_feature_cnn=cur_feature_cnn) diff --git a/src/_balder/controllers/setup_controller.py b/src/_balder/controllers/setup_controller.py index 8d5f6567..8a2fe884 100644 --- a/src/_balder/controllers/setup_controller.py +++ b/src/_balder/controllers/setup_controller.py @@ -2,9 +2,7 @@ from typing import Type, Dict, Union, TYPE_CHECKING import logging -from _balder.cnnrelations import OrConnectionRelation from _balder.setup import Setup -from _balder.connection import Connection from _balder.exceptions import IllegalVDeviceMappingError, MultiInheritanceError from _balder.controllers.feature_controller import FeatureController from _balder.controllers.device_controller import DeviceController @@ -114,8 +112,7 @@ def validate_feature_possibility(self): continue # there exists a class based requirement for this vDevice - class_based_cnn = Connection.based_on( - OrConnectionRelation(*feature_class_based_for_vdevice[mapped_vdevice])) + class_based_cnn = feature_class_based_for_vdevice[mapped_vdevice] # search relevant connection cur_device_controller = DeviceController.get_for(cur_device) for _, cur_cnn_list in cur_device_controller.get_all_absolute_connections().items(): diff --git a/src/_balder/decorator_connect.py b/src/_balder/decorator_connect.py index bd6c9c09..df0d790a 100644 --- a/src/_balder/decorator_connect.py +++ b/src/_balder/decorator_connect.py @@ -96,8 +96,8 @@ def __new__(cls, *args, **kwargs): # pylint: disable=unused-argument cur_cnn_instance = over_connection() elif isinstance(over_connection, (AndConnectionRelation, OrConnectionRelation)): over_connection = Connection.based_on(over_connection) - cur_cnn_instance.set_devices(from_device=decorated_cls, to_device=with_device) - cur_cnn_instance.update_node_names(from_device_node_name=self_node_name, to_device_node_name=dest_node_name) + cur_cnn_instance.metadata.set_from(from_device=decorated_cls, from_device_node_name=self_node_name) + cur_cnn_instance.metadata.set_to(to_device=with_device, to_device_node_name=dest_node_name) decorated_cls_device_controller.add_new_raw_connection(cur_cnn_instance) return decorated_cls diff --git a/src/_balder/decorator_for_vdevice.py b/src/_balder/decorator_for_vdevice.py index 1f3a321a..ab4dec86 100644 --- a/src/_balder/decorator_for_vdevice.py +++ b/src/_balder/decorator_for_vdevice.py @@ -1,7 +1,8 @@ from __future__ import annotations -from typing import List, Union, Type, Tuple +from typing import Union, Type import inspect +from _balder.cnnrelations import AndConnectionRelation, OrConnectionRelation from _balder.collector import Collector from _balder.feature import Feature from _balder.vdevice import VDevice @@ -13,9 +14,9 @@ def for_vdevice( vdevice: Union[str, Type[VDevice]], with_connections: Union[ - Type[Connection], Connection, Tuple[Union[Type[Connection], Connection]], - List[Union[Type[Connection], Connection, Tuple[Union[Type[Connection], Connection]]]]] = Connection(), - ): + Type[Connection], Connection, AndConnectionRelation, OrConnectionRelation + ] = Connection(), +): """ With the `@for_vdevice` you can limit the decorated object for a special allowed connection tree for every existing vDevice. This decorator can be used to decorate whole :class:`Feature` classes just like single methods of a @@ -37,26 +38,18 @@ def for_vdevice( :param with_connections: the assigned connection trees for this class/method (default: a universal connection) """ - idx = 0 - if not isinstance(with_connections, list): - with_connections = [with_connections] - for cur_conn in with_connections: - if isinstance(cur_conn, type): - if not issubclass(cur_conn, Connection): - raise ValueError(f"the given element type for the element on position `{idx}` has to be a subclass of " - f"`{Connection.__name__}` or a element of it") - elif isinstance(cur_conn, tuple): - tuple_idx = 0 - for cur_tuple_elem in cur_conn: - if not isinstance(cur_tuple_elem, Connection) and not issubclass(cur_tuple_elem, Connection): - raise ValueError(f"the given tuple element `{tuple_idx}` that is given on position `{idx}` " - f"has to be a subclass of `{Connection.__name__}`") - tuple_idx += 1 - elif not isinstance(cur_conn, Connection): - raise ValueError(f"the given type on position `{idx}` has to be a subclass of `{Connection.__name__}` or " - f"a element of it") - - idx += 1 + if isinstance(with_connections, Connection): + # do nothing + pass + elif isinstance(with_connections, (AndConnectionRelation, OrConnectionRelation)): + # use container connection + with_connections = Connection.based_on(with_connections) + elif isinstance(with_connections, type) and issubclass(with_connections, Connection): + # instantiate it + with_connections = with_connections() + else: + raise TypeError(f"the given element ``with_connection`` needs to be from type `AndConnectionRelation`, " + f"`OrConnectionRelation` or `Connection` - `{type(with_connections)}` is not allowed") # note: if `args` is an empty list - no special sub-connection-tree bindings @@ -105,7 +98,6 @@ def __new__(cls, *args, **kwargs): # pylint: disable=unused-argument vdevice = relevant_vdevices[0] cls_for_vdevice = fn_feature_controller.get_class_based_for_vdevice() - cls_for_vdevice = {} if cls_for_vdevice is None else cls_for_vdevice if vdevice in cls_for_vdevice.keys(): raise DuplicateForVDeviceError( f'there already exists a decorator for the vDevice `{vdevice}` in the Feature class ' diff --git a/src/_balder/executor/testcase_executor.py b/src/_balder/executor/testcase_executor.py index 293c6916..e2c430c3 100644 --- a/src/_balder/executor/testcase_executor.py +++ b/src/_balder/executor/testcase_executor.py @@ -12,7 +12,6 @@ from _balder.utils import inspect_method if TYPE_CHECKING: - from _balder.executor.unresolved_parametrized_testcase_executor import UnresolvedParametrizedTestcaseExecutor from _balder.executor.variation_executor import VariationExecutor from _balder.fixture_manager import FixtureManager from _balder.scenario import Scenario diff --git a/src/_balder/executor/variation_executor.py b/src/_balder/executor/variation_executor.py index 94ce4513..a1316fef 100644 --- a/src/_balder/executor/variation_executor.py +++ b/src/_balder/executor/variation_executor.py @@ -3,7 +3,7 @@ import inspect import logging -from _balder.cnnrelations import AndConnectionRelation, OrConnectionRelation +from _balder.cnnrelations import OrConnectionRelation from _balder.device import Device from _balder.connection import Connection from _balder.fixture_execution_level import FixtureExecutionLevel @@ -727,7 +727,7 @@ def determine_absolute_scenario_device_connections(self): # get relevant class based connections for the current feature on setup level (this is really be used # here) - feature_cnns = \ + feature_cnn = \ FeatureController.get_for( cur_setup_feature.__class__).get_abs_class_based_for_vdevice()[cur_setup_feature_vdevice] # connection that are relevant for this feature @@ -739,19 +739,18 @@ def determine_absolute_scenario_device_connections(self): # we have parallel possibilities -> determine the selected one (only one is allowed to fit) for cur_relevant_cnn in relevant_cnns: for cur_relevant_single_cnn in cur_relevant_cnn.get_singles(): - for cur_feature_cnn in feature_cnns: - for cur_feature_single_cnn in cur_feature_cnn.get_singles(): - if cur_feature_single_cnn.contained_in(cur_relevant_single_cnn): - if relevant_device_cnn is not None: - raise UnclearAssignableFeatureConnectionError( - f"the devices {cur_scenario_device.__name__} and " - f"{cur_mapped_scenario_device.__name__} have multiple parallel " - f"connections - the device `{cur_scenario_device.__name__}` uses a " - f"feature `{cur_scenario_feature.__class__.__name__}` that matches " - f"with the device `{cur_mapped_scenario_device.__name__}`, but it is " - f"not clear which of the parallel connection could be used" - ) - relevant_device_cnn = cur_relevant_cnn + for cur_feature_single_cnn in feature_cnn.get_singles(): + if cur_feature_single_cnn.contained_in(cur_relevant_single_cnn): + if relevant_device_cnn is not None: + raise UnclearAssignableFeatureConnectionError( + f"the devices {cur_scenario_device.__name__} and " + f"{cur_mapped_scenario_device.__name__} have multiple parallel " + f"connections - the device `{cur_scenario_device.__name__}` uses a " + f"feature `{cur_scenario_feature.__class__.__name__}` that matches " + f"with the device `{cur_mapped_scenario_device.__name__}`, but it is " + f"not clear which of the parallel connection could be used" + ) + relevant_device_cnn = cur_relevant_cnn elif len(relevant_cnns) == 1: relevant_device_cnn = relevant_cnns[0] if relevant_device_cnn is None: @@ -762,9 +761,8 @@ def determine_absolute_scenario_device_connections(self): # connection new_cleaned_singles = OrConnectionRelation() for cur_old_cnn_single in relevant_device_cnn.get_singles(): - for cur_feature_cnn in feature_cnns: - if cur_feature_cnn.contained_in(cur_old_cnn_single, ignore_metadata=True): - new_cleaned_singles.append(cur_old_cnn_single) + if feature_cnn.contained_in(cur_old_cnn_single, ignore_metadata=True): + new_cleaned_singles.append(cur_old_cnn_single) new_cnn_to_replace = Connection.based_on(new_cleaned_singles) new_cnn_to_replace.set_metadata_for_all_subitems(new_cleaned_singles[0].metadata) @@ -878,10 +876,7 @@ def set_conn_dependent_methods(self): if cur_cnn.has_connection_from_to(start_device=setup_device, end_device=mapped_setup_device): if cur_cnn.__class__ == Connection: # add the children - for cur_inner_cnn in cur_cnn.based_on_elements: - if isinstance(cur_inner_cnn, tuple): - cur_inner_cnn = AndConnectionRelation(*cur_inner_cnn) - relevant_abs_conn.append(cur_inner_cnn) + relevant_abs_conn.extend(cur_cnn.based_on_elements.connections) else: relevant_abs_conn.append(cur_cnn) if len(relevant_abs_conn) is None: diff --git a/src/_balder/routing_path.py b/src/_balder/routing_path.py index c76ee0c3..a452deba 100644 --- a/src/_balder/routing_path.py +++ b/src/_balder/routing_path.py @@ -167,7 +167,7 @@ def route_through( # ---------------------------------- PROPERTIES -------------------------------------------------------------------- @property - def elements(self) -> List[Connection, NodeGateway]: + def elements(self) -> List[Union[Connection, NodeGateway]]: """returns all elements that belongs to this routing path""" return self._routing_elems @@ -324,8 +324,7 @@ def get_virtual_connection(self) -> Connection: # todo pass # set metadata based on this routing - virtual_connection.set_devices(from_device=self.start_device, to_device=self.end_device) - virtual_connection.update_node_names(from_device_node_name=self.start_node_name, - to_device_node_name=self.end_node_name) + virtual_connection.metadata.set_from(from_device=self.start_device, from_device_node_name=self.start_node_name) + virtual_connection.metadata.set_to(to_device=self.end_device, to_device_node_name=self.end_node_name) return virtual_connection diff --git a/src/_balder/solver.py b/src/_balder/solver.py index 184e948c..0130aa9f 100644 --- a/src/_balder/solver.py +++ b/src/_balder/solver.py @@ -117,7 +117,7 @@ def _get_all_unfiltered_mappings(self) -> List[Tuple[Type[Setup], Type[Scenario] # ---------------------------------- METHODS ----------------------------------------------------------------------- - def get_initial_mapping(self) -> List[Tuple[Type[Setup], Type[Scenario], Dict[Device, Device]]]: + def get_initial_mapping(self) -> List[Tuple[Type[Setup], Type[Scenario], Dict[Type[Device], Type[Device]]]]: """ This method creates the initial amount of data for `self._mapping`. Only those elements are returned where the :meth:`Setup` class has more or the same amount of :meth:`Device`'s than the :meth:`Scenario` class.