Skip to content

Commit

Permalink
Sink service: Add a maximum number of DL packets in parallel
Browse files Browse the repository at this point in the history
This an optional feature that is by default unset
  • Loading branch information
GwendalRaoul committed Apr 19, 2023
1 parent 75a76bc commit 3010bac
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
58 changes: 54 additions & 4 deletions sink_service/source/data.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,39 @@ static char * m_interface = NULL;
/** Bus slot used to register the Vtable */
static sd_bus_slot * m_slot = NULL;

/** Max mtu size of sink */
static size_t m_max_mtu;

/* Max number of downlink packet being sent in parallel */
static size_t m_downlink_limit;

/**********************************************************************
* DBUS Methods implementation *
**********************************************************************/

static uint8_t m_message_queued_in_sink = 0;

static void on_data_sent_cb(uint16_t pduid, uint32_t buffering_delay, uint8_t result)
{
m_message_queued_in_sink -= (uint8_t) (pduid >> 8);
LOGD("Message sent %d, Message_queued: %d\n", pduid, m_message_queued_in_sink);
}

/**
* \brief Send a message handler
* \param ... (from sd_bus function signature)
*/
static int send_message(sd_bus_message * m, void * userdata, sd_bus_error * error)
{
static uint8_t m_pdu_id = 0;

app_message_t message;
app_res_e res;
const void * data;
size_t n;
int r;
uint8_t qos;
uint8_t weight;

/* Read the parameters */
r = sd_bus_message_read(m,
Expand Down Expand Up @@ -77,21 +95,46 @@ static int send_message(sd_bus_message * m, void * userdata, sd_bus_error * erro
message.bytes = data;
message.num_bytes = n;

if (m_downlink_limit > 0)
{
/* Check if message can be queued */
weight = (n + m_max_mtu - 1) / m_max_mtu;
if (m_message_queued_in_sink + weight > m_downlink_limit)
{
// No point to try sending data, queue is already full
return sd_bus_reply_method_return(m, "u", APP_RES_OUT_OF_MEMORY);
}

/* Keep track of packet queued on the sink */
/* Encode weight in ID */
message.pdu_id = weight << 8 | m_pdu_id++;
message.on_data_sent_cb = on_data_sent_cb;
}
else
{
message.pdu_id = 0;
message.on_data_sent_cb = NULL;

}

LOGD("Message to send on EP %d from EP %d to 0x%x size = %d\n",
message.dst_ep,
message.src_ep,
message.dst_addr,
message.num_bytes);

/* Send packet. For now, packets are not tracked to keep behavior simpler */
message.pdu_id = 0;
message.on_data_sent_cb = NULL;


res = WPC_send_data_with_options(&message);
if (res != APP_RES_OK)
{
LOGE("Cannot send data: %d\n", res);
}
else if (m_downlink_limit > 0)
{
m_message_queued_in_sink += weight;
LOGI("Message_queued: %d\n", m_message_queued_in_sink);
}

return sd_bus_reply_method_return(m, "u", res);
}
Expand Down Expand Up @@ -203,17 +246,24 @@ static const sd_bus_vtable data_vtable[] = {

SD_BUS_VTABLE_END};

int Data_Init(sd_bus * bus, char * object, char * interface)
int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_limit)
{
int ret;

m_bus = bus;
m_object = object;
m_interface = interface;
m_downlink_limit = downlink_limit;

/* Register for all data */
WPC_register_for_data(onDataReceived);

if (WPC_get_mtu((uint8_t *) &m_max_mtu) != APP_RES_OK)
{
LOGW("Cannot read max mtu from node");
m_max_mtu = 102;
}

/* Install the data vtable */
ret = sd_bus_add_object_vtable(bus, &m_slot, object, interface, data_vtable, NULL);
if (ret < 0)
Expand Down
4 changes: 3 additions & 1 deletion sink_service/source/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* The sd_bus instance to publish the config interface
*\param object
*\param interface
*\param downlink_limit
If > 0, max number of downlink messages being queued in parallel
* \return 0 if initialization succeed, an error code otherwise
* \note Connection with sink must be ready before calling this module
*/
int Data_Init(sd_bus * bus, char * object, char * interface);
int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_limit);

void Data_Close();

Expand Down
26 changes: 22 additions & 4 deletions sink_service/source/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ static bool get_service_name(char service_name[MAX_SIZE_SERVICE_NAME], unsigned
* Pointer where to store max_poll_fail_duration value (if any)
* \param fragment_max_duration_s
* Pointer where to store fragment_max_duration_s value (if any)
* \param downlink_limit
* Pointer where to store downlink_limit value (if any)
*/
static void get_env_parameters(unsigned long * baudrate,
char ** port_name,
unsigned int * sink_id,
unsigned int * max_poll_fail_duration,
unsigned int * fragment_max_duration_s)
unsigned int * fragment_max_duration_s,
unsigned int * downlink_limit)
{
char * ptr;

Expand Down Expand Up @@ -110,6 +113,11 @@ static void get_env_parameters(unsigned long * baudrate,
*fragment_max_duration_s = strtoul(ptr, NULL, 0);
LOGI("WM_GW_SINK_MAX_FRAGMENT_DURATION_S: %lu\n", *fragment_max_duration_s);
}
if ((ptr = getenv("WM_GW_SINK_DOWNLINK_LIMIT")) != NULL)
{
*downlink_limit = strtoul(ptr, NULL, 0);
LOGI("WM_GW_SINK_DOWNLINK_LIMIT: %lu\n", *downlink_limit);
}
}

// Usual baudrate to test in automatic mode
Expand Down Expand Up @@ -147,13 +155,14 @@ int main(int argc, char * argv[])
unsigned int sink_id = 0;
unsigned int max_poll_fail_duration = UNDEFINED_MAX_POLL_FAIL_DURATION;
unsigned int fragment_max_duration_s = DEFAULT_FRAGMENT_MAX_DURATION_S;
unsigned int downlink_limit = true;

/* Acquires environment parameters */
get_env_parameters(&baudrate, &port_name, &sink_id, &max_poll_fail_duration,
&fragment_max_duration_s);
&fragment_max_duration_s, &downlink_limit);

/* Parse command line arguments - take precedence over environmental ones */
while ((c = getopt(argc, argv, "b:p:i:d:f:")) != -1)
while ((c = getopt(argc, argv, "b:p:i:d:f:l:")) != -1)
{
switch (c)
{
Expand All @@ -176,6 +185,9 @@ int main(int argc, char * argv[])
case 'f':
fragment_max_duration_s = strtoul(optarg, NULL, 0);
break;
case 'l':
downlink_limit = strtoul(optarg, NULL, 0);
break;
case '?':
default:
LOGE("Error in argument parsing\n");
Expand All @@ -184,6 +196,12 @@ int main(int argc, char * argv[])
}
}

if (downlink_limit > 16)
{
LOGE("Max downlink limit is 16 (%d)\n", downlink_limit);
return EXIT_FAILURE;
}

/* Generate full service name */
if (!get_service_name(full_service_name, sink_id))
{
Expand Down Expand Up @@ -263,7 +281,7 @@ int main(int argc, char * argv[])
goto finish;
}

if (Data_Init(m_bus, "/com/wirepas/sink", "com.wirepas.sink.data1") < 0)
if (Data_Init(m_bus, "/com/wirepas/sink", "com.wirepas.sink.data1", downlink_limit) < 0)
{
LOGE("Cannot initialize data module\n");
r = -1;
Expand Down

0 comments on commit 3010bac

Please sign in to comment.