forked from OpenCyphal/pydsdl
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This is a draft PR to review a proposed change to the public APIs for pydsdl. This is non-breaking change that adds an optional named tuple to the `read_namespace` method. This named tuple allows specifying a set of filtering rules to apply when discovering DSDL types from files under a given namespace filetree. The intent is to connect the proposed TypeFilterEngine to the `_construct_dsdl_definitions_from_namespace` method after it creates a `DSDLDefinition` to apply the initial filtering of the target type. From there I need to figure out how to mark that selected type and its transitive dependencies as either weakly-excluded or strongly-included and detect any conflict where user provided rules conflict as processing proceeds. Any advice as to where I could best work with a tree-representation ahead of any expensive processing would be appreciated.
- Loading branch information
1 parent
6a6828b
commit 4446e0f
Showing
3 changed files
with
248 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
# Copyright (C) OpenCyphal Development Team <opencyphal.org> | ||
# Copyright Amazon.com Inc. or its affiliates. | ||
# SPDX-License-Identifier: MIT | ||
|
||
import enum | ||
import re | ||
from typing import Any, List, Optional, Tuple, Union, NamedTuple | ||
|
||
from ._dsdl_definition import DSDLDefinition | ||
|
||
|
||
class DefaultFilterRule(enum.Enum): | ||
""" | ||
Rule to apply for types that do not match any filter rule. | ||
""" | ||
|
||
EXCLUDE = "exclude" | ||
INCLUDE = "include" | ||
|
||
|
||
class FilterOrder(enum.Enum): | ||
""" | ||
The order of the filter rules. For example, INCLUDE_FIRST means that the include rules are applied first and | ||
filter processing stops for that type as soon as an include rule is matched regardless of any other rules. | ||
EXCLUDE_FIRST means that the exclude rules are applied first and filter processing stops for that type as | ||
soon as an exclude rule is matched regardless of any other rules. | ||
""" | ||
|
||
INCLUDE_FIRST = "include-first" | ||
EXCLUDE_FIRST = "exclude-first" | ||
|
||
|
||
class FilterType(enum.Enum): | ||
""" | ||
What part of a DSDL-file-defined type to apply the filter to. | ||
""" | ||
|
||
FULL_NAME_AND_VERSION = "full-name-and-version" | ||
SHORT_NAME_AND_VERSION = "short-name-and-version" | ||
FULL_NAME = "full-name" | ||
SHORT_NAME = "short-name" | ||
FILE_PATH = "file-path" | ||
ROOT_NAMESPACE = "root-namespace" | ||
FULL_NAMESPACE = "full-namespace" | ||
VERSION = "version" | ||
|
||
|
||
TypeFilter = Union[str, re.Pattern[str], Tuple[FilterType, Union[str, re.Pattern[str]]]] | ||
""" | ||
The type of a filter rule. This can be an un-compiled regular expression (string), pre-compiled regular expression | ||
pattern, or a tuple of a FilterType and an un-compiled regular expression or pre-compiled regular expression pattern. | ||
Filters without types are given a default type when normalized. | ||
""" | ||
|
||
NormalizedTypeFilter = Tuple[FilterType, re.Pattern[str]] | ||
""" | ||
A TypeFilter that is normalized to a tuple of a FilterType and a pre-compiled regular expression pattern. | ||
""" | ||
|
||
TypeFilters = NamedTuple( | ||
"TypeFilters", | ||
[ | ||
("default_rule", DefaultFilterRule), | ||
("whitelist", Optional[List[TypeFilter]]), | ||
("blacklist", Optional[List[TypeFilter]]), | ||
("filter_order", Optional[FilterOrder]), | ||
], | ||
) | ||
""" | ||
A set of filter rules to use when processing DSDL namespaces. | ||
""" | ||
|
||
|
||
class TypeFilterEngine: | ||
""" | ||
Engine that processes filter rules given DsdlDefinitions. | ||
""" | ||
|
||
DEFAULT_FILTER_TYPE = FilterType.FULL_NAME_AND_VERSION | ||
DEFAULT_FILTER_ORDER = FilterOrder.INCLUDE_FIRST | ||
DEFAULT_FILTER_RULE = DefaultFilterRule.INCLUDE | ||
|
||
@classmethod | ||
def _normalize_filter( | ||
cls, filter_rule: TypeFilter, default_type: Optional[FilterType] = None | ||
) -> NormalizedTypeFilter: | ||
if not isinstance(filter_rule, tuple) and default_type is None: | ||
raise ValueError("Cannot normalize a pattern without a default filter type") | ||
|
||
if isinstance(filter_rule, tuple): | ||
if len(filter_rule) != 2 or not isinstance(filter_rule[0], FilterType) or isinstance(filter_rule[1], tuple): | ||
raise ValueError("Invalid filter type") | ||
filter_type, pattern = cls._normalize_filter(filter_rule[1], filter_rule[0]) | ||
elif isinstance(filter_rule, str): | ||
filter_type = default_type # type: ignore | ||
pattern = re.compile(filter_rule) | ||
elif isinstance(filter_rule, re.Pattern): | ||
filter_type = default_type # type: ignore | ||
pattern = filter_rule | ||
else: | ||
raise ValueError("Invalid filter format") | ||
return filter_type, pattern | ||
|
||
@classmethod | ||
def create( | ||
cls, | ||
filters: Optional[TypeFilters] = None, | ||
**kwargs: Any, | ||
) -> "TypeFilterEngine": | ||
""" | ||
Create a new filter engine with the given rules. If no arguments are provided the resulting engine | ||
will simply apply the default rule on select. | ||
:param filters: The filters specification. | ||
:param kwargs: Additional arguments to pass to the filter engine. | ||
:return: A new filter engine. | ||
""" | ||
if filters is not None: | ||
kwargs["whitelist"] = ( | ||
[cls._normalize_filter(rule, cls.DEFAULT_FILTER_TYPE) for rule in filters.whitelist] | ||
if filters.whitelist is not None | ||
else [] | ||
) | ||
kwargs["blacklist"] = ( | ||
[cls._normalize_filter(rule, cls.DEFAULT_FILTER_TYPE) for rule in filters.blacklist] | ||
if filters.blacklist is not None | ||
else [] | ||
) | ||
kwargs["filter_order"] = ( | ||
filters.filter_order if filters.filter_order is not None else cls.DEFAULT_FILTER_ORDER | ||
) | ||
kwargs["default_rule"] = filters.default_rule | ||
else: | ||
kwargs["whitelist"] = [] | ||
kwargs["blacklist"] = [] | ||
|
||
return cls(**kwargs) | ||
|
||
# pylint: disable=too-many-arguments | ||
def __init__( | ||
self, | ||
whitelist: List[NormalizedTypeFilter], | ||
blacklist: List[NormalizedTypeFilter], | ||
filter_order: FilterOrder = DEFAULT_FILTER_ORDER, | ||
default_rule: DefaultFilterRule = DEFAULT_FILTER_RULE, | ||
): | ||
self._filter_order = filter_order | ||
self._default_rule = default_rule | ||
self._whitelist = whitelist | ||
self._blacklist = blacklist | ||
|
||
@property | ||
def filter_order(self) -> FilterOrder: | ||
""" | ||
The order of the filter rules. | ||
""" | ||
return self._filter_order | ||
|
||
@property | ||
def default_rule(self) -> DefaultFilterRule: | ||
""" | ||
The default rule to apply for types that do not match any filter rule. | ||
""" | ||
return self._default_rule | ||
|
||
def select(self, dsdl_type: DSDLDefinition) -> bool: | ||
""" | ||
Filter the type based on the filter rules. | ||
:param type_name: The name of the type to filter. | ||
:return: True if the type should be included, False if it should be excluded. | ||
""" | ||
if self._filter_order == FilterOrder.INCLUDE_FIRST: | ||
if self._include(dsdl_type): | ||
return True | ||
if self._exclude(dsdl_type): | ||
return False | ||
else: | ||
if self._exclude(dsdl_type): | ||
return False | ||
if self._include(dsdl_type): | ||
return True | ||
return self._default_rule == DefaultFilterRule.INCLUDE | ||
|
||
def _include(self, dsdl_type: DSDLDefinition) -> bool: | ||
for rule in self._whitelist: | ||
if self._match(rule, dsdl_type): | ||
return True | ||
return False | ||
|
||
def _exclude(self, dsdl_type: DSDLDefinition) -> bool: | ||
for rule in self._blacklist: | ||
if self._match(rule, dsdl_type): | ||
return True | ||
return False | ||
|
||
# pylint: disable=too-many-return-statements | ||
def _match(self, rule: NormalizedTypeFilter, dsdl_type: DSDLDefinition) -> bool: | ||
if rule[0] == FilterType.FULL_NAME_AND_VERSION: | ||
return self._match_1(rule[1], f"{dsdl_type.full_name}.{dsdl_type.version.major}.{dsdl_type.version.minor}") | ||
if rule[0] == FilterType.SHORT_NAME_AND_VERSION: | ||
return self._match_1(rule[1], f"{dsdl_type.short_name}.{dsdl_type.version.major}.{dsdl_type.version.minor}") | ||
if rule[0] == FilterType.FULL_NAME: | ||
return self._match_1(rule[1], dsdl_type.full_name) | ||
if rule[0] == FilterType.SHORT_NAME: | ||
return self._match_1(rule[1], dsdl_type.short_name) | ||
if rule[0] == FilterType.FILE_PATH: | ||
return self._match_1(rule[1], str(dsdl_type.file_path)) | ||
if rule[0] == FilterType.ROOT_NAMESPACE: | ||
return self._match_1(rule[1], dsdl_type.root_namespace) | ||
if rule[0] == FilterType.FULL_NAMESPACE: | ||
return self._match_1(rule[1], dsdl_type.full_namespace) | ||
if rule[0] == FilterType.VERSION: | ||
return self._match_1(rule[1], f"{dsdl_type.version.major}.{dsdl_type.version.minor}") | ||
return False | ||
|
||
def _match_1(self, rule: re.Pattern[str], text: str) -> bool: | ||
return rule.search(text) is not None | ||
|
||
|
||
def _unittest_filter_engine_create() -> None: | ||
# pylint: disable=import-outside-toplevel | ||
from unittest.mock import MagicMock | ||
|
||
type_a = MagicMock(spec=DSDLDefinition) | ||
|
||
default_engine = TypeFilterEngine.create() | ||
assert default_engine.default_rule == TypeFilterEngine.DEFAULT_FILTER_RULE | ||
assert default_engine.filter_order == TypeFilterEngine.DEFAULT_FILTER_ORDER | ||
|
||
assert default_engine.select(type_a) == (TypeFilterEngine.DEFAULT_FILTER_RULE == DefaultFilterRule.INCLUDE) | ||
|
||
assert TypeFilterEngine.create(TypeFilters(DefaultFilterRule.EXCLUDE, None, None, None)).select(type_a) is False | ||
assert TypeFilterEngine.create(TypeFilters(DefaultFilterRule.INCLUDE, None, None, None)).select(type_a) is True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters