diff --git a/space_packet_parser/xarr.py b/space_packet_parser/xarr.py index f9471de3..b5abd241 100644 --- a/space_packet_parser/xarr.py +++ b/space_packet_parser/xarr.py @@ -17,7 +17,7 @@ def _min_dtype_for_encoding(data_encoding: encodings.DataEncoding): - """Find the minimum data type capaable of representing an XTCE data encoding. + """Find the minimum data type capable of representing an XTCE data encoding. This only works for raw values and does not apply to calibrated or otherwise derived values. @@ -91,6 +91,10 @@ def _get_minimum_numpy_datatype( # If we are using raw values, we can determine the minimal dtype from the parameter data encoding return _min_dtype_for_encoding(data_encoding) + if isinstance(parameter_type, parameter_types.EnumeratedParameterType): + # Enums are always strings in their derived state + return "str" + if isinstance(data_encoding, encodings.NumericDataEncoding): if not (data_encoding.context_calibrators is not None or data_encoding.default_calibrator is not None): # If there are no calibrators attached to the encoding, then we can proceed as if we're using @@ -103,10 +107,6 @@ def _get_minimum_numpy_datatype( if isinstance(data_encoding, encodings.BinaryDataEncoding): return "bytes" - if isinstance(parameter_type, parameter_types.EnumeratedParameterType): - # Enums are always strings in their derived state - return "str" - if isinstance(data_encoding, encodings.StringDataEncoding): return "str" diff --git a/space_packet_parser/xtce/comparisons.py b/space_packet_parser/xtce/comparisons.py index 965f1f1c..9b89d122 100644 --- a/space_packet_parser/xtce/comparisons.py +++ b/space_packet_parser/xtce/comparisons.py @@ -1,5 +1,4 @@ """Matching logical objects""" -import warnings from abc import ABCMeta, abstractmethod from collections import namedtuple from typing import Any, Optional, Union @@ -55,8 +54,13 @@ def evaluate(self, class Comparison(MatchCriteria): """""" - def __init__(self, required_value: str, referenced_parameter: str, - operator: str = "==", use_calibrated_value: bool = True): + def __init__( + self, + required_value: str, + referenced_parameter: str, + operator: str = "==", + use_calibrated_value: bool = True + ): """Constructor Parameters @@ -176,18 +180,11 @@ def evaluate(self, if self.referenced_parameter in packet: if self.use_calibrated_value: parsed_value = packet[self.referenced_parameter] - if not parsed_value: - raise ComparisonError(f"Comparison {self} was instructed to useCalibratedValue (the default)" - f"but {self.referenced_parameter} does not appear to have a derived value.") else: parsed_value = packet[self.referenced_parameter].raw_value elif current_parsed_value is not None: # Assume then that the comparison is a reference to its own uncalibrated value parsed_value = current_parsed_value - if self.use_calibrated_value: - warnings.warn("Performing a comparison against a current value (e.g. a Comparison within a " - "context calibrator contains a reference to its own uncalibrated value but use_" - "calibrated_value is set to true. This is nonsensical. Using the uncalibrated value...") else: raise ValueError("Attempting to resolve a Comparison expression but the referenced parameter does not " "appear in the parsed data so far and no current raw value was passed " @@ -200,9 +197,6 @@ def evaluate(self, except ValueError as err: raise ComparisonError(f"Unable to coerce {self.required_value} of type {type(self.required_value)} to " f"type {t_comparate} for comparison evaluation.") from err - if required_value is None or parsed_value is None: - raise ValueError(f"Error in Comparison. Cannot compare {required_value} with {parsed_value}. " - "Neither should be None.") # x.__le__(y) style call return getattr(parsed_value, operator)(required_value) @@ -379,7 +373,7 @@ def evaluate(self, packet : packets.CCSDSPacket Packet data used to evaluate truthyness of the match criteria. current_parsed_value : Optional[Union[int, float]] - Current value being parsed. NOTE: This is currently ignored. See the TODO item below. + Ignored. Returns ------- @@ -389,21 +383,8 @@ def evaluate(self, def _get_parsed_value(parameter_name: str, use_calibrated: bool): """Retrieves the previously parsed value from the passed in packet""" - try: - return packet[parameter_name] if use_calibrated \ - else packet[parameter_name].raw_value - except KeyError as e: - raise ComparisonError(f"Attempting to perform a Condition evaluation on {self.left_param} but " - "the referenced parameter does not appear in the hitherto parsed data passed to " - "the evaluate method. If you intended a comparison against the raw value of the " - "parameter currently being parsed, unfortunately that is not currently supported." - ) from e - - # TODO: Consider allowing one of the parameters to be the parameter currently being evaluated. - # This isn't explicitly provided for in the XTCE spec but it seems reasonable to be able to - # perform conditionals against the current raw value of a parameter, e.g. while determining if it - # should be calibrated. Note that only one of the parameters can be used this way and it must reference - # an uncalibrated value so the logic and error handling must be done carefully. + return packet[parameter_name] if use_calibrated else packet[parameter_name].raw_value + left_value = _get_parsed_value(self.left_param, self.left_use_calibrated_value) # Convert XML operator representation to a python-compatible operator (e.g. '>' to '__gt__') operator = self._valid_operators[self.operator] @@ -415,8 +396,6 @@ def _get_parsed_value(parameter_name: str, use_calibrated: bool): right_value = t_left_param(self.right_value) else: raise ValueError(f"Error when evaluating condition {self}. Neither right_param nor right_value is set.") - if left_value is None or right_value is None: - raise ComparisonError(f"Error comparing {left_value} and {right_value}. Neither should be None.") # x.__le__(y) style call return getattr(left_value, operator)(right_value) @@ -526,7 +505,7 @@ def evaluate(self, packet : packets.CCSDSPacket Packet data used to evaluate truthyness of the match criteria. current_parsed_value : Optional[Union[int, float]] - Current value being parsed. + Ignored. Returns ------- diff --git a/space_packet_parser/xtce/definitions.py b/space_packet_parser/xtce/definitions.py index 79cf3e3a..42e71aed 100644 --- a/space_packet_parser/xtce/definitions.py +++ b/space_packet_parser/xtce/definitions.py @@ -423,7 +423,7 @@ def parse_ccsds_packet(self, def packet_generator( self, - binary_data: Union[BinaryIO, socket.socket], + binary_data: Union[BinaryIO, socket.socket, bytes], *, parse_bad_pkts: bool = True, root_container_name: Optional[str] = None, diff --git a/space_packet_parser/xtce/encodings.py b/space_packet_parser/xtce/encodings.py index b046aad4..31042f02 100644 --- a/space_packet_parser/xtce/encodings.py +++ b/space_packet_parser/xtce/encodings.py @@ -638,8 +638,51 @@ def to_xml(self, *, elmaker: ElementMaker) -> ElementTree.Element: class IntegerDataEncoding(NumericDataEncoding): """""" + _encodings = ("unsigned", "signed", "twosCompliment", "twosComplement") + _byte_orders = ("leastSignificantByteFirst", "mostSignificantByteFirst") _data_return_class = common.IntParameter + def __init__(self, + size_in_bits: int, + encoding: str, + *, + byte_order: str = "mostSignificantByteFirst", + default_calibrator: Optional[calibrators.Calibrator] = None, + context_calibrators: Optional[list[calibrators.ContextCalibrator]] = None): + """Constructor + + Parameters + ---------- + size_in_bits : int + Size of the integer + encoding : str + String indicating the type of encoding for the integer. FSW seems to use primarily 'signed' and 'unsigned', + though 'signed' is not actually a valid specifier according to XTCE. 'twosCompliment' [sic] should be used + instead, though we support the unofficial 'signed' specifier here. + For supported specifiers, see XTCE spec 4.3.2.2.5.6.2 + byte_order : str + Description of the byte order. Default is 'mostSignficantByteFirst' (big-endian). + default_calibrator : Optional[Calibrator] + Optional Calibrator object, containing information on how to transform the integer-encoded data, e.g. via + a polynomial conversion or spline interpolation. + context_calibrators : Optional[List[ContextCalibrator]] + List of ContextCalibrator objects, containing match criteria and corresponding calibrators to use in + various scenarios, based on other parameters. + """ + if encoding not in self._encodings: + raise ValueError(f"Encoding must be one of {self._encodings}") + + if byte_order not in self._byte_orders: + raise ValueError(f"Byte order must be one of {self._byte_orders}") + + super().__init__( + size_in_bits, + encoding, + byte_order=byte_order, + default_calibrator=default_calibrator, + context_calibrators=context_calibrators + ) + def _get_raw_value(self, packet: packets.CCSDSPacket) -> int: # Extract the bits from the data in big-endian order from the packet val = packet.raw_data.read_as_int(self.size_in_bits) @@ -873,6 +916,11 @@ def __init__(self, Function that linearly adjusts a size. e.g. if the size reference parameter gives a length in bytes, the linear adjuster should multiply by 8 to give the size in bits. """ + if not any([fixed_size_in_bits, size_reference_parameter, size_discrete_lookup_list]): + raise ValueError("Binary data encoding initialized with no way to determine a size. " + "You must provide one of " + "fixed_size_in_bits, size_reference_parameter, size_discrete_lookup_list.") + self.fixed_size_in_bits = fixed_size_in_bits self.size_reference_parameter = size_reference_parameter self.use_calibrated_value = use_calibrated_value @@ -895,7 +943,7 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int: len_bits = packet[field_length_reference] else: len_bits = packet[field_length_reference].raw_value - elif self.size_discrete_lookup_list is not None: + else: # self.size_discrete_lookup_list is not None: for discrete_lookup in self.size_discrete_lookup_list: len_bits = discrete_lookup.evaluate(packet) if len_bits is not None: @@ -903,9 +951,6 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int: else: raise ValueError('List of discrete lookup values being used for determining length of ' f'string {self} found no matches based on {packet}.') - else: - raise ValueError("Unable to parse BinaryDataEncoding. " - "No fixed size, dynamic size, or dynamic lookup size were provided.") if self.linear_adjuster is not None: # NOTE: This is assumed to be an integer value, represented as a float. If the linear adjuster diff --git a/tests/unit/test_packets.py b/tests/unit/test_packets.py index c64d8cf6..a559d87f 100644 --- a/tests/unit/test_packets.py +++ b/tests/unit/test_packets.py @@ -1,4 +1,6 @@ """Tests for packets""" +import socket + import pytest from space_packet_parser import packets @@ -112,7 +114,7 @@ def test_ccsds_packet_data_lookups(): assert packet.user_data == {x: x for x in range(7, 10)} with pytest.raises(KeyError): - packet[10] + _ = packet[10] def test_continuation_packets(test_data_dir): @@ -204,3 +206,36 @@ def test__extract_bits(start, nbits): data = int(s, 2).to_bytes(2, byteorder="big") assert packets._extract_bits(data, start, nbits) == int(s[start:start + nbits], 2) + + +def test_ccsds_generator(jpss_test_data_dir): + """Test ccsds_generator""" + test_data_file = jpss_test_data_dir / "J01_G011_LZ_2021-04-09T00-00-00Z_V01.DAT1" + test_packet = packets.create_ccsds_packet() # defaults + + # From file + with test_data_file.open('rb') as f: + assert next(packets.ccsds_generator(f)) + + # From socket + send, recv = socket.socketpair() + send.send(test_packet) + assert next(packets.ccsds_generator(recv)) + send.close() + recv.close() + + # From bytes + # This covers show_progress conditional code and also the end of the iterator + gen_from_bytes = packets.ccsds_generator(test_packet, show_progress=True) + assert next(gen_from_bytes) + with pytest.raises(StopIteration): + next(gen_from_bytes) + + # From Text file (error) + with test_data_file.open('rt') as f: + with pytest.raises(OSError, match="Packet data file opened in TextIO mode"): + next(packets.ccsds_generator(f)) + + # Unrecognized source (error) + with pytest.raises(OSError, match="Unrecognized data source"): + next(packets.ccsds_generator(1)) diff --git a/tests/unit/test_xarr.py b/tests/unit/test_xarr.py new file mode 100644 index 00000000..f05346e7 --- /dev/null +++ b/tests/unit/test_xarr.py @@ -0,0 +1,99 @@ +"""Tests for the xarr.py extras module""" +import pytest + +from space_packet_parser import xarr +from space_packet_parser.xtce import calibrators, containers, definitions, encodings, parameter_types, parameters + +np = pytest.importorskip("numpy", reason="numpy is not available") + + +@pytest.fixture +def test_xtce(): + """Test definition for testing surmising data types""" + container_set = [ + containers.SequenceContainer( + "CONTAINER", + entry_list=[ + parameters.Parameter( + "INT32_PARAM", + parameter_type=parameter_types.IntegerParameterType( + "I32_TYPE", + encoding=encodings.IntegerDataEncoding(size_in_bits=32, encoding="twosComplement") + ) + ), + parameters.Parameter( + "F32_PARAM", + parameter_type=parameter_types.FloatParameterType( + "F32_TYPE", + encoding=encodings.FloatDataEncoding(size_in_bits=32, encoding="IEEE754") + ) + ), + parameters.Parameter( + "CAL_INT_PARAM", + parameter_type=parameter_types.IntegerParameterType( + "I32_TYPE", + encoding=encodings.IntegerDataEncoding( + size_in_bits=32, + encoding="twosComplement", + default_calibrator=calibrators.PolynomialCalibrator( + coefficients=[ + calibrators.PolynomialCoefficient(1, 1) + ] + ) + ) + ) + ), + parameters.Parameter( + "BIN_PARAM", + parameter_type=parameter_types.BinaryParameterType( + "BIN_TYPE", + encoding=encodings.BinaryDataEncoding( + fixed_size_in_bits=32 + ) + ) + ), + parameters.Parameter( + "INT_ENUM_PARAM", + parameter_type=parameter_types.EnumeratedParameterType( + "INT_ENUM_TYPE", + encoding=encodings.IntegerDataEncoding(size_in_bits=8, encoding="unsigned"), + enumeration={ + "ONE": 1, + "TWO": 2 + } + ) + ), + parameters.Parameter( + "STR_PARAM", + parameter_type=parameter_types.StringParameterType( + "STR_TYPE", + encoding=encodings.StringDataEncoding( + fixed_raw_length=32 + ) + ) + ), + ] + ) + ] + return definitions.XtcePacketDefinition(container_set=container_set) + +@pytest.mark.parametrize( + ("pname", "use_raw_value", "expected_dtype"), + [ + ("INT32_PARAM", True, "int32"), + ("INT32_PARAM", False, "int32"), + ("F32_PARAM", False, "float32"), + ("F32_PARAM", True, "float32"), + ("CAL_INT_PARAM", True, "int32"), + ("CAL_INT_PARAM", False, None), + ("BIN_PARAM", True, "bytes"), + ("BIN_PARAM", False, "bytes"), + ("INT_ENUM_PARAM", True, "uint8"), + ("INT_ENUM_PARAM", False, "str"), + ("STR_PARAM", True, "str"), + ("STR_PARAM", False, "str"), + ] +) +def test_minimum_numpy_dtype(test_xtce, pname, use_raw_value, expected_dtype): + """Test finding the minimum numpy data type for a parameter""" + assert xarr._get_minimum_numpy_datatype(pname, test_xtce, use_raw_value) == expected_dtype diff --git a/tests/unit/test_xtce/test_comparisons.py b/tests/unit/test_xtce/test_comparisons.py index 5763460d..e48a4b33 100644 --- a/tests/unit/test_xtce/test_comparisons.py +++ b/tests/unit/test_xtce/test_comparisons.py @@ -4,6 +4,7 @@ from space_packet_parser import common from space_packet_parser.exceptions import ComparisonError +from space_packet_parser.packets import CCSDSPacket from space_packet_parser.xtce import XTCE_1_2_XMLNS, comparisons @@ -14,82 +15,82 @@ """, - {'MSN__PARAM': common.FloatParameter(678, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(678, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(668, 3)}, None, False), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(668, 3)}), None, False), (f""" """, - {'MSN__PARAM': common.FloatParameter(678, 3)}, None, False), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(678, 3)}), None, False), (f""" """, - {'MSN__PARAM': common.FloatParameter(658, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(658, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(679, 3)}, None, False), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(679, 3)}), None, False), (f""" """, - {'MSN__PARAM': common.FloatParameter(670, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(670, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(678, 3)}, None, False), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(678, 3)}), None, False), (f""" """, - {'MSN__PARAM': common.FloatParameter(679, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(679, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(660, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(660, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(690, 3)}, None, False), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(690, 3)}), None, False), (f""" """, - {'MSN__PARAM': common.FloatParameter(660, 3)}, None, False), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(660, 3)}), None, False), (f""" """, - {'MSN__PARAM': common.FloatParameter(690, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(690, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(690, 678)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(690, 678)}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(678, 3)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(678, 3)}), None, True), (f""" """, - {'MSN__PARAM': common.StrParameter('calibratedfoostring', 'foostring')}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.StrParameter('calibratedfoostring', 'foostring')}), None, True), (f""" """, - {'MSN__PARAM': common.FloatParameter(3.14, 1)}, None, True), + CCSDSPacket(**{'MSN__PARAM': common.FloatParameter(3.14, 1)}), None, True), (f""" @@ -129,6 +130,16 @@ def test_comparison(elmaker, xtce_parser, assert full_circle.evaluate(test_parsed_data, current_parsed_value) == expected_comparison_result +@pytest.mark.parametrize( + ("args", "kwargs", "expected_error", "expected_error_msg"), + [(("3", "REFERENCE_TO_OWN_RAW_VAL", "~="), {}, ValueError, "Unrecognized operator syntax ~=")] +) +def test_comparison_validity_check(args, kwargs, expected_error, expected_error_msg): + """Test validation checks when creating a Comparison""" + with pytest.raises(expected_error, match=expected_error_msg): + comparisons.Comparison(*args, **kwargs) + + @pytest.mark.parametrize( ('xml_string', 'test_parsed_data', 'expected_condition_result'), [ @@ -139,8 +150,8 @@ def test_comparison(elmaker, xtce_parser, """, - {'P1': common.IntParameter(700, 4), - 'P2': common.IntParameter(678, 3)}, True), + CCSDSPacket(**{'P1': common.IntParameter(700, 4), + 'P2': common.IntParameter(678, 3)}), True), (f""" @@ -148,7 +159,7 @@ def test_comparison(elmaker, xtce_parser, 4 """, - {'P1': common.IntParameter(700, 4)}, True), + CCSDSPacket(**{'P1': common.IntParameter(700, 4)}), True), (f""" @@ -156,8 +167,8 @@ def test_comparison(elmaker, xtce_parser, """, - {'P1': common.IntParameter(700, 4), - 'P2': common.IntParameter(678, 3)}, False), + CCSDSPacket(**{'P1': common.IntParameter(700, 4), + 'P2': common.IntParameter(678, 3)}), False), (f""" @@ -165,8 +176,8 @@ def test_comparison(elmaker, xtce_parser, """, - {'P1': common.StrParameter('abcd'), - 'P2': common.StrParameter('abcd')}, True), + CCSDSPacket(**{'P1': common.StrParameter('abcd'), + 'P2': common.StrParameter('abcd')}), True), (f""" @@ -174,8 +185,8 @@ def test_comparison(elmaker, xtce_parser, """, - {'P1': common.FloatParameter(3.14, 1), - 'P2': common.FloatParameter(3.14, 180)}, True), + CCSDSPacket(**{'P1': common.FloatParameter(3.14, 1), + 'P2': common.FloatParameter(3.14, 180)}), True), ] ) def test_condition(elmaker, xtce_parser, xml_string, test_parsed_data, expected_condition_result): @@ -189,6 +200,23 @@ def test_condition(elmaker, xtce_parser, xml_string, test_parsed_data, expected_ assert full_circle.evaluate(test_parsed_data) == expected_condition_result +@pytest.mark.parametrize( + ("args", "kwargs", "expected_error", "expected_error_msg"), + [ + (("X", "~="), {"right_value": "4"}, + ValueError, "Unrecognized operator syntax ~="), + (("X", "=="), {"right_param": "R", "right_value": "1"}, + comparisons.ComparisonError, "Received both a right_value and a right_param reference to Condition"), + (("X", "=="), {"right_value": "4", "right_use_calibrated_value": True}, + comparisons.ComparisonError, "Unable to use calibrated form of a fixed value in Condition") + ] +) +def test_condition_validity_check(args, kwargs, expected_error, expected_error_msg): + """Test validation checks when creating a Condition""" + with pytest.raises(expected_error, match=expected_error_msg): + comparisons.Condition(*args, **kwargs) + + @pytest.mark.parametrize( ('xml_string', 'test_parsed_data', 'expected_result'), [ @@ -215,10 +243,10 @@ def test_condition(elmaker, xtce_parser, xml_string, test_parsed_data, expected_ """, - {'P': common.IntParameter(0, 4), - 'P2': common.IntParameter(700, 4), - 'P3': common.IntParameter(701, 4), - 'P4': common.IntParameter(98, 4)}, True), + CCSDSPacket(**{'P': common.IntParameter(0, 4), + 'P2': common.IntParameter(700, 4), + 'P3': common.IntParameter(701, 4), + 'P4': common.IntParameter(98, 4)}), True), (f""" @@ -247,12 +275,12 @@ def test_condition(elmaker, xtce_parser, xml_string, test_parsed_data, expected_ """, - {'P': common.IntParameter(100, 4), - 'P0': common.IntParameter(678, 4), - 'P1': common.IntParameter(500, 4), - 'P2': common.IntParameter(700, 4), - 'P3': common.IntParameter(701, 4), - 'P4': common.IntParameter(99, 4)}, True), + CCSDSPacket(**{'P': common.IntParameter(100, 4), + 'P0': common.IntParameter(678, 4), + 'P1': common.IntParameter(500, 4), + 'P2': common.IntParameter(700, 4), + 'P3': common.IntParameter(701, 4), + 'P4': common.IntParameter(99, 4)}), True), ] ) def test_boolean_expression(elmaker, xtce_parser, xml_string, test_parsed_data, expected_result): @@ -278,13 +306,13 @@ def test_boolean_expression(elmaker, xtce_parser, xml_string, test_parsed_data, """, - {'P1': common.IntParameter(678, 1)}, 10), + CCSDSPacket(**{'P1': common.IntParameter(678, 1)}), 10), (f""" """, - {'P1': common.IntParameter(678, 0)}, None), + CCSDSPacket(**{'P1': common.IntParameter(678, 0)}), None), (f""" @@ -293,10 +321,10 @@ def test_boolean_expression(elmaker, xtce_parser, xml_string, test_parsed_data, """, - { + CCSDSPacket(**{ 'MSN__PARAM1': common.IntParameter(680, 3), 'MSN__PARAM2': common.IntParameter(3000, 3), - }, 11), + }), 11), ] ) def test_discrete_lookup(elmaker, xtce_parser, xml_string, test_parsed_data, expected_lookup_result): diff --git a/tests/unit/test_xtce/test_encodings.py b/tests/unit/test_xtce/test_encodings.py index 852ab66d..e946e1d3 100644 --- a/tests/unit/test_xtce/test_encodings.py +++ b/tests/unit/test_xtce/test_encodings.py @@ -130,6 +130,31 @@ def test_string_data_encoding(elmaker, xtce_parser, xml_string: str, expectation assert full_circle == expectation +@pytest.mark.parametrize( + ("args", "kwargs", "expected_error", "expected_error_msg"), + [ + ((), {"encoding": "bad"}, ValueError, + "Encoding must be one of"), + ((), {"encoding": "UTF-16"}, ValueError, + "Byte order must be specified for multi-byte character encodings."), + ((), {"byte_order": "invalid"}, ValueError, + "If specified, byte order must be one of"), + ((), {"termination_character": "FF", "leading_length_size": 8}, ValueError, + "Got both a termination character and a leading size"), + ((), {}, ValueError, + "Expected exactly one of dynamic length reference, discrete length lookup, or fixed length"), + ((), {"length_linear_adjuster": lambda x: x, "fixed_raw_length": 32}, ValueError, + "Got a length linear adjuster for a string whose length is not specified by a dynamic"), + ((), {"fixed_raw_length": 32, "termination_character": "0F0F"}, ValueError, + "Expected a hex string representation of a single character"), + ] +) +def test_string_data_encoding_validation(args, kwargs, expected_error, expected_error_msg): + """Test initialization errors for StringDataEncoding""" + with pytest.raises(expected_error, match=expected_error_msg): + encodings.StringDataEncoding(*args, **kwargs) + + @pytest.mark.parametrize( ('xml_string', 'expectation'), [ @@ -230,6 +255,19 @@ def test_integer_data_encoding(elmaker, xtce_parser, xml_string: str, expectatio assert full_circle == expectation +@pytest.mark.parametrize( + ("args", "kwargs", "expected_error", "expected_error_msg"), + [ + ((32, "invalid-encoding"), {}, ValueError, "Encoding must be one of"), + ((32, "unsigned"), {"byte_order": "noSignificantBitsAtAll!"}, ValueError, "Byte order must be one of"), + ] +) +def test_integer_data_encoding_validation(args, kwargs, expected_error, expected_error_msg): + """Test initialization errors for IntegerDataEncoding""" + with pytest.raises(expected_error, match=expected_error_msg): + encodings.IntegerDataEncoding(*args, **kwargs) + + @pytest.mark.parametrize( ('xml_string', 'expectation'), [ @@ -334,6 +372,22 @@ def test_float_data_encoding(elmaker, xtce_parser, xml_string: str, expectation) assert full_circle == expectation +@pytest.mark.parametrize( + ("args", "kwargs", "expected_error", "expected_error_msg"), + [ + ((32,), {"encoding": "foo"}, ValueError, "Invalid encoding type"), + ((32,), {"encoding": "DEC"}, NotImplementedError, "Although the XTCE spec allows"), + ((16,), {"encoding": "MILSTD_1750A"}, ValueError, "MIL-1750A encoded floats must be 32 bits"), + ((8,), {"encoding": "IEEE754"}, ValueError, "Invalid size_in_bits value for IEEE754 FloatDataEncoding"), + ((8,), {"encoding": "IEEE754_1985"}, ValueError, "Invalid size_in_bits value for IEEE754 FloatDataEncoding"), + ] +) +def test_float_data_encoding_validation(args, kwargs, expected_error, expected_error_msg): + """Test initialization errors for FloatDataEncoding""" + with pytest.raises(expected_error, match=expected_error_msg): + encodings.FloatDataEncoding(*args, **kwargs) + + @pytest.mark.parametrize( ('xml_string', 'expectation'), [ @@ -392,3 +446,15 @@ def test_binary_data_encoding(elmaker, xtce_parser, xml_string: str, expectation result_string = ElementTree.tostring(result.to_xml(elmaker=elmaker), pretty_print=True).decode() full_circle = encodings.BinaryDataEncoding.from_xml(ElementTree.fromstring(result_string, parser=xtce_parser)) assert full_circle == expectation + + +@pytest.mark.parametrize( + ("args", "kwargs", "expected_error", "expected_error_msg"), + [ + ((), {}, ValueError, "Binary data encoding initialized with no way to determine a size"), + ] +) +def test_binary_data_encoding_validation(args, kwargs, expected_error, expected_error_msg): + """Test initialization errors for BinaryDataEncoding""" + with pytest.raises(expected_error, match=expected_error_msg): + encodings.BinaryDataEncoding(*args, **kwargs)