Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions adi/ad9084.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX short identifier: ADIBSD

import os
from typing import Dict, List

from adi.adrv9002 import rx1, rx2, tx1, tx2
Expand All @@ -10,6 +11,13 @@
from adi.rx_tx import rx_tx
from adi.sync_start import sync_start, sync_start_b

from .ad9088_cal_dump import validate_data

try:
from .sshfs import sshfs
except ImportError:
sshfs = None


def _map_to_dict(paths, ch):
if ch.attrs["label"].value == "buffer_only":
Expand Down Expand Up @@ -82,6 +90,8 @@ def __init__(
rx2_device_name="axi-ad9084b-rx-b",
tx1_device_name="axi-ad9084-tx-hpc",
tx2_device_name="axi-ad9084-tx-b",
ssh_username="root",
ssh_password="analog",
):
"""Create a new instance of the AD9084 MxFE

Expand All @@ -104,6 +114,9 @@ def __init__(
self._tx_fine_duc_channel_names = []
self._dds_channel_names = []

self._ssh_username = ssh_username
self._ssh_password = ssh_password

context_manager.__init__(self, uri, self._device_name)
# Default device for attribute writes
self._ctrl = self._ctx.find_device(rx1_device_name)
Expand Down Expand Up @@ -583,3 +596,77 @@ def chip_version(self):
def api_version(self):
"""api_version: API version"""
return self._get_iio_debug_attr_str("api_version", self._rxadc)

def save_calibrations(self, filename="calibration_data.bin"):
"""save_calibrations: Save ADC and DAC JESD calibrations to flash

Args:
filename: Name of file to save calibration data to

Raises:
RuntimeError: Calibration data not found on device
RuntimeError: Calibration data CRC check failed
"""
# IIO Method (does not work)
# buf = iio.create_string_buffer(2**25)
# _name_ascii = "calibration_data".encode("ascii")
# data_length = iio._d_read_attr(self._rxadc._device, _name_ascii, buf, 2**25)
# data = buf.raw[0:data_length]
# with open(filename, "wb") as f:
# f.write(data)

# SSH Method
if "ip:" not in self.uri:
raise RuntimeError("Calibration save/load only supported over IP")
if sshfs is None:
raise RuntimeError("sshfs module not available")
ssh = sshfs(self.uri.split("ip:")[1], self._ssh_username, self._ssh_password)
path = f"/sys/bus/iio/devices/{self._rxadc.id}/calibration_data"
if not ssh.isfile(path):
raise RuntimeError("Calibration data not found on device")
cal_data = ssh.ssh.exec_command(
f"cat /sys/bus/iio/devices/{self._rxadc.id}/calibration_data"
)[1].read()

ret = validate_data(cal_data)
if ret != 0:
raise RuntimeError("Calibration data CRC check failed")

with open(filename, "wb") as f:
f.write(cal_data)

def load_calibrations(
self,
filename="calibration_data.bin",
target_path="/lib/firmware/",
load_now=False,
):
"""load_calibrations: Load ADC and DAC JESD calibrations from flash

Args:
filename: Name of file to load calibration data from
target_path: Target path on device to copy calibration file to
load_now: If true instruct driver to load calibration immediately

Raises:
RuntimeError: Calibration file not found
RuntimeError: Calibration save/load only supported over IP
"""
# Check file exists
if not os.path.isfile(filename):
raise RuntimeError("Calibration file not found")
# Copy file to device
if "ip:" not in self.uri:
raise RuntimeError("Calibration save/load only supported over IP")
if sshfs is None:
raise RuntimeError("sshfs module not available")
ssh = sshfs(self.uri.split("ip:")[1], self._ssh_username, self._ssh_password)
remote_path = target_path + os.path.basename(filename)
ftp_client = ssh.ssh.open_sftp()
ftp_client.put(filename, remote_path)
ftp_client.close()
# Instruct driver to load calibration
if load_now:
ssh.ssh.exec_command(
f'echo "{remote_path}" > /sys/bus/iio/devices/{self._rxadc.id}/calibration_file'
)
Loading
Loading