diff --git a/sink_service/source/data.c b/sink_service/source/data.c index 80547cbf..7437d3d6 100644 --- a/sink_service/source/data.c +++ b/sink_service/source/data.c @@ -29,21 +29,39 @@ static char * m_interface = NULL; /** Bus slot used to register the Vtable */ static sd_bus_slot * m_slot = NULL; +/** Max mtu size of sink */ +static size_t m_max_mtu; + +/* Max number of downlink packet being sent in parallel */ +static size_t m_downlink_limit; + /********************************************************************** * DBUS Methods implementation * **********************************************************************/ + +static uint8_t m_message_queued_in_sink = 0; + +static void on_data_sent_cb(uint16_t pduid, uint32_t buffering_delay, uint8_t result) +{ + m_message_queued_in_sink -= (uint8_t) (pduid >> 8); + LOGD("Message sent %d, Message_queued: %d\n", pduid, m_message_queued_in_sink); +} + /** * \brief Send a message handler * \param ... (from sd_bus function signature) */ static int send_message(sd_bus_message * m, void * userdata, sd_bus_error * error) { + static uint8_t m_pdu_id = 0; + app_message_t message; app_res_e res; const void * data; size_t n; int r; uint8_t qos; + uint8_t weight; /* Read the parameters */ r = sd_bus_message_read(m, @@ -77,21 +95,46 @@ static int send_message(sd_bus_message * m, void * userdata, sd_bus_error * erro message.bytes = data; message.num_bytes = n; + if (m_downlink_limit > 0) + { + /* Check if message can be queued */ + weight = (n + m_max_mtu - 1) / m_max_mtu; + if (m_message_queued_in_sink + weight > m_downlink_limit) + { + // No point to try sending data, queue is already full + return sd_bus_reply_method_return(m, "u", APP_RES_OUT_OF_MEMORY); + } + + /* Keep track of packet queued on the sink */ + /* Encode weight in ID */ + message.pdu_id = weight << 8 | m_pdu_id++; + message.on_data_sent_cb = on_data_sent_cb; + } + else + { + message.pdu_id = 0; + message.on_data_sent_cb = NULL; + + } + LOGD("Message to send on EP %d from EP %d to 0x%x size = %d\n", message.dst_ep, message.src_ep, message.dst_addr, message.num_bytes); - /* Send packet. For now, packets are not tracked to keep behavior simpler */ - message.pdu_id = 0; - message.on_data_sent_cb = NULL; + res = WPC_send_data_with_options(&message); if (res != APP_RES_OK) { LOGE("Cannot send data: %d\n", res); } + else if (m_downlink_limit > 0) + { + m_message_queued_in_sink += weight; + LOGI("Message_queued: %d\n", m_message_queued_in_sink); + } return sd_bus_reply_method_return(m, "u", res); } @@ -203,17 +246,24 @@ static const sd_bus_vtable data_vtable[] = { SD_BUS_VTABLE_END}; -int Data_Init(sd_bus * bus, char * object, char * interface) +int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_limit) { int ret; m_bus = bus; m_object = object; m_interface = interface; + m_downlink_limit = downlink_limit; /* Register for all data */ WPC_register_for_data(onDataReceived); + if (WPC_get_mtu((uint8_t *) &m_max_mtu) != APP_RES_OK) + { + LOGW("Cannot read max mtu from node"); + m_max_mtu = 102; + } + /* Install the data vtable */ ret = sd_bus_add_object_vtable(bus, &m_slot, object, interface, data_vtable, NULL); if (ret < 0) diff --git a/sink_service/source/data.h b/sink_service/source/data.h index 1f12a302..73bc4ffd 100644 --- a/sink_service/source/data.h +++ b/sink_service/source/data.h @@ -15,10 +15,12 @@ * The sd_bus instance to publish the config interface *\param object *\param interface + *\param downlink_limit + If > 0, max number of downlink messages being queued in parallel * \return 0 if initialization succeed, an error code otherwise * \note Connection with sink must be ready before calling this module */ -int Data_Init(sd_bus * bus, char * object, char * interface); +int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_limit); void Data_Close(); diff --git a/sink_service/source/main.c b/sink_service/source/main.c index 44399527..d5eb1a0e 100644 --- a/sink_service/source/main.c +++ b/sink_service/source/main.c @@ -74,12 +74,15 @@ static bool get_service_name(char service_name[MAX_SIZE_SERVICE_NAME], unsigned * Pointer where to store max_poll_fail_duration value (if any) * \param fragment_max_duration_s * Pointer where to store fragment_max_duration_s value (if any) + * \param downlink_limit + * Pointer where to store downlink_limit value (if any) */ static void get_env_parameters(unsigned long * baudrate, char ** port_name, unsigned int * sink_id, unsigned int * max_poll_fail_duration, - unsigned int * fragment_max_duration_s) + unsigned int * fragment_max_duration_s, + unsigned int * downlink_limit) { char * ptr; @@ -110,6 +113,11 @@ static void get_env_parameters(unsigned long * baudrate, *fragment_max_duration_s = strtoul(ptr, NULL, 0); LOGI("WM_GW_SINK_MAX_FRAGMENT_DURATION_S: %lu\n", *fragment_max_duration_s); } + if ((ptr = getenv("WM_GW_SINK_DOWNLINK_LIMIT")) != NULL) + { + *downlink_limit = strtoul(ptr, NULL, 0); + LOGI("WM_GW_SINK_DOWNLINK_LIMIT: %lu\n", *downlink_limit); + } } // Usual baudrate to test in automatic mode @@ -147,13 +155,14 @@ int main(int argc, char * argv[]) unsigned int sink_id = 0; unsigned int max_poll_fail_duration = UNDEFINED_MAX_POLL_FAIL_DURATION; unsigned int fragment_max_duration_s = DEFAULT_FRAGMENT_MAX_DURATION_S; + unsigned int downlink_limit = true; /* Acquires environment parameters */ get_env_parameters(&baudrate, &port_name, &sink_id, &max_poll_fail_duration, - &fragment_max_duration_s); + &fragment_max_duration_s, &downlink_limit); /* Parse command line arguments - take precedence over environmental ones */ - while ((c = getopt(argc, argv, "b:p:i:d:f:")) != -1) + while ((c = getopt(argc, argv, "b:p:i:d:f:l:")) != -1) { switch (c) { @@ -176,6 +185,9 @@ int main(int argc, char * argv[]) case 'f': fragment_max_duration_s = strtoul(optarg, NULL, 0); break; + case 'l': + downlink_limit = strtoul(optarg, NULL, 0); + break; case '?': default: LOGE("Error in argument parsing\n"); @@ -184,6 +196,12 @@ int main(int argc, char * argv[]) } } + if (downlink_limit > 16) + { + LOGE("Max downlink limit is 16 (%d)\n", downlink_limit); + return EXIT_FAILURE; + } + /* Generate full service name */ if (!get_service_name(full_service_name, sink_id)) { @@ -263,7 +281,7 @@ int main(int argc, char * argv[]) goto finish; } - if (Data_Init(m_bus, "/com/wirepas/sink", "com.wirepas.sink.data1") < 0) + if (Data_Init(m_bus, "/com/wirepas/sink", "com.wirepas.sink.data1", downlink_limit) < 0) { LOGE("Cannot initialize data module\n"); r = -1;