Skip to content
10 changes: 9 additions & 1 deletion iotconnect-sdk/include/iotconnect.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ typedef enum {
// TODO: Sync statuses etc.
} IotconnectConnectionStatus;

typedef enum {
MSG_SEND_SUCCESS, //Received PUBACK for the respective message.
MSG_SEND_TIMEOUT, //Message drop from queue due to maximum retry with PUBACK timeout.
MSG_SEND_FAILED, //Message drop from queue due to disconnection.
} IotconnectMsgSendStatus;

typedef void (*IotConnectMsgSendStatusCallback)(uint32_t msg_id, IotconnectMsgSendStatus status);

typedef void (*IotConnectStatusCallback)(IotconnectConnectionStatus data);

Expand All @@ -37,6 +44,7 @@ typedef struct {
IotclCommandCallback cmd_cb; // callback for command events.
IotclMessageCallback msg_cb; // callback for ALL messages, including the specific ones like cmd or ota callback.
IotConnectStatusCallback status_cb; // callback for connection status
IotConnectMsgSendStatusCallback msg_send_status_cb; // callback for MQTT message send status.
} IotconnectClientConfig;


Expand All @@ -48,7 +56,7 @@ bool iotconnect_sdk_is_connected();

IotclConfig *iotconnect_sdk_get_lib_config();

void iotconnect_sdk_send_packet(const char *data);
void iotconnect_sdk_send_packet(const char *data, uint32_t *p_msg_id);

void iotconnect_sdk_loop();

Expand Down
4 changes: 3 additions & 1 deletion iotconnect-sdk/nrf-layer-lib/include/iotconnect_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct {
const char *env; // Environment name. Contact your representative for details. Same as telemetry config.
IotconnectMqttOnDataCallback data_cb; // callback for OTA events.
IotConnectStatusCallback status_cb; // callback for command events.
IotConnectMsgSendStatusCallback msg_send_status_cb; // callback for message send status.
} IotconnectMqttConfig;

bool iotc_nrf_mqtt_init(IotconnectMqttConfig *config, IotclSyncResponse *sr);
Expand All @@ -31,7 +32,8 @@ void iotc_nrf_mqtt_loop();

bool iotc_nrf_mqtt_is_connected();

int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos, const uint8_t *data, size_t len);
int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos, const uint8_t *data,
size_t len, uint32_t *p_msg_id);

void iotc_nrf_mqtt_abort();

Expand Down
251 changes: 185 additions & 66 deletions iotconnect-sdk/nrf-layer-lib/src/iotconnect_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@
#include "nrf_cert_store.h"

#define MQTT_PUBACK_TIMEOUT_S 5 //5 secs

SYS_MUTEX_DEFINE(mutex_mqtt_pub);
static unsigned int pending_ack_msg_id = 0;
static bool msg_ack_pending = false;
#define MAXIMUM_MSG_QUEUE 4
#define MAXIMUM_MSG_RESEND 1
typedef struct {
bool in_used;
uint32_t msg_id;
const char *p_pub_topic;
uint8_t *p_msg;
uint32_t msg_len;
uint32_t start_tick;
uint8_t resend_count;
} msg_q_object_t;

static msg_q_object_t msg_queue[MAXIMUM_MSG_QUEUE];
SYS_MUTEX_DEFINE(mutex_msg_q);
static bool msg_queue_initialized = false;

extern struct mqtt_client client;
static IotconnectMqttConfig *config;
Expand Down Expand Up @@ -52,6 +63,125 @@ static int fds_init(struct mqtt_client *c);

static void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt);

static int nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len, uint32_t *p_msg_id);

static void msg_queue_clear(bool invoke_cb) {
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (msg_queue[i].in_used) {
uint32_t msg_id = msg_queue[i].msg_id;
free(msg_queue[i].p_msg);
msg_queue[i].in_used = false;
if (invoke_cb && config->msg_send_status_cb) {
config->msg_send_status_cb(msg_id, MSG_SEND_FAILED);
}
}
}
//printk("msg queue cleared\n");
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
}

static void msg_queue_init(void) {
if (!msg_queue_initialized) {
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
msg_queue[i].in_used = false;
}
msg_queue_initialized = true;
//printk("msg queue initialized\n");
}
else {
msg_queue_clear(false);
}
}

static bool msg_queue_add(uint32_t msg_id, const char *p_pub_topic,
uint8_t *p_msg, uint32_t msg_len) {
bool msg_added = false;
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (!msg_queue[i].in_used) {
msg_queue[i].p_msg = (uint8_t *)malloc(msg_len);
if (msg_queue[i].p_msg) {
memcpy(msg_queue[i].p_msg, p_msg, msg_len);
msg_queue[i].msg_len = msg_len;
msg_queue[i].msg_id = msg_id;
msg_queue[i].p_pub_topic = p_pub_topic;
msg_queue[i].start_tick = k_uptime_get_32();
msg_queue[i].resend_count = MAXIMUM_MSG_RESEND;
msg_queue[i].in_used = true;
msg_added = true;
//printk("message added to queue. (msg id: %d)\n", msg_id);
}
else {
printk("ERR: Unable to add message to queue!\n");
}
break;
}
}
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
return msg_added;
}

static void msg_queue_delete(uint32_t msg_id) {
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (msg_queue[i].in_used && (msg_queue[i].msg_id == msg_id)) {
free(msg_queue[i].p_msg);
msg_queue[i].in_used = false;
//printk("message deleted from queue. (msg id: %d)\n", msg_id);
break;
}
}
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
}

static void msg_queue_proc(void) {
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (msg_queue[i].in_used) {
if ((k_uptime_get_32() - msg_queue[i].start_tick) >= (MQTT_PUBACK_TIMEOUT_S * 1000)) {
//printk("ERR: Send message timeout! (msg id: %d)\n", msg_queue[i].msg_id);
if (msg_queue[i].resend_count) {
printk("Re-send message again. (msg id: %d)\n", msg_queue[i].msg_id);
uint32_t msg_id = msg_queue[i].msg_id;
int err = nrf_mqtt_publish(&client, msg_queue[i].p_pub_topic, MQTT_QOS_1_AT_LEAST_ONCE,
msg_queue[i].p_msg, msg_queue[i].msg_len, &msg_id);
if (err) {
printk("message resend failed! %d\n", err);
}
msg_queue[i].start_tick = k_uptime_get_32();
msg_queue[i].resend_count--;
}
else {
//printk("Maximum re-send reached! Abort message! (msg id: %d)\n", msg_queue[i].msg_id);
uint32_t msg_id = msg_queue[i].msg_id;
msg_queue_delete(msg_id);
if (config->msg_send_status_cb) {
config->msg_send_status_cb(msg_id, MSG_SEND_TIMEOUT);
}
}
}
}
}
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
}

static unsigned int get_next_message_id() {
if (rolling_message_id + 1 >= UINT_MAX) {
Expand Down Expand Up @@ -221,71 +351,55 @@ static bool client_init() {
return true;
}


/**@brief Function to publish data on the configured topic
*/
int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len) {
static int nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len, uint32_t *p_msg_id) {
struct mqtt_publish_param param;
int retry_count = 2;
int code;

//mutex lock
sys_mutex_lock(&mutex_mqtt_pub, K_FOREVER);

do {

param.message.topic.qos = qos;
param.message.topic.topic.utf8 = (uint8_t *) topic;
param.message.topic.topic.size = strlen(param.message.topic.topic.utf8);
param.message.payload.data = (uint8_t *) data;
param.message.payload.len = len;
param.message.topic.qos = qos;
param.message.topic.topic.utf8 = (uint8_t *) topic;
param.message.topic.topic.size = strlen(param.message.topic.topic.utf8);
param.message.payload.data = (uint8_t *) data;
param.message.payload.len = len;
// if p_msg_id contains non-zero value, the message is re-send from msg_queue_proc().
// Assign this message with assigned messsage id in p_msg_id.
// else assign current running message id to this message.
if (p_msg_id && *p_msg_id) {
param.message_id = *p_msg_id;
}
else {
param.message_id = get_next_message_id();
param.dup_flag = 0;
param.retain_flag = 0;

code = mqtt_publish(c, &param);
if (code != 0) {
goto publish_done;
}

if (param.message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {

pending_ack_msg_id = param.message_id;
msg_ack_pending = true;

uint32_t start_ticks = k_uptime_get_32();

do {

iotc_nrf_mqtt_loop();
k_msleep(1);

} while (msg_ack_pending &&
(k_uptime_get_32() - start_ticks) < (MQTT_PUBACK_TIMEOUT_S * 1000));

if (!msg_ack_pending) {
code = 0;
break;
}

if (retry_count > 0) {
retry_count--;
if (retry_count == 0) {
msg_ack_pending = false;
code = -ETIMEDOUT;
}
}
}
param.dup_flag = 0;
param.retain_flag = 0;

code = mqtt_publish(c, &param);

// Only add the message to msg queue when:
// mqtt_publish() is successful.
// p_msg_id is NULL or contains zero. (not re-send from msg_queue_proc())
// QOS is 1.
if (!code && !(p_msg_id && *p_msg_id) && (qos == MQTT_QOS_1_AT_LEAST_ONCE)) {
msg_queue_add(param.message_id, topic, (uint8_t *)data, len);
// Return back the assigned message id if p_msg_id is not NULL.
if (p_msg_id) {
*p_msg_id = param.message_id;
}
}

} while ((param.message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) && (retry_count > 0));

publish_done:
return code;
}

//mutex unlock
sys_mutex_unlock(&mutex_mqtt_pub);
/**@brief Function to publish data on the configured topic
*/
int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len, uint32_t *p_msg_id) {

return code;
if (p_msg_id ) {
//New message. Force p_msg_id to zero.
*p_msg_id = 0;
}
return nrf_mqtt_publish(c, topic, qos, data, len, p_msg_id);
}

static void mqtt_evt_handler(struct mqtt_client *const c,
Expand Down Expand Up @@ -317,6 +431,7 @@ static void mqtt_evt_handler(struct mqtt_client *const c,
if (config->status_cb) {
config->status_cb(MQTT_DISCONNECTED);
}
msg_queue_clear(true);
break;

case MQTT_EVT_PUBLISH: {
Expand Down Expand Up @@ -351,11 +466,9 @@ static void mqtt_evt_handler(struct mqtt_client *const c,
}

printk("Packet id: %u acknowledged\n", evt->param.puback.message_id);

if (msg_ack_pending) {
if (pending_ack_msg_id == evt->param.puback.message_id) {
msg_ack_pending = false;
}
msg_queue_delete(evt->param.puback.message_id);
if (config->msg_send_status_cb) {
config->msg_send_status_cb(evt->param.puback.message_id, MSG_SEND_SUCCESS);
}
break;

Expand Down Expand Up @@ -429,6 +542,9 @@ bool iotc_nrf_mqtt_init(IotconnectMqttConfig *c, IotclSyncResponse *sr) {
return false;
}

//init msg queue
msg_queue_init();

fds.fd = -1;
sync_response = sr;
config = c;
Expand Down Expand Up @@ -458,6 +574,9 @@ bool iotc_nrf_mqtt_init(IotconnectMqttConfig *c, IotclSyncResponse *sr) {
void iotc_nrf_mqtt_loop(void) {
int err;

// process msg queue procedure
msg_queue_proc();

//if no FD return.
if (fds.fd == -1) {
return;
Expand Down
15 changes: 12 additions & 3 deletions iotconnect-sdk/src/iotconnect.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ static void on_iotconnect_status(IotconnectConnectionStatus status) {
}
}


static void on_msg_send_status(uint32_t msg_id, IotconnectMsgSendStatus status) {
if (config.msg_send_status_cb) {
config.msg_send_status_cb(msg_id, status);
}
}
///////////////////////////////////////////////////////////////////////////////////
// Get All twin property from C2D
void iotconnect_sdk_disconnect() {
Expand All @@ -202,8 +206,12 @@ void iotconnect_sdk_disconnect() {
k_msleep(100);
}

void iotconnect_sdk_send_packet(const char *data) {
if (0 != iotc_nrf_mqtt_publish(&client, sync_response->broker.pub_topic, MQTT_QOS_0_AT_MOST_ONCE, data, strlen(data))) {
// Send MQTT message.
// p_msg_id points to a 32-bit variable that store the assigned message id of the message
// being sent.
void iotconnect_sdk_send_packet(const char *data, uint32_t *p_msg_id) {
if (0 != iotc_nrf_mqtt_publish(&client, sync_response->broker.pub_topic, MQTT_QOS_1_AT_LEAST_ONCE, data, strlen(data),
....p_msg_id)) {
printk("\n\t Device_Attributes_Data Publish failure");
}
}
Expand Down Expand Up @@ -302,6 +310,7 @@ int iotconnect_sdk_init() {
mqtt_config.tls_verify = CONFIG_PEER_VERIFY;
mqtt_config.data_cb = iotc_on_mqtt_data;
mqtt_config.status_cb = on_iotconnect_status;
mqtt_config.msg_send_status_cb = on_msg_send_status;

if (!iotc_nrf_mqtt_init(&mqtt_config, sync_response)) {
return -8;
Expand Down
Loading