Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
build/
.cache/
__pycache__/
.env/
.venv/
1 change: 1 addition & 0 deletions client/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.13
9 changes: 9 additions & 0 deletions client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Botzo SDK

Client Library for Botzo Quadruped Robot

This is a client library targetted to be used by Botzo. However, this library
can be expanded to be used by other robots that use serial communication or
any other interface that is supported.

For now, only Serial communication is supported.
32 changes: 32 additions & 0 deletions client/botzo_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Client Library for Botzo Quadruped Robot

This is a client library targetted to be used by Botzo. However, this library
can be expanded to be used by other robots that use serial communication or
any other interface that is supported.

For now, only Serial communication is supported.
"""

from botzo_sdk.clients import BotzoClient
from botzo_sdk.interfaces import SerialInterface
from botzo_sdk.logger import get_logger, logger_manager
from botzo_sdk.protocol import CMDType, Packet, Protocol

# Version
__version__ = "0.1.0"

# Configure default logging (can be overridden by users)
import logging

logger_manager.configure(level=logging.INFO, use_colors=True)

__all__ = [
# Core classes
"BotzoClient",
"get_logger",
"SerialInterface",
"Protocol",
"Packet",
"CMDType",
"__version__",
]
3 changes: 3 additions & 0 deletions client/botzo_sdk/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .botzo_client import BotzoClient

__all__ = ["BotzoClient"]
51 changes: 51 additions & 0 deletions client/botzo_sdk/clients/botzo_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from botzo_sdk.interfaces import SerialInterface
from botzo_sdk.logger import get_logger
from botzo_sdk.protocol import Protocol
from botzo_sdk.protocol.types import CMDType

# TODO: What is still missing
# - [ ] Make the client take a configuration
# - [ ] Add reconnect attempts on connect method
# - [ ] Add possibility to register callbacks/events
# - [ ] Add new methods for new functionality as needed


class BotzoClient:
def __init__(self, port: str, baudrate: int):
self._interface = SerialInterface(port, baudrate)
self._protocol = Protocol()
self._logger = get_logger("Client")

def print_data(self, packet):
print(packet)
print(self._protocol.parse_imu_data(packet))

async def connect(self) -> bool:
if not await self._interface.connect():
return False

return await self.is_connected()

async def disconnect(self):
await self.stop_imu_streaming()
await self._interface.disconnect()

async def is_connected(self) -> bool:
packet = self._protocol.create_ping()
resp = await self._interface.send_and_wait(packet, CMDType.ACK)
return resp is not None

async def set_gpio(self, pin: int, state: bool) -> bool:
packet = self._protocol.create_set_gpio(pin, state)
resp = await self._interface.send_and_wait(packet, CMDType.ACK)
return resp is not None

async def start_imu_streaming(self) -> bool:
packet = self._protocol.create_start_imu_stream(100)
resp = await self._interface.send_and_wait(packet, CMDType.ACK)
return resp is not None

async def stop_imu_streaming(self) -> bool:
packet = self._protocol.create_stop_imu_stream()
resp = await self._interface.send_and_wait(packet, CMDType.ACK)
return resp is not None
3 changes: 3 additions & 0 deletions client/botzo_sdk/interfaces/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .serial_interface import SerialInterface

__all__ = ["SerialInterface"]
112 changes: 112 additions & 0 deletions client/botzo_sdk/interfaces/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import logging
from abc import ABC, abstractmethod
from typing import Callable, Optional

from botzo_sdk.protocol.packet import Packet
from botzo_sdk.protocol.types import CMDType


class Interface(ABC):
"""Abstract base class for MCU communication interfaces.

This class defines the standard interface for communicating with a
microcontroller unit (MCU) through various protocols such as serial,
UDP, TCP, or others.

All concrete implementations must provide their own connection handling,
packet transmission, and event callback mechanisms.

Attributes:
_logger (logging.Logger): Logger instance for interface operations.
"""

def __init__(self, logger: logging.Logger):
"""Initialize the interface with a logger.

Args:
logger (logging.Logger): Logger instance for recording interface
operations and debugging information.
"""
self._logger = logger

@abstractmethod
def connect(self):
"""Establish connection with the target MCU.

This method must be called before any communication operations
(send, send_and_wait, etc.) can be performed.

Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError

@abstractmethod
def disconnect(self):
"""Terminate the connection with the MCU.

Releases any resources held by the interface and closes the
communication channel.

Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError

@abstractmethod
async def send(self, packet: Packet) -> bool:
"""Send a packet to the MCU asynchronously.

Args:
packet (Packet): The packet to transmit to the MCU.

Returns:
bool: True if the packet was sent successfully, False otherwise.

Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError

@abstractmethod
async def send_and_wait(
self, packet: Packet, expected_response: CMDType, timeout: float = 1.0
) -> Optional[Packet]:
"""Send a packet and wait for a specific response from the MCU.

This method transmits a packet and blocks until either the expected
response is received or the timeout period expires.

Args:
packet (Packet): The packet to transmit to the MCU.
expected_response (CmdType): The command type of the expected
response packet (e.g., acknowledgment).
timeout (float, optional): Maximum time in seconds to wait for
the response. Defaults to 1.0.

Returns:
Optional[Packet]: The received response packet if it arrives
within the timeout period, None otherwise.

Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError

@abstractmethod
def on(self, cmd_type: CMDType, callback: Callable[[Packet], None]):
"""Register a callback for incoming packets of a specific command type.

Sets up an event handler that triggers when a packet with the
specified command type is received from the MCU.

Args:
cmd_type (CmdType): The command type to listen for.
callback (Callable): The function to call when a matching packet
is received. The callback should accept the received packet
as its argument.

Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError
Loading