diff --git a/telemetry/gparser/parser.py b/telemetry/gparser/parser.py
index 2e0308e..87c2ed1 100644
--- a/telemetry/gparser/parser.py
+++ b/telemetry/gparser/parser.py
@@ -6,6 +6,8 @@
import junitparser
import telemetry
from junitparser import JUnitXml, Skipped, Error, Failure
+import xml.etree.ElementTree as ET
+import ast
def get_parser(url,grabber=None):
'''Factory method that provides appropriate parser object base on given url'''
@@ -243,13 +245,153 @@ def get_payload_parsed(self):
payload_param = param
return (payload, payload_param)
-class PytestFailure(xmlParser):
+class pytestxml_parser(xmlParser):
+ def __init__(self, url, grabber):
+ super(pytestxml_parser, self).__init__(url, grabber)
+
+ def get_payload_raw(self):
+ payload = []
+ try:
+ file_path = self.grabber.download_file(self.url, self.file_name)
+ # Load and parse the XML using element tree
+ tree = ET.parse(file_path)
+ root = tree.getroot()
+
+ # Iterate over all test cases
+ for testcase in root.findall(".//testcase"):
+ # Get the name of the test case
+ test_name = testcase.get("name")
+ # Find the properties tag
+ properties = testcase.find("properties")
+ # Find the failure tag
+ failure = testcase.find("failure")
+
+ attr_link = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html"
+ # Set default description link for all tests
+ test_name_link = f"[{test_name}]({attr_link})"
+
+ if properties is not None:
+ if failure is not None:
+ # Get parameters from test case name
+ param_parsed = test_name.split("[", 1)
+ test_name = param_parsed[0]
+ param_parsed_last = (param_parsed[-1])[:-1].strip()
+ # Separate the parameters
+ param_separate = re.split(r'-(?!\d)', param_parsed_last)
+ # Check parameters list for a dictionary"
+ for index, param in enumerate(param_separate):
+ param_list = []
+ # Separate values from parameter name
+ new_param_split = param.split("=", 1)
+ if len(new_param_split) > 1:
+ param_name = new_param_split[0]
+ new_param = new_param_split[1].strip()
+ # Check if param is a dictionary and not empty
+ if new_param[0] == "{" and new_param != "\{\}":
+ # Convert remaining string to a dictionary
+ param_dict = ast.literal_eval(new_param)
+ if param_dict:
+ for key, value in param_dict.items():
+ # Convert parameters back to string
+ new_updated = "'" + key + "'" + ": " + str(value)
+ param_list.append(new_updated)
+ insert_param_name = param_name + "="
+ param_list.insert(0, insert_param_name)
+ param_list_string = " \n".join(param_list)
+ # Update param in param_separate list
+ param_separate[index] = param_list_string
+ # Compile final parameter details
+ param_separate.insert(0,"**Parameters:**")
+ param_display = "\n - ".join(param_separate)
+
+ # Get failure tag content
+ failure_text = failure.text
+ fail_content_lines = failure_text.splitlines()
+ exc_param_value = ""
+ # Get exception statement with parameter names
+ for item in fail_content_lines[::-1]:
+ if len(item) > 0:
+ if item[0] == ">":
+ exc_param_value = item[1:].lstrip()
+ break
+
+ test_name_function = ""
+ test_function_module = ""
+ exctype_message = ""
+ # Iterate through each property in the properties tag
+ for prop in properties.findall("property"):
+ # Get the property name and value
+ prop_name = prop.get("name")
+ prop_value = prop.get("value")
+ if prop_name == "exception_type_and_message":
+ prop_list = prop_value.splitlines()
+ new_props = prop_value
+ prop_list_updated = []
+ # Check if exception and message has mutiple lines
+ if len(prop_list) > 1:
+ for props in prop_list:
+ if len(props) > 0:
+ # Remove leading spaces
+ prop_strip = props.lstrip()
+ prop_list_updated.append(prop_strip)
+ if len(prop_list_updated) > 0:
+ new_props = " ".join(prop_list_updated)
+ # Combine exception type, message, and parameters
+ exctype_message = "\n" + " " + new_props + " ( " + exc_param_value + " )"
+ # Get test name function
+ if prop_name == "test_name_function":
+ # Get test description
+ test_name_function = prop_value
+ if prop_name == "test_function_module":
+ test_function_module = prop_value
+
+ # Create dictionary of pyadi-iio test module links
+ test_links = {
+ "test.attr_tests" : "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html",
+ "test.dma_tests" : "https://analogdevicesinc.github.io/pyadi-iio/dev/test_dma.html",
+ "test.generics" : "https://analogdevicesinc.github.io/pyadi-iio/dev/test_generics.html"
+ }
+
+ # Set test description link
+ if test_function_module != "":
+ if test_name_function != "":
+ if test_function_module in test_links:
+ test_permalink = test_links[test_function_module] + "#" + test_function_module + "." + test_name_function
+ test_name_link = f"[{test_name}]({test_permalink})"
+ else:
+ if test_function_module in test_links:
+ test_permalink = test_links[test_function_module]
+ test_name_link = f"[{test_name}]({test_permalink})"
+
+ # Compile the test details
+ test_details = [test_name_link, param_display]
+ test_details_final = "
".join(test_details)
+ test_details_final = test_details_final + "\n\n" + exctype_message
+
+ payload.append(test_details_final)
+
+ except Exception as ex:
+ traceback.print_exc()
+ print("Error Parsing File!")
+ finally:
+ os.remove(file_path)
+ return payload
+
+ def get_payload_parsed(self):
+ num_payload = len(self.payload_raw)
+ param = list(range(num_payload))
+
+ payload = self.payload_raw
+ payload_param = param
+ return (payload, payload_param)
+
+class PytestFailure(pytestxml_parser):
def __init__(self, url, grabber):
super(PytestFailure, self).__init__(url, grabber)
-class PytestSkipped(xmlParser):
+class PytestSkipped(pytestxml_parser):
def __init__(self, url, grabber):
super(PytestSkipped, self).__init__(url, grabber)
-class PytestError(xmlParser,):
+class PytestError(pytestxml_parser,):
def __init__(self, url, grabber):
super(PytestError, self).__init__(url, grabber)
diff --git a/telemetry/report/markdown.py b/telemetry/report/markdown.py
index 20ab917..3a25df0 100644
--- a/telemetry/report/markdown.py
+++ b/telemetry/report/markdown.py
@@ -73,9 +73,16 @@ def generate_param(self,data):
else:
iio_drivers_missing_details = "No missing drivers" if len(info["missing_devs"]) == 0 else ("
").join(info["missing_devs"])
iio_drivers_found_details = "No iio drivers found" if len(info["enumerated_devs"]) == 0 else ("
").join(info["enumerated_devs"])
- dmesg_errors_found_details = "No errors" if len(info["dmesg_err"]) == 0 else ("
").join(info["dmesg_err"])
- pytest_failures_details = "No failures" if len(info["pytest_failure"]) == 0 else ("
").join(info["pytest_failure"])
+ dmesg_errors_found_details = "No errors" if len(info["dmesg_err"]) == 0 else ("
").join(info["dmesg_err"])
+ pytest_failures_details = "No failures"
pytest_failures_details = "Invalid" if pytest_tests_status == "⛔" else pytest_failures_details
+ pytest_details = []
+ if len(info["pytest_failure"]) != 0:
+ pytest_details.append(info["pytest_failure"][0])
+ for item in info["pytest_failure"][1:]:
+ item_update = "- " + item
+ pytest_details.append(item_update)
+ pytest_failures_details = ("\n\n").join(pytest_details)
last_failing_stage = str(info["last_failing_stage"])
last_failing_stage_failure = str(info["last_failing_stage_failure"])
diff --git a/tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml b/tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml
new file mode 100644
index 0000000..a213a70
--- /dev/null
+++ b/tests/test_artifacts/zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml
@@ -0,0 +1,1102 @@
+test_attribute_multiple_values = <function attribute_multiple_values at 0x7bb6970d8af0>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', attr = 'rx_test_mode'
+val = ['midscale_short', 'pos_fullscale', 'neg_fullscale', 'checkerboard', 'pn23', 'pn9', ...]
+
+ @pytest.mark.iio_hardware(hardware)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize(
+ "attr, val",
+ [
+ ("rx_nyquist_zone", ["even", "odd"]),
+ ("loopback_mode", [0]),
+ (
+ "rx_test_mode",
+ [
+ "midscale_short",
+ "pos_fullscale",
+ "neg_fullscale",
+ "checkerboard",
+ "pn23",
+ "pn9",
+ "one_zero_toggle",
+ "user",
+ "pn7",
+ "pn15",
+ "pn31",
+ "ramp",
+ "off",
+ ],
+ ),
+ (
+ "tx_main_ffh_mode",
+ ["phase_continuous", "phase_incontinuous", "phase_coherent"],
+ ),
+ ],
+ )
+ def test_ad9081_str_attr(test_attribute_multiple_values, iio_uri, classname, attr, val):
+> test_attribute_multiple_values(iio_uri, classname, attr, val, 0)
+
+test/test_ad9081.py:65:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+test/attr_tests.py:221: in attribute_multiple_values
+ assert dev_interface(uri, classname, val, attr, tol, sleep=sleep)
+test/common.py:114: in dev_interface
+ setattr(sdr, attr, val)
+adi/ad9081.py:268: in rx_test_mode
+ self._set_iio_attr_single(
+adi/ad9081.py:170: in _set_iio_attr_single
+ return self._set_iio_attr(channel_name, attr, output, value, _ctrl)
+adi/attribute.py:71: in _set_iio_attr
+ raise ex
+adi/attribute.py:69: in _set_iio_attr
+ channel.attrs[attr_name].value = str(value)
+/usr/local/lib/python3.8/dist-packages/iio.py:704: in <lambda>
+ lambda self, x: self._write(x),
+/usr/local/lib/python3.8/dist-packages/iio.py:736: in _write
+ _c_write_attr(self._channel, self._name_ascii, value.encode("ascii"))
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+result = -22, func = <_FuncPtr object at 0x7bb6c56de880>
+arguments = (<iio.LP__Channel object at 0x7bb69f77a5c0>, b'test_mode', b'ramp')
+
+ def _check_negative(result, func, arguments):
+ if result >= 0:
+ return result
+> raise OSError(-result, _strerror(-result))
+E OSError: [Errno 22] Invalid argument
+
+/usr/local/lib/python3.8/dist-packages/iio.py:62: OSError/var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:121: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:121: Skipping test: Channel 3not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:131: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:131: Skipping test: Channel 3not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:163: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:163: Skipping test: Channel 3not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:194: Skipping test: Channel 2not available./var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_ad9081.py:194: Skipping test: Channel 3not available.test_sfdr = <function t_sfdr at 0x7bb6a9ca9820>, iio_uri = 'ip:192.168.10.114'
+classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...}
+sfdr_min = 70
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_test_tone_en=[0, 0, 0, 0],
+ tx_main_nco_test_tone_en=[0, 0, 0, 0],
+ )
+ ],
+ )
+ @pytest.mark.parametrize("sfdr_min", [70])
+ def test_ad9081_sfdr(test_sfdr, iio_uri, classname, channel, param_set, sfdr_min):
+ param_set = scale_field(param_set, iio_uri)
+> test_sfdr(iio_uri, classname, channel, param_set, sfdr_min, full_scale=0.5)
+
+test/test_ad9081.py:232:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...}
+sfdr_min = 70, use_obs = False, full_scale = 0.5
+
+ def t_sfdr(uri, classname, channel, param_set, sfdr_min, use_obs=False, full_scale=0.9):
+ """t_sfdr: Test SFDR loopback of tone with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. Sinuoidal data is passed to DMAs which is then
+ estimated on the RX side. The peak and second peak are determined in
+ the received signal to determine the sfdr.
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ sfdr_min: type=float
+ Minimum acceptable value of SFDR in dB
+
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+ time.sleep(5) # Wait for QEC to kick in
+ # Set common buffer settings
+ N = 2 ** 14
+ sdr.tx_cyclic_buffer = True
+ sdr.tx_enabled_channels = [channel]
+ sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels)
+
+ if use_obs:
+ sdr.obs.rx_enabled_channels = [0]
+ sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels)
+ else:
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ else:
+ RXFS = int(sdr.rx_sample_rate)
+
+ if hasattr(sdr, "tx_sample_rate"):
+ FS = int(sdr.tx_sample_rate)
+ else:
+ FS = RXFS
+
+ fc = FS * 0.1
+ fc = int(fc / (FS / N)) * (FS / N)
+ ts = 1 / float(FS)
+ t = np.arange(0, N * ts, ts)
+ i = np.cos(2 * np.pi * t * fc) * 2 ** 15 * full_scale
+ q = np.sin(2 * np.pi * t * fc) * 2 ** 15 * full_scale
+ iq = i + 1j * q
+ # Pass through SDR
+ try:
+ sdr.tx(iq)
+ time.sleep(3)
+ for _ in range(10): # Wait for IQ correction to stabilize
+ data = sdr.obs.rx() if use_obs else sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ val, amp, freqs = spec.sfdr(data, fs=RXFS, plot=False)
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ freqs,
+ amp,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "SDFR {} dBc ({})".format(val, classname),
+ )
+ }
+ print("SFDR:", val, "dB")
+> assert val > sfdr_min
+E assert 0.5749985164147731 > 70
+
+test/dma_tests.py:733: AssertionErrortest_dds_loopback = <function dds_loopback at 0x7bb6a9ca95e0>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081'
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...}
+channel = 0, frequency = 10000000, scale = 0.5, peak_min = -30
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize("frequency, scale", [(10000000, 0.5)])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ )
+ ],
+ )
+ @pytest.mark.parametrize("peak_min", [-30])
+ def test_ad9081_dds_loopback(
+ test_dds_loopback,
+ iio_uri,
+ classname,
+ param_set,
+ channel,
+ frequency,
+ scale,
+ peak_min,
+ ):
+ param_set = scale_field(param_set, iio_uri)
+> test_dds_loopback(
+ iio_uri, classname, param_set, channel, frequency, scale, peak_min
+ )
+
+test/test_ad9081.py:268:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081'
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...}
+channel = 0, frequency = 10000000, scale = 0.5, peak_min = -30, use_obs = False
+use_rx2 = False
+
+ def dds_loopback(
+ uri,
+ classname,
+ param_set,
+ channel,
+ frequency,
+ scale,
+ peak_min,
+ use_obs=False,
+ use_rx2=False,
+ ):
+ """dds_loopback: Test DDS loopback with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. TX FPGA DDSs are used to generate a sinusoid
+ which is then estimated on the RX side. The receive tone must be within
+ 1% of its expected frequency with a specified peak
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ frequency: type=integer
+ Frequency in Hz of transmitted tone
+ scale: type=float
+ Scale of DDS tone. Range [0,1]
+ peak_min: type=float
+ Minimum acceptable value of maximum peak in dBFS of received tone
+
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+ # Set common buffer settings
+ sdr.tx_cyclic_buffer = True
+ N = 2 ** 14
+
+ if use_obs and use_rx2:
+ raise Exception("Both RX2 and OBS are selected. Select one at a time.")
+
+ if use_rx2:
+ sdr.rx2_enabled_channels = [channel]
+ sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels)
+ elif use_obs:
+ sdr.obs.rx_enabled_channels = [0]
+ sdr.obs.rx_buffer_size = N * 2 * len(sdr.obs.rx_enabled_channels)
+ else:
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ else:
+ RXFS = int(sdr.orx_sample_rate) if use_obs else int(sdr.rx_sample_rate)
+
+ sdr.dds_single_tone(frequency, scale, channel)
+
+ # Pass through SDR
+ try:
+ for _ in range(10): # Wait
+ data = sdr.rx2() if use_rx2 else sdr.obs.rx() if use_obs else sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15, plot=False)
+ indx = np.argmax(tone_peaks)
+ diff = np.abs(tone_freqs[indx] - frequency)
+ s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx])
+ print(s)
+
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ tone_freqs,
+ tone_peaks,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "{} ({})".format(s, classname),
+ )
+ }
+
+> assert (frequency * 0.01) > diff
+E assert (10000000 * 0.01) > 607431640.625
+
+test/dma_tests.py:340: AssertionErrortest_iq_loopback = <function cw_loopback at 0x7bb6a9ca9790>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [1234567, 1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [500000000, 500000000], ...}
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000],
+ tx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000],
+ rx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567],
+ tx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000],
+ tx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000],
+ rx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567],
+ tx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ ],
+ )
+ def test_ad9081_iq_loopback(test_iq_loopback, iio_uri, classname, channel, param_set):
+ param_set = scale_field(param_set, iio_uri)
+> test_iq_loopback(iio_uri, classname, channel, param_set)
+
+test/test_ad9081.py:317:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [1234567, 1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [500000000, 500000000], ...}
+use_tx2 = False, use_rx2 = False
+
+ def cw_loopback(uri, classname, channel, param_set, use_tx2=False, use_rx2=False):
+ """cw_loopback: Test CW loopback with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. Sinuoidal data is passed to DMAs which is then
+ estimated on the RX side. The receive tone must be within
+ 1% of its expected frequency at the max peak found
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ use_tx2: type=bool
+ Boolean if set will use tx2() as tx method
+ use_rx2: type=bool
+ Boolean if set will use rx2() as rx method
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+ time.sleep(1)
+ # Verify still set
+ for p in param_set.keys():
+ if isinstance(param_set[p], str):
+ assert getattr(sdr, p) == param_set[p]
+ else:
+ assert (
+ np.argmax(np.abs(np.array(getattr(sdr, p)) - np.array(param_set[p])))
+ < 4
+ )
+ # Set common buffer settings
+ N = 2 ** 14
+ if use_tx2:
+ sdr.tx2_cyclic_buffer = True
+ sdr.tx2_enabled_channels = [channel]
+ sdr.tx2_buffer_size = N * 2 * len(sdr.tx2_enabled_channels)
+ else:
+ sdr.tx_cyclic_buffer = True
+ sdr.tx_enabled_channels = [channel]
+ sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels)
+
+ if use_rx2:
+ sdr.rx2_enabled_channels = [channel]
+ sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels)
+ else:
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ elif hasattr(sdr, "rx_sample_rate"):
+ RXFS = int(sdr.rx_sample_rate)
+ else:
+ """no sample_rate nor rx_sample_rate. Let's try something like
+ rx($channel)_sample_rate"""
+ attr = "rx" + str(channel) + "_sample_rate"
+ RXFS = int(getattr(sdr, attr))
+
+ A = 2 ** 15
+ if hasattr(sdr, "tx_sample_rate"):
+ FS = int(sdr.tx_sample_rate)
+ else:
+ FS = RXFS
+ fc = FS * 0.1
+ fc = int(fc / (FS / N)) * (FS / N)
+
+ ts = 1 / float(FS)
+ t = np.arange(0, N * ts, ts)
+ if sdr._complex_data:
+ i = np.cos(2 * np.pi * t * fc) * A * 0.5
+ q = np.sin(2 * np.pi * t * fc) * A * 0.5
+ cw = i + 1j * q
+ else:
+ cw = np.cos(2 * np.pi * t * fc) * A * 1
+
+ # Pass through SDR
+ try:
+ if use_tx2:
+ sdr.tx2(cw)
+ else:
+ sdr.tx(cw)
+ for _ in range(30): # Wait to stabilize
+ data = sdr.rx2() if use_rx2 else sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ # tone_freq = freq_est(data, RXFS)
+ # diff = np.abs(tone_freq - fc)
+ # print("Peak: @"+str(tone_freq) )
+ # assert (fc * 0.01) > diff
+
+ tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=A, plot=False)
+ indx = np.argmax(tone_peaks)
+ diff = np.abs(tone_freqs[indx] - fc)
+ s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx])
+ print(s)
+
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ tone_freqs,
+ tone_peaks,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "{} ({})".format(s, classname),
+ )
+ }
+
+> assert (fc * 0.01) > diff
+E assert (149963378.90625 * 0.01) > 802505493.1640625
+
+test/dma_tests.py:648: AssertionErrortest_iq_loopback = <function cw_loopback at 0x7bb6a9ca9790>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [-1234567, -1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [750000000, 750000000], ...}
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000],
+ tx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000],
+ rx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567],
+ tx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000],
+ tx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000],
+ rx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567],
+ tx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ ],
+ )
+ def test_ad9081_iq_loopback(test_iq_loopback, iio_uri, classname, channel, param_set):
+ param_set = scale_field(param_set, iio_uri)
+> test_iq_loopback(iio_uri, classname, channel, param_set)
+
+test/test_ad9081.py:317:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [-1234567, -1234567], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [750000000, 750000000], ...}
+use_tx2 = False, use_rx2 = False
+
+ def cw_loopback(uri, classname, channel, param_set, use_tx2=False, use_rx2=False):
+ """cw_loopback: Test CW loopback with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. Sinuoidal data is passed to DMAs which is then
+ estimated on the RX side. The receive tone must be within
+ 1% of its expected frequency at the max peak found
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ use_tx2: type=bool
+ Boolean if set will use tx2() as tx method
+ use_rx2: type=bool
+ Boolean if set will use rx2() as rx method
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+ time.sleep(1)
+ # Verify still set
+ for p in param_set.keys():
+ if isinstance(param_set[p], str):
+ assert getattr(sdr, p) == param_set[p]
+ else:
+ assert (
+ np.argmax(np.abs(np.array(getattr(sdr, p)) - np.array(param_set[p])))
+ < 4
+ )
+ # Set common buffer settings
+ N = 2 ** 14
+ if use_tx2:
+ sdr.tx2_cyclic_buffer = True
+ sdr.tx2_enabled_channels = [channel]
+ sdr.tx2_buffer_size = N * 2 * len(sdr.tx2_enabled_channels)
+ else:
+ sdr.tx_cyclic_buffer = True
+ sdr.tx_enabled_channels = [channel]
+ sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels)
+
+ if use_rx2:
+ sdr.rx2_enabled_channels = [channel]
+ sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels)
+ else:
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ elif hasattr(sdr, "rx_sample_rate"):
+ RXFS = int(sdr.rx_sample_rate)
+ else:
+ """no sample_rate nor rx_sample_rate. Let's try something like
+ rx($channel)_sample_rate"""
+ attr = "rx" + str(channel) + "_sample_rate"
+ RXFS = int(getattr(sdr, attr))
+
+ A = 2 ** 15
+ if hasattr(sdr, "tx_sample_rate"):
+ FS = int(sdr.tx_sample_rate)
+ else:
+ FS = RXFS
+ fc = FS * 0.1
+ fc = int(fc / (FS / N)) * (FS / N)
+
+ ts = 1 / float(FS)
+ t = np.arange(0, N * ts, ts)
+ if sdr._complex_data:
+ i = np.cos(2 * np.pi * t * fc) * A * 0.5
+ q = np.sin(2 * np.pi * t * fc) * A * 0.5
+ cw = i + 1j * q
+ else:
+ cw = np.cos(2 * np.pi * t * fc) * A * 1
+
+ # Pass through SDR
+ try:
+ if use_tx2:
+ sdr.tx2(cw)
+ else:
+ sdr.tx(cw)
+ for _ in range(30): # Wait to stabilize
+ data = sdr.rx2() if use_rx2 else sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ # tone_freq = freq_est(data, RXFS)
+ # diff = np.abs(tone_freq - fc)
+ # print("Peak: @"+str(tone_freq) )
+ # assert (fc * 0.01) > diff
+
+ tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=A, plot=False)
+ indx = np.argmax(tone_peaks)
+ diff = np.abs(tone_freqs[indx] - fc)
+ s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx])
+ print(s)
+
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ tone_freqs,
+ tone_peaks,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "{} ({})".format(s, classname),
+ )
+ }
+
+> assert (fc * 0.01) > diff
+E assert (149963378.90625 * 0.01) > 606536865.234375
+
+test/dma_tests.py:648: AssertionErrortest_iq_loopback = <function cw_loopback at 0x7bb6a9ca9790>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...}
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000],
+ tx_main_nco_frequencies=[500000000, 500000000, 500000000, 500000000],
+ rx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567],
+ tx_channel_nco_frequencies=[1234567, 1234567, 1234567, 1234567],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000],
+ tx_main_nco_frequencies=[750000000, 750000000, 750000000, 750000000],
+ rx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567],
+ tx_channel_nco_frequencies=[-1234567, -1234567, -1234567, -1234567],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ rx_main_nco_phases=[0, 0, 0, 0],
+ tx_main_nco_phases=[0, 0, 0, 0],
+ rx_channel_nco_phases=[0, 0, 0, 0],
+ tx_channel_nco_phases=[0, 0, 0, 0],
+ ),
+ ],
+ )
+ def test_ad9081_iq_loopback(test_iq_loopback, iio_uri, classname, channel, param_set):
+ param_set = scale_field(param_set, iio_uri)
+> test_iq_loopback(iio_uri, classname, channel, param_set)
+
+test/test_ad9081.py:317:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081', channel = 0
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_channel_nco_phases': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], ...}
+use_tx2 = False, use_rx2 = False
+
+ def cw_loopback(uri, classname, channel, param_set, use_tx2=False, use_rx2=False):
+ """cw_loopback: Test CW loopback with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. Sinuoidal data is passed to DMAs which is then
+ estimated on the RX side. The receive tone must be within
+ 1% of its expected frequency at the max peak found
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ use_tx2: type=bool
+ Boolean if set will use tx2() as tx method
+ use_rx2: type=bool
+ Boolean if set will use rx2() as rx method
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+ time.sleep(1)
+ # Verify still set
+ for p in param_set.keys():
+ if isinstance(param_set[p], str):
+ assert getattr(sdr, p) == param_set[p]
+ else:
+ assert (
+ np.argmax(np.abs(np.array(getattr(sdr, p)) - np.array(param_set[p])))
+ < 4
+ )
+ # Set common buffer settings
+ N = 2 ** 14
+ if use_tx2:
+ sdr.tx2_cyclic_buffer = True
+ sdr.tx2_enabled_channels = [channel]
+ sdr.tx2_buffer_size = N * 2 * len(sdr.tx2_enabled_channels)
+ else:
+ sdr.tx_cyclic_buffer = True
+ sdr.tx_enabled_channels = [channel]
+ sdr.tx_buffer_size = N * 2 * len(sdr.tx_enabled_channels)
+
+ if use_rx2:
+ sdr.rx2_enabled_channels = [channel]
+ sdr.rx2_buffer_size = N * 2 * len(sdr.rx2_enabled_channels)
+ else:
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ elif hasattr(sdr, "rx_sample_rate"):
+ RXFS = int(sdr.rx_sample_rate)
+ else:
+ """no sample_rate nor rx_sample_rate. Let's try something like
+ rx($channel)_sample_rate"""
+ attr = "rx" + str(channel) + "_sample_rate"
+ RXFS = int(getattr(sdr, attr))
+
+ A = 2 ** 15
+ if hasattr(sdr, "tx_sample_rate"):
+ FS = int(sdr.tx_sample_rate)
+ else:
+ FS = RXFS
+ fc = FS * 0.1
+ fc = int(fc / (FS / N)) * (FS / N)
+
+ ts = 1 / float(FS)
+ t = np.arange(0, N * ts, ts)
+ if sdr._complex_data:
+ i = np.cos(2 * np.pi * t * fc) * A * 0.5
+ q = np.sin(2 * np.pi * t * fc) * A * 0.5
+ cw = i + 1j * q
+ else:
+ cw = np.cos(2 * np.pi * t * fc) * A * 1
+
+ # Pass through SDR
+ try:
+ if use_tx2:
+ sdr.tx2(cw)
+ else:
+ sdr.tx(cw)
+ for _ in range(30): # Wait to stabilize
+ data = sdr.rx2() if use_rx2 else sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ # tone_freq = freq_est(data, RXFS)
+ # diff = np.abs(tone_freq - fc)
+ # print("Peak: @"+str(tone_freq) )
+ # assert (fc * 0.01) > diff
+
+ tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=A, plot=False)
+ indx = np.argmax(tone_peaks)
+ diff = np.abs(tone_freqs[indx] - fc)
+ s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx])
+ print(s)
+
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ tone_freqs,
+ tone_peaks,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "{} ({})".format(s, classname),
+ )
+ }
+
+> assert (fc * 0.01) > diff
+E assert (149963378.90625 * 0.01) > 590194702.1484375
+
+test/dma_tests.py:648: AssertionErrortest_tone_loopback = <function nco_loopback at 0x7bb6a9ca9700>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081'
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [0, 0], ...}
+channel = 0, frequency = 10000000, peak_min = -30
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize("frequency", [10000000])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1010000000, 1010000000, 1010000000, 1010000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_main_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5],
+ tx_main_nco_test_tone_en=[1, 1, 1, 1],
+ tx_channel_nco_test_tone_en=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[10000000, 10000000, 10000000, 10000000],
+ tx_channel_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5],
+ tx_main_nco_test_tone_en=[0, 0, 0, 0],
+ tx_channel_nco_test_tone_en=[1, 1, 1, 1],
+ ),
+ ],
+ )
+ @pytest.mark.parametrize("peak_min", [-30])
+ def test_ad9081_nco_loopback(
+ test_tone_loopback, iio_uri, classname, param_set, channel, frequency, peak_min,
+ ):
+ param_set = scale_field(param_set, iio_uri)
+> test_tone_loopback(iio_uri, classname, param_set, channel, frequency, peak_min)
+
+test/test_ad9081.py:355:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081'
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [0, 0], ...}
+channel = 0, frequency = 10000000, peak_min = -30
+
+ def nco_loopback(uri, classname, param_set, channel, frequency, peak_min):
+ """nco_loopback: TX/DAC Test tone loopback with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. TX/DAC internal NCOs are used to generate a sinusoid
+ which is then estimated on the RX side. The receive tone must be within
+ 1% of its expected frequency with a specified peak
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ frequency: type=integer
+ Frequency in Hz of transmitted tone
+ peak_min: type=float
+ Minimum acceptable value of maximum peak in dBFS of received tone
+
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+
+ N = 2 ** 14
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ elif hasattr(sdr, "rx_sample_rate"):
+ RXFS = int(sdr.rx_sample_rate)
+ else:
+ """no sample_rate nor rx_sample_rate. Let's try something like
+ rx($channel)_sample_rate"""
+ attr = "rx" + str(channel) + "_sample_rate"
+ RXFS = int(getattr(sdr, attr))
+
+ # Pass through SDR
+ try:
+ for _ in range(10): # Wait
+ data = sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15)
+ indx = np.argmax(tone_peaks)
+ diff = np.abs(tone_freqs[indx] - frequency)
+ s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx])
+ print(s)
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ tone_freqs,
+ tone_peaks,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "{} ({})".format(s, classname),
+ )
+ }
+
+> assert (frequency * 0.01) > diff
+E assert (10000000 * 0.01) > 538350830.078125
+
+test/dma_tests.py:526: AssertionErrortest_tone_loopback = <function nco_loopback at 0x7bb6a9ca9700>
+iio_uri = 'ip:192.168.10.114', classname = 'adi.ad9081'
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [10000000, 10000000], ...}
+channel = 0, frequency = 10000000, peak_min = -30
+
+ @pytest.mark.iio_hardware(hardware, True)
+ @pytest.mark.parametrize("classname", [(classname)])
+ @pytest.mark.parametrize("channel", [0])
+ @pytest.mark.parametrize("frequency", [10000000])
+ @pytest.mark.parametrize(
+ "param_set",
+ [
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1010000000, 1010000000, 1010000000, 1010000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_main_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5],
+ tx_main_nco_test_tone_en=[1, 1, 1, 1],
+ tx_channel_nco_test_tone_en=[0, 0, 0, 0],
+ ),
+ dict(
+ loopback_mode=0,
+ rx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ tx_main_nco_frequencies=[1000000000, 1000000000, 1000000000, 1000000000],
+ rx_channel_nco_frequencies=[0, 0, 0, 0],
+ tx_channel_nco_frequencies=[10000000, 10000000, 10000000, 10000000],
+ tx_channel_nco_test_tone_scales=[0.5, 0.5, 0.5, 0.5],
+ tx_main_nco_test_tone_en=[0, 0, 0, 0],
+ tx_channel_nco_test_tone_en=[1, 1, 1, 1],
+ ),
+ ],
+ )
+ @pytest.mark.parametrize("peak_min", [-30])
+ def test_ad9081_nco_loopback(
+ test_tone_loopback, iio_uri, classname, param_set, channel, frequency, peak_min,
+ ):
+ param_set = scale_field(param_set, iio_uri)
+> test_tone_loopback(iio_uri, classname, param_set, channel, frequency, peak_min)
+
+test/test_ad9081.py:355:
+_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
+
+uri = 'ip:192.168.10.114', classname = 'adi.ad9081'
+param_set = {'loopback_mode': 0, 'rx_channel_nco_frequencies': [0, 0], 'rx_main_nco_frequencies': [1000000000, 1000000000], 'tx_channel_nco_frequencies': [10000000, 10000000], ...}
+channel = 0, frequency = 10000000, peak_min = -30
+
+ def nco_loopback(uri, classname, param_set, channel, frequency, peak_min):
+ """nco_loopback: TX/DAC Test tone loopback with connected loopback cables.
+ This test requires a devices with TX and RX onboard where the transmit
+ signal can be recovered. TX/DAC internal NCOs are used to generate a sinusoid
+ which is then estimated on the RX side. The receive tone must be within
+ 1% of its expected frequency with a specified peak
+
+ parameters:
+ uri: type=string
+ URI of IIO context of target board/system
+ classname: type=string
+ Name of pyadi interface class which contain attribute
+ param_set: type=dict
+ Dictionary of attribute and values to be set before tone is
+ generated and received
+ channel: type=list
+ List of integers or list of list of integers of channels to
+ enable through tx_enabled_channels
+ frequency: type=integer
+ Frequency in Hz of transmitted tone
+ peak_min: type=float
+ Minimum acceptable value of maximum peak in dBFS of received tone
+
+ """
+ # See if we can tone using DMAs
+ sdr = eval(classname + "(uri='" + uri + "')")
+ # Set custom device parameters
+ for p in param_set.keys():
+ setattr(sdr, p, param_set[p])
+
+ N = 2 ** 14
+ sdr.rx_enabled_channels = [channel]
+ sdr.rx_buffer_size = N * 2 * len(sdr.rx_enabled_channels)
+ # Create a sinewave waveform
+ if hasattr(sdr, "sample_rate"):
+ RXFS = int(sdr.sample_rate)
+ elif hasattr(sdr, "rx_sample_rate"):
+ RXFS = int(sdr.rx_sample_rate)
+ else:
+ """no sample_rate nor rx_sample_rate. Let's try something like
+ rx($channel)_sample_rate"""
+ attr = "rx" + str(channel) + "_sample_rate"
+ RXFS = int(getattr(sdr, attr))
+
+ # Pass through SDR
+ try:
+ for _ in range(10): # Wait
+ data = sdr.rx()
+ except Exception as e:
+ del sdr
+ raise Exception(e)
+ del sdr
+ tone_peaks, tone_freqs = spec.spec_est(data, fs=RXFS, ref=2 ** 15)
+ indx = np.argmax(tone_peaks)
+ diff = np.abs(tone_freqs[indx] - frequency)
+ s = "Peak: " + str(tone_peaks[indx]) + "@" + str(tone_freqs[indx])
+ print(s)
+ if do_html_log:
+ pytest.data_log = {
+ "html": gen_line_plot_html(
+ tone_freqs,
+ tone_peaks,
+ "Frequency (Hz)",
+ "Amplitude (dBFS)",
+ "{} ({})".format(s, classname),
+ )
+ }
+
+> assert (frequency * 0.01) > diff
+E assert (10000000 * 0.01) > 484472045.8984375
+
+test/dma_tests.py:526: AssertionError/var/lib/jenkins/workspace/cct/HW_test_hdl@4/pyadi-iio/test/test_all_inits.py:47: No required hardware found
\ No newline at end of file
diff --git a/tests/test_gparser.py b/tests/test_gparser.py
index e564c45..8be5eea 100644
--- a/tests/test_gparser.py
+++ b/tests/test_gparser.py
@@ -50,6 +50,7 @@ def test_get_parser(artifact, parser_object, parser_type):
('zynq-zed-adv7511-adrv9002-rx2tx2-vcmos_missing_devs.log', \
telemetry.parser.MissingDevs, 'missing_devs'),
('info.txt', telemetry.parser.InfoTxt, 'info_txt'),
+ ('zynqmp_zcu102_rev10_ad9081_vm4_l8_reports.xml', telemetry.parser.PytestFailure, 'pytest_failure'),
]
)
def test_parser(artifact, parser_object, parser_type):