Skip to content

Commit c930d21

Browse files
committed
Updated gateway operational example and attribute response processing
1 parent f8f13f7 commit c930d21

File tree

2 files changed

+117
-102
lines changed

2 files changed

+117
-102
lines changed

examples/gateway/operational_example.py

Lines changed: 113 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -15,148 +15,162 @@
1515
import asyncio
1616
import logging
1717
import signal
18-
from datetime import datetime, UTC
19-
from random import randint, uniform
18+
from random import uniform, randint
19+
from time import time
2020

2121
from tb_mqtt_client.common.config_loader import GatewayConfig
22+
from tb_mqtt_client.common.logging_utils import configure_logging, get_logger
2223
from tb_mqtt_client.entities.data.attribute_entry import AttributeEntry
23-
from tb_mqtt_client.entities.data.timeseries_entry import TimeseriesEntry
2424
from tb_mqtt_client.entities.data.attribute_update import AttributeUpdate
25+
from tb_mqtt_client.entities.data.requested_attribute_response import RequestedAttributeResponse
26+
from tb_mqtt_client.entities.data.timeseries_entry import TimeseriesEntry
27+
from tb_mqtt_client.entities.gateway.gateway_attribute_request import GatewayAttributeRequest
28+
from tb_mqtt_client.entities.gateway.gateway_rpc_request import GatewayRPCRequest
29+
from tb_mqtt_client.entities.gateway.gateway_rpc_response import GatewayRPCResponse
2530
from tb_mqtt_client.service.gateway.client import GatewayClient
26-
from tb_mqtt_client.common.logging_utils import configure_logging, get_logger
31+
from tb_mqtt_client.service.gateway.device_session import DeviceSession
2732

33+
# ---- Logging Setup ----
2834
configure_logging()
2935
logger = get_logger(__name__)
3036
logger.setLevel(logging.DEBUG)
3137
logging.getLogger("tb_mqtt_client").setLevel(logging.DEBUG)
3238

39+
# ---- Constants ----
40+
GATEWAY_HOST = "localhost"
41+
GATEWAY_PORT = 1883 # Default MQTT port, change if needed
42+
GATEWAY_ACCESS_TOKEN = "YOUR_ACCESS_TOKEN" # Replace with your actual access token
43+
44+
DELAY_BETWEEN_DATA_PUBLISH = 1 # seconds
45+
46+
47+
# ---- Callbacks ----
48+
async def attribute_update_handler(device_session: DeviceSession, update: AttributeUpdate):
49+
logger.info("Received attribute update for %s: %s",
50+
device_session.device_info.device_name, update.entries)
51+
52+
53+
async def requested_attributes_handler(device_session: DeviceSession, response: RequestedAttributeResponse):
54+
logger.info("Requested attributes for %s -> client: %r, shared: %r",
55+
device_session.device_info.device_name,
56+
response.client,
57+
response.shared)
58+
3359

34-
async def device_attribute_update_callback(update: AttributeUpdate):
35-
"""
36-
Callback function to handle device attribute updates.
37-
:param update: The attribute update object.
38-
"""
39-
logger.info("Received attribute update for device %s: %s", update.device, update.attributes)
40-
41-
42-
async def device_rpc_request_callback(device_name: str, method: str, params: dict):
43-
"""
44-
Callback function to handle device RPC requests.
45-
:param device_name: Name of the device
46-
:param method: RPC method
47-
:param params: RPC parameters
48-
:return: RPC response
49-
"""
50-
logger.info("Received RPC request for device %s: method=%s, params=%s", device_name, method, params)
51-
52-
# Example response based on method
53-
if method == "getTemperature":
54-
return {"temperature": round(uniform(20.0, 30.0), 2)}
55-
elif method == "setLedState":
56-
state = params.get("state", False)
57-
return {"success": True, "state": state}
58-
else:
59-
return {"error": f"Unsupported method: {method}"}
60-
61-
62-
async def device_disconnect_callback(device_name: str):
63-
"""
64-
Callback function to handle device disconnections.
65-
:param device_name: Name of the disconnected device
66-
"""
67-
logger.info("Device %s disconnected", device_name)
60+
async def device_rpc_request_handler(device_session: DeviceSession, rpc_request: GatewayRPCRequest) -> GatewayRPCResponse:
61+
logger.info("Received RPC request for %s: %r", device_session.device_info.device_name, rpc_request)
62+
response_data = {
63+
"status": "success",
64+
"echo_method": rpc_request.method,
65+
"params": rpc_request.params
66+
}
67+
return GatewayRPCResponse.build(device_session.device_info.device_name, rpc_request.request_id, response_data)
6868

6969

70+
# ---- Main ----
7071
async def main():
7172
stop_event = asyncio.Event()
7273

7374
def _shutdown_handler():
7475
stop_event.set()
76+
asyncio.create_task(client.stop())
7577

7678
loop = asyncio.get_running_loop()
7779
for sig in (signal.SIGINT, signal.SIGTERM):
7880
try:
7981
loop.add_signal_handler(sig, _shutdown_handler)
8082
except NotImplementedError:
81-
# Windows compatibility fallback
8283
signal.signal(sig, lambda *_: _shutdown_handler())
8384

85+
# ---- Gateway Config ----
8486
config = GatewayConfig()
85-
config.host = "localhost"
86-
config.access_token = "YOUR_GATEWAY_ACCESS_TOKEN"
87+
config.host = GATEWAY_HOST
88+
config.port = GATEWAY_PORT
89+
config.access_token = GATEWAY_ACCESS_TOKEN
8790

91+
global client
8892
client = GatewayClient(config)
89-
client.set_device_attribute_update_callback(device_attribute_update_callback)
90-
client.set_device_rpc_request_callback(device_rpc_request_callback)
91-
client.set_device_disconnect_callback(device_disconnect_callback)
93+
9294
await client.connect()
95+
logger.info("Gateway connected to ThingsBoard.")
96+
97+
# ---- Register Devices ----
98+
devices = [
99+
("Test Device A1", "Test devices"),
100+
("Test Device B1", "Test devices")
101+
]
102+
sessions = {}
103+
104+
for name, profile in devices:
105+
session, _ = await client.connect_device(name, profile, wait_for_publish=True)
106+
if not session:
107+
logger.error("Failed to connect %s", name)
108+
continue
109+
sessions[name] = session
110+
logger.info("Device connected: %s", name)
111+
112+
# Register callbacks for each device
113+
client.device_manager.set_attribute_update_callback(session.device_info.device_id, attribute_update_handler)
114+
client.device_manager.set_attribute_response_callback(session.device_info.device_id, requested_attributes_handler)
115+
client.device_manager.set_rpc_request_callback(session.device_info.device_id, device_rpc_request_handler)
116+
117+
# ---- Main loop ----
118+
while not stop_event.is_set():
119+
iteration_start = time()
93120

94-
logger.info("Connected to ThingsBoard as gateway.")
121+
for device_name, session in sessions.items():
122+
logger.info("Publishing data for %s", device_name)
95123

96-
# Connect devices to the gateway
97-
device_names = ["sensor-1", "sensor-2", "actuator-1"]
98-
for device_name in device_names:
99-
await client.gw_connect_device(device_name)
100-
logger.info("Connected device: %s", device_name)
124+
# --- Attributes ---
125+
raw_attrs = {"firmwareVersion": "2.0.0", "location": "office"}
126+
await client.send_device_attributes(session, raw_attrs, wait_for_publish=True)
101127

102-
while not stop_event.is_set():
103-
# Send device attributes
104-
for device_name in device_names:
105-
# Send device attributes
106-
attributes = {
107-
"firmwareVersion": "1.0.4",
108-
"serialNumber": f"SN-{randint(1000, 9999)}",
109-
"deviceType": "sensor" if "sensor" in device_name else "actuator"
110-
}
111-
await client.gw_send_attributes(device_name, attributes)
112-
logger.info("Sent attributes for device %s: %s", device_name, attributes)
113-
114-
# Send single attribute
115-
single_attr = AttributeEntry("lastUpdateTime", datetime.now(UTC).isoformat())
116-
await client.gw_send_attributes(device_name, single_attr)
117-
logger.info("Sent single attribute for device %s: %s", device_name, single_attr)
118-
119-
# Send device telemetry
120-
if "sensor" in device_name:
121-
# For sensor devices
122-
telemetry = {
123-
"temperature": round(uniform(20.0, 30.0), 2),
124-
"humidity": round(uniform(40.0, 80.0), 2),
125-
"batteryLevel": randint(1, 100)
126-
}
127-
await client.gw_send_telemetry(device_name, telemetry)
128-
logger.info("Sent telemetry for device %s: %s", device_name, telemetry)
129-
130-
# Send single telemetry entry
131-
single_entry = TimeseriesEntry("signalStrength", randint(-90, -30))
132-
await client.gw_send_telemetry(device_name, single_entry)
133-
logger.info("Sent single telemetry entry for device %s: %s", device_name, single_entry)
134-
else:
135-
# For actuator devices
136-
telemetry = {
137-
"state": "ON" if randint(0, 1) == 1 else "OFF",
138-
"powerConsumption": round(uniform(0.1, 5.0), 2),
139-
"uptime": randint(1, 1000)
140-
}
141-
await client.gw_send_telemetry(device_name, telemetry)
142-
logger.info("Sent telemetry for device %s: %s", device_name, telemetry)
128+
single_attr = AttributeEntry("mode", "auto")
129+
await client.send_device_attributes(session, single_attr, wait_for_publish=True)
130+
131+
multi_attrs = [
132+
AttributeEntry("maxTemp", randint(60, 90)),
133+
AttributeEntry("calibrated", True)
134+
]
135+
await client.send_device_attributes(session, multi_attrs, wait_for_publish=True)
136+
137+
# --- Telemetry ---
138+
raw_ts = {"temperature": round(uniform(20, 30), 2), "humidity": randint(40, 80)}
139+
await client.send_device_timeseries(session, raw_ts, wait_for_publish=True)
143140

141+
list_ts = [
142+
{"ts": int(time() * 1000), "values": {"temperature": 25.5}},
143+
{"ts": int(time() * 1000) - 1000, "values": {"humidity": 65}}
144+
]
145+
await client.send_device_timeseries(session, list_ts, wait_for_publish=True)
146+
147+
ts_entries = [TimeseriesEntry(f"temp_{i}", randint(0, 50)) for i in range(5)]
148+
await client.send_device_timeseries(session, ts_entries, wait_for_publish=True)
149+
150+
# --- Attribute Request ---
151+
attr_req = await GatewayAttributeRequest.build(
152+
device_session=session,
153+
client_keys=["firmwareVersion", "mode"]
154+
)
155+
await client.send_device_attributes_request(session, attr_req, wait_for_publish=True)
156+
157+
# ---- Delay handling ----
144158
try:
145-
await asyncio.wait_for(stop_event.wait(), timeout=5)
159+
timeout = DELAY_BETWEEN_DATA_PUBLISH - (time() - iteration_start)
160+
if timeout > 0:
161+
await asyncio.wait_for(stop_event.wait(), timeout=timeout)
146162
except asyncio.TimeoutError:
147163
pass
148164

149-
# Disconnect devices before shutting down
150-
for device_name in device_names:
151-
await client.gw_disconnect_device(device_name)
152-
logger.info("Disconnected device: %s", device_name)
153-
165+
# ---- Disconnect devices ----
166+
for session in sessions.values():
167+
await client.disconnect_device(session, wait_for_publish=True)
154168
await client.disconnect()
155-
logger.info("Disconnected from ThingsBoard.")
169+
logger.info("Gateway disconnected cleanly.")
156170

157171

158172
if __name__ == "__main__":
159173
try:
160174
asyncio.run(main())
161175
except KeyboardInterrupt:
162-
print("Interrupted by user.")
176+
logger.info("Interrupted by user.")

tb_mqtt_client/service/gateway/message_adapter.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,11 @@ def parse_gateway_requested_attribute_response(self,
321321
and not gateway_attribute_request.client_keys))):
322322
# TODO: Skipping case when requested several attributes, but only one is returned, issue on the platform
323323
logger.warning("Received gateway attribute response with single key, but multiply keys expected. "
324-
"Request keys: %s, Response keys: %s",
325-
list(*gateway_attribute_request.client_keys, *gateway_attribute_request.shared_keys),
324+
"Request keys: %s, Response value: %r",
325+
gateway_attribute_request.client_keys + gateway_attribute_request.shared_keys,
326326
data['value'])
327-
return None
327+
client = []
328+
shared = []
328329
elif 'value' in data:
329330
if not client_keys_empty and len(gateway_attribute_request.client_keys) == 1:
330331
client = [AttributeEntry(gateway_attribute_request.client_keys[0], data['value'])]

0 commit comments

Comments
 (0)