Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions space_packet_parser/xarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


def _min_dtype_for_encoding(data_encoding: encodings.DataEncoding):
"""Find the minimum data type capaable of representing an XTCE data encoding.
"""Find the minimum data type capable of representing an XTCE data encoding.

This only works for raw values and does not apply to calibrated or otherwise derived values.

Expand Down Expand Up @@ -91,6 +91,10 @@ def _get_minimum_numpy_datatype(
# If we are using raw values, we can determine the minimal dtype from the parameter data encoding
return _min_dtype_for_encoding(data_encoding)

if isinstance(parameter_type, parameter_types.EnumeratedParameterType):
# Enums are always strings in their derived state
return "str"

if isinstance(data_encoding, encodings.NumericDataEncoding):
if not (data_encoding.context_calibrators is not None or data_encoding.default_calibrator is not None):
# If there are no calibrators attached to the encoding, then we can proceed as if we're using
Expand All @@ -103,10 +107,6 @@ def _get_minimum_numpy_datatype(
if isinstance(data_encoding, encodings.BinaryDataEncoding):
return "bytes"

if isinstance(parameter_type, parameter_types.EnumeratedParameterType):
# Enums are always strings in their derived state
return "str"

if isinstance(data_encoding, encodings.StringDataEncoding):
return "str"

Expand Down
43 changes: 11 additions & 32 deletions space_packet_parser/xtce/comparisons.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Matching logical objects"""
import warnings
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from typing import Any, Optional, Union
Expand Down Expand Up @@ -55,8 +54,13 @@ def evaluate(self,
class Comparison(MatchCriteria):
"""<xtce:Comparison>"""

def __init__(self, required_value: str, referenced_parameter: str,
operator: str = "==", use_calibrated_value: bool = True):
def __init__(
self,
required_value: str,
referenced_parameter: str,
operator: str = "==",
use_calibrated_value: bool = True
):
"""Constructor

Parameters
Expand Down Expand Up @@ -176,18 +180,11 @@ def evaluate(self,
if self.referenced_parameter in packet:
if self.use_calibrated_value:
parsed_value = packet[self.referenced_parameter]
if not parsed_value:
raise ComparisonError(f"Comparison {self} was instructed to useCalibratedValue (the default)"
f"but {self.referenced_parameter} does not appear to have a derived value.")
else:
parsed_value = packet[self.referenced_parameter].raw_value
elif current_parsed_value is not None:
# Assume then that the comparison is a reference to its own uncalibrated value
parsed_value = current_parsed_value
if self.use_calibrated_value:
warnings.warn("Performing a comparison against a current value (e.g. a Comparison within a "
"context calibrator contains a reference to its own uncalibrated value but use_"
"calibrated_value is set to true. This is nonsensical. Using the uncalibrated value...")
else:
raise ValueError("Attempting to resolve a Comparison expression but the referenced parameter does not "
"appear in the parsed data so far and no current raw value was passed "
Expand All @@ -200,9 +197,6 @@ def evaluate(self,
except ValueError as err:
raise ComparisonError(f"Unable to coerce {self.required_value} of type {type(self.required_value)} to "
f"type {t_comparate} for comparison evaluation.") from err
if required_value is None or parsed_value is None:
raise ValueError(f"Error in Comparison. Cannot compare {required_value} with {parsed_value}. "
"Neither should be None.")

# x.__le__(y) style call
return getattr(parsed_value, operator)(required_value)
Expand Down Expand Up @@ -379,7 +373,7 @@ def evaluate(self,
packet : packets.CCSDSPacket
Packet data used to evaluate truthyness of the match criteria.
current_parsed_value : Optional[Union[int, float]]
Current value being parsed. NOTE: This is currently ignored. See the TODO item below.
Ignored.

Returns
-------
Expand All @@ -389,21 +383,8 @@ def evaluate(self,

def _get_parsed_value(parameter_name: str, use_calibrated: bool):
"""Retrieves the previously parsed value from the passed in packet"""
try:
return packet[parameter_name] if use_calibrated \
else packet[parameter_name].raw_value
except KeyError as e:
raise ComparisonError(f"Attempting to perform a Condition evaluation on {self.left_param} but "
"the referenced parameter does not appear in the hitherto parsed data passed to "
"the evaluate method. If you intended a comparison against the raw value of the "
"parameter currently being parsed, unfortunately that is not currently supported."
) from e

# TODO: Consider allowing one of the parameters to be the parameter currently being evaluated.
# This isn't explicitly provided for in the XTCE spec but it seems reasonable to be able to
# perform conditionals against the current raw value of a parameter, e.g. while determining if it
# should be calibrated. Note that only one of the parameters can be used this way and it must reference
# an uncalibrated value so the logic and error handling must be done carefully.
return packet[parameter_name] if use_calibrated else packet[parameter_name].raw_value

left_value = _get_parsed_value(self.left_param, self.left_use_calibrated_value)
# Convert XML operator representation to a python-compatible operator (e.g. '&gt;' to '__gt__')
operator = self._valid_operators[self.operator]
Expand All @@ -415,8 +396,6 @@ def _get_parsed_value(parameter_name: str, use_calibrated: bool):
right_value = t_left_param(self.right_value)
else:
raise ValueError(f"Error when evaluating condition {self}. Neither right_param nor right_value is set.")
if left_value is None or right_value is None:
raise ComparisonError(f"Error comparing {left_value} and {right_value}. Neither should be None.")

# x.__le__(y) style call
return getattr(left_value, operator)(right_value)
Expand Down Expand Up @@ -526,7 +505,7 @@ def evaluate(self,
packet : packets.CCSDSPacket
Packet data used to evaluate truthyness of the match criteria.
current_parsed_value : Optional[Union[int, float]]
Current value being parsed.
Ignored.

Returns
-------
Expand Down
2 changes: 1 addition & 1 deletion space_packet_parser/xtce/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def parse_ccsds_packet(self,

def packet_generator(
self,
binary_data: Union[BinaryIO, socket.socket],
binary_data: Union[BinaryIO, socket.socket, bytes],
*,
parse_bad_pkts: bool = True,
root_container_name: Optional[str] = None,
Expand Down
53 changes: 49 additions & 4 deletions space_packet_parser/xtce/encodings.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,8 +638,51 @@ def to_xml(self, *, elmaker: ElementMaker) -> ElementTree.Element:

class IntegerDataEncoding(NumericDataEncoding):
"""<xtce:IntegerDataEncoding>"""
_encodings = ("unsigned", "signed", "twosCompliment", "twosComplement")
_byte_orders = ("leastSignificantByteFirst", "mostSignificantByteFirst")
_data_return_class = common.IntParameter

def __init__(self,
size_in_bits: int,
encoding: str,
*,
byte_order: str = "mostSignificantByteFirst",
default_calibrator: Optional[calibrators.Calibrator] = None,
context_calibrators: Optional[list[calibrators.ContextCalibrator]] = None):
"""Constructor

Parameters
----------
size_in_bits : int
Size of the integer
encoding : str
String indicating the type of encoding for the integer. FSW seems to use primarily 'signed' and 'unsigned',
though 'signed' is not actually a valid specifier according to XTCE. 'twosCompliment' [sic] should be used
instead, though we support the unofficial 'signed' specifier here.
For supported specifiers, see XTCE spec 4.3.2.2.5.6.2
byte_order : str
Description of the byte order. Default is 'mostSignficantByteFirst' (big-endian).
default_calibrator : Optional[Calibrator]
Optional Calibrator object, containing information on how to transform the integer-encoded data, e.g. via
a polynomial conversion or spline interpolation.
context_calibrators : Optional[List[ContextCalibrator]]
List of ContextCalibrator objects, containing match criteria and corresponding calibrators to use in
various scenarios, based on other parameters.
"""
if encoding not in self._encodings:
raise ValueError(f"Encoding must be one of {self._encodings}")

if byte_order not in self._byte_orders:
raise ValueError(f"Byte order must be one of {self._byte_orders}")

super().__init__(
size_in_bits,
encoding,
byte_order=byte_order,
default_calibrator=default_calibrator,
context_calibrators=context_calibrators
)

def _get_raw_value(self, packet: packets.CCSDSPacket) -> int:
# Extract the bits from the data in big-endian order from the packet
val = packet.raw_data.read_as_int(self.size_in_bits)
Expand Down Expand Up @@ -873,6 +916,11 @@ def __init__(self,
Function that linearly adjusts a size. e.g. if the size reference parameter gives a length in bytes, the
linear adjuster should multiply by 8 to give the size in bits.
"""
if not any([fixed_size_in_bits, size_reference_parameter, size_discrete_lookup_list]):
raise ValueError("Binary data encoding initialized with no way to determine a size. "
"You must provide one of "
"fixed_size_in_bits, size_reference_parameter, size_discrete_lookup_list.")

self.fixed_size_in_bits = fixed_size_in_bits
self.size_reference_parameter = size_reference_parameter
self.use_calibrated_value = use_calibrated_value
Expand All @@ -895,17 +943,14 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int:
len_bits = packet[field_length_reference]
else:
len_bits = packet[field_length_reference].raw_value
elif self.size_discrete_lookup_list is not None:
else: # self.size_discrete_lookup_list is not None:
for discrete_lookup in self.size_discrete_lookup_list:
len_bits = discrete_lookup.evaluate(packet)
if len_bits is not None:
break
else:
raise ValueError('List of discrete lookup values being used for determining length of '
f'string {self} found no matches based on {packet}.')
else:
raise ValueError("Unable to parse BinaryDataEncoding. "
"No fixed size, dynamic size, or dynamic lookup size were provided.")

if self.linear_adjuster is not None:
# NOTE: This is assumed to be an integer value, represented as a float. If the linear adjuster
Expand Down
37 changes: 36 additions & 1 deletion tests/unit/test_packets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Tests for packets"""
import socket

import pytest

from space_packet_parser import packets
Expand Down Expand Up @@ -112,7 +114,7 @@ def test_ccsds_packet_data_lookups():
assert packet.user_data == {x: x for x in range(7, 10)}

with pytest.raises(KeyError):
packet[10]
_ = packet[10]


def test_continuation_packets(test_data_dir):
Expand Down Expand Up @@ -204,3 +206,36 @@ def test__extract_bits(start, nbits):
data = int(s, 2).to_bytes(2, byteorder="big")

assert packets._extract_bits(data, start, nbits) == int(s[start:start + nbits], 2)


def test_ccsds_generator(jpss_test_data_dir):
"""Test ccsds_generator"""
test_data_file = jpss_test_data_dir / "J01_G011_LZ_2021-04-09T00-00-00Z_V01.DAT1"
test_packet = packets.create_ccsds_packet() # defaults

# From file
with test_data_file.open('rb') as f:
assert next(packets.ccsds_generator(f))

# From socket
send, recv = socket.socketpair()
send.send(test_packet)
assert next(packets.ccsds_generator(recv))
send.close()
recv.close()

# From bytes
# This covers show_progress conditional code and also the end of the iterator
gen_from_bytes = packets.ccsds_generator(test_packet, show_progress=True)
assert next(gen_from_bytes)
with pytest.raises(StopIteration):
next(gen_from_bytes)

# From Text file (error)
with test_data_file.open('rt') as f:
with pytest.raises(OSError, match="Packet data file opened in TextIO mode"):
next(packets.ccsds_generator(f))

# Unrecognized source (error)
with pytest.raises(OSError, match="Unrecognized data source"):
next(packets.ccsds_generator(1))
99 changes: 99 additions & 0 deletions tests/unit/test_xarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Tests for the xarr.py extras module"""
import pytest

from space_packet_parser import xarr
from space_packet_parser.xtce import calibrators, containers, definitions, encodings, parameter_types, parameters

np = pytest.importorskip("numpy", reason="numpy is not available")


@pytest.fixture
def test_xtce():
"""Test definition for testing surmising data types"""
container_set = [
containers.SequenceContainer(
"CONTAINER",
entry_list=[
parameters.Parameter(
"INT32_PARAM",
parameter_type=parameter_types.IntegerParameterType(
"I32_TYPE",
encoding=encodings.IntegerDataEncoding(size_in_bits=32, encoding="twosComplement")
)
),
parameters.Parameter(
"F32_PARAM",
parameter_type=parameter_types.FloatParameterType(
"F32_TYPE",
encoding=encodings.FloatDataEncoding(size_in_bits=32, encoding="IEEE754")
)
),
parameters.Parameter(
"CAL_INT_PARAM",
parameter_type=parameter_types.IntegerParameterType(
"I32_TYPE",
encoding=encodings.IntegerDataEncoding(
size_in_bits=32,
encoding="twosComplement",
default_calibrator=calibrators.PolynomialCalibrator(
coefficients=[
calibrators.PolynomialCoefficient(1, 1)
]
)
)
)
),
parameters.Parameter(
"BIN_PARAM",
parameter_type=parameter_types.BinaryParameterType(
"BIN_TYPE",
encoding=encodings.BinaryDataEncoding(
fixed_size_in_bits=32
)
)
),
parameters.Parameter(
"INT_ENUM_PARAM",
parameter_type=parameter_types.EnumeratedParameterType(
"INT_ENUM_TYPE",
encoding=encodings.IntegerDataEncoding(size_in_bits=8, encoding="unsigned"),
enumeration={
"ONE": 1,
"TWO": 2
}
)
),
parameters.Parameter(
"STR_PARAM",
parameter_type=parameter_types.StringParameterType(
"STR_TYPE",
encoding=encodings.StringDataEncoding(
fixed_raw_length=32
)
)
),
]
)
]
return definitions.XtcePacketDefinition(container_set=container_set)

@pytest.mark.parametrize(
("pname", "use_raw_value", "expected_dtype"),
[
("INT32_PARAM", True, "int32"),
("INT32_PARAM", False, "int32"),
("F32_PARAM", False, "float32"),
("F32_PARAM", True, "float32"),
("CAL_INT_PARAM", True, "int32"),
("CAL_INT_PARAM", False, None),
("BIN_PARAM", True, "bytes"),
("BIN_PARAM", False, "bytes"),
("INT_ENUM_PARAM", True, "uint8"),
("INT_ENUM_PARAM", False, "str"),
("STR_PARAM", True, "str"),
("STR_PARAM", False, "str"),
]
)
def test_minimum_numpy_dtype(test_xtce, pname, use_raw_value, expected_dtype):
"""Test finding the minimum numpy data type for a parameter"""
assert xarr._get_minimum_numpy_datatype(pname, test_xtce, use_raw_value) == expected_dtype
Loading