Unable to stop ICYStream in a multithreaded context #1954
-
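I have a setup where I'm using 3 tasks for an internet radio: a network task that downloads the stream into a network buffer, a codec task that decodes it into a PCM buffer, and a sound task that plays the PCM data.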
This allows a highly resilient buffering pipeline which can survive several multi-second network dropouts before it stutters. However I am failing to stop the pipeline due to a crash in the Here is how I'm starting and stopping the stream:
/**
 * @brief Starts the streaming pipeline for the given URL.
 *
 * Launches the net task, which opens the URL, selects a decoder from the
 * server's Content-Type header and then spawns the codec task and the sound
 * task before entering its own download loop.
 *
 * @param url             Address of the stream to open.
 * @param loadingCallback Invoked with true while the pipeline is (re)buffering
 *                        and with false when buffering ends or start-up fails.
 *
 * NOTE(review): `running` is written by stop() and read by all three tasks;
 * its declaration is not visible here - confirm it is an atomic type.
 */
void start(const std::string url, std::function<void(bool)> loadingCallback) {
    running = true;
    loadingCallback(true);
    ESP_LOGI(LOG_TAG, "Streamer is starting");
    queueNetData.begin();
    queuePcmData.begin();

    // Parks the calling task until it is ended from outside (see stop()).
    // A bare vTaskDelay(portMAX_DELAY) is only a very long finite delay, after
    // which execution would fall through into code that must not run.
    const auto parkForever = []() {
        for(;;) {
            vTaskDelay(portMAX_DELAY);
        }
    };

    netTask.begin([this, url, loadingCallback, parkForever]() {
        TickType_t last_restart_time = xTaskGetTickCount();
        urlStream.setMetadataCallback(_update_meta_global);
        urlStream.httpRequest().setTimeout(NET_CLIENT_TIMEOUT);
        urlStream.httpRequest().header().setProtocol("HTTP/1.0"); // <- important, because chunked transfer of some servers seems to cause trouble with buffering!
        urlStream.begin(url.c_str());
        ESP_LOGI(LOG_TAG, "Streamer did begin URL");

        // Pick the decoder that matches the server-reported MIME type.
        const char * _srv_mime = urlStream.httpRequest().reply().get("Content-Type");
        if(_srv_mime != nullptr) {
            const std::string srv_mime = _srv_mime;
            ESP_LOGI(LOG_TAG, "Server reports content-type: %s", srv_mime.c_str());
            if(srv_mime == "audio/mpeg" || srv_mime == "audio/mp3") {
                activeCodec = new MP3DecoderHelix();
            }
            else if(srv_mime == "audio/aac") {
                activeCodec = new AACDecoderHelix();
            }
            else {
                ESP_LOGE(LOG_TAG, "Content-type is not supported, TODO show error");
                loadingCallback(false);
                parkForever(); // never continue with a missing/stale codec
            }
            decoder.setDecoder(activeCodec);
            activeCodec->setNotifyActive(true);
            activeCodec->addNotifyAudioChange(*outPort);
            decoder.setOutput(queuePcmData);
        } else {
            ESP_LOGE(LOG_TAG, "No content-type reported, TODO show error");
            loadingCallback(false);
            parkForever();
        }
        decoder.setNotifyActive(true);
        decoder.begin();
        ESP_LOGI(LOG_TAG, "Streamer did begin decoder");

        TickType_t last_successful_copy = xTaskGetTickCount();
        TickType_t last_stats = xTaskGetTickCount();

        // Codec task: waits until the net buffer is filled to the configured
        // mark, then decodes until the buffer runs dry or the copier stalls.
        codecTask.begin([this, parkForever]() {
            while(bufferNetData.levelPercent() < NET_DATA_ENOUGH_MARK_PCT && running) {
                delay(125);
            }
            if(!running) {
                ESP_LOGI(LOG_TAG, "Finishing up codec task early");
                decoder.end();
                xSemaphoreGive(semaCodec);
                parkForever();
            }
            while(copierDecoding.copy() > 0 && !bufferNetData.isEmpty() && running) {
                delay(10);
            }
            while(bufferNetData.levelPercent() < NET_DATA_ENOUGH_MARK_PCT && running) {
                delay(250);
            }
            if(!running) {
                ESP_LOGI(LOG_TAG, "Finishing up codec task");
                decoder.end();
                xSemaphoreGive(semaCodec);
                parkForever();
            }
            delay(5);
            // Given at the end of every pass; stop() takes it with a timeout.
            xSemaphoreGive(semaCodec);
        });

        // Sound task: plays PCM data while holding semaSnd; on underrun it
        // reports "loading" and releases the semaphore between polls.
        sndTask.begin([this, loadingCallback, parkForever]() {
            xSemaphoreTake(semaSnd, portMAX_DELAY);
            if(!copierPlaying.copy() && running) {
                ESP_LOGD(LOG_TAG, "copierPlaying underrun!? PCM buffer %.00f%%, net buffer %.00f%%", bufferPcmData.levelPercent(), bufferNetData.levelPercent());
                loadingCallback(true);
                while(bufferPcmData.isEmpty() && running) {
                    xSemaphoreGive(semaSnd);
                    delay(100);
                    xSemaphoreTake(semaSnd, portMAX_DELAY);
                }
                loadingCallback(false);
            }
            xSemaphoreGive(semaSnd);
            delay(2);
            if(!running) {
                ESP_LOGI(LOG_TAG, "Finishing up sound task");
                parkForever();
            }
        });

        // Download loop: each pass runs with semaNet held.
        while(running) {
            xSemaphoreTake(semaNet, portMAX_DELAY);
            const TickType_t now = xTaskGetTickCount();
            const int copied = copierDownloading.copy();
            // `now` is in ticks, so the timeout is converted to ticks as well.
            // assumes NET_NO_DATA_TIMEOUT is in milliseconds - TODO confirm
            if(copied > 0) {
                last_successful_copy = now;
                delay(2);
            } else if(now - last_successful_copy >= pdMS_TO_TICKS(NET_NO_DATA_TIMEOUT) && (bufferNetData.levelPercent() < 20.0 || bufferPcmData.isEmpty())) {
                // The message says "ms", so convert the tick delta before printing.
                ESP_LOGE(LOG_TAG, "streamer is stalled!? time since last shart = [ %i ms ]", static_cast<int>(pdTICKS_TO_MS(now - last_restart_time)));
                last_restart_time = now;
                loadingCallback(true);
                urlStream.end();
                int retryDelay = 100;
                do {
                    if(bufferPcmData.isEmpty()) bufferNetData.clear(); // prevent click artifact on restart playback
                    delay(retryDelay);
                    retryDelay += 1000;
                    if(retryDelay > NET_CLIENT_TIMEOUT) retryDelay = NET_CLIENT_TIMEOUT;
                    ESP_LOGE(LOG_TAG, "streamer is trying to begin URL again");
                    // Stop retrying once stop() was requested; otherwise semaNet
                    // would stay held for as long as the server is unreachable.
                } while(running && !urlStream.begin(url.c_str()));
                // Use a fresh timestamp: `now` predates the reconnect attempts
                // and would make the next pass look stalled again right away.
                last_successful_copy = xTaskGetTickCount();
            } else if(copied < 0) {
                ESP_LOGE(LOG_TAG, "copied = %i ???", copied);
            } else if(bufferNetData.isFull()) {
                ESP_LOGW(LOG_TAG, "Net buffer full, no space to receive more data!!");
                delay(10);
            } else {
                // copied nothing, maybe yield to another task
                delay(10);
            }
            // assumes LOG_STATS_INTERVAL is in milliseconds - TODO confirm
            if(now - last_stats >= pdMS_TO_TICKS(LOG_STATS_INTERVAL)) {
                ESP_LOGI(LOG_TAG, "Stats: PCM buffer %.00f%%, net buffer %.00f%%", bufferPcmData.levelPercent(), bufferNetData.levelPercent());
                last_stats = now;
            }
            xSemaphoreGive(semaNet);
        }
        ESP_LOGI(LOG_TAG, "Finishing up net task");
        parkForever();
    });
}
void stop() {
NullStream tmp;
decoder.setOutput(tmp);
if(activeCodec != nullptr) {
activeCodec->setOutput(tmp);
}
running = false;
xSemaphoreTake(semaNet, portMAX_DELAY);
ESP_LOGI(LOG_TAG, "Streamer finalizing net task");
netTask.end();
ESP_LOGI(LOG_TAG, "Streamer finalizing sound task");
xSemaphoreTake(semaSnd, portMAX_DELAY);
sndTask.end();
// Small shitshow because the codec thread sometimes gets stuck and never exits and leaves the semaphore
ESP_LOGI(LOG_TAG, "Streamer finalizing codec task");
bool codecNotStuck = xSemaphoreTake(semaCodec, pdMS_TO_TICKS(1000));
codecTask.end();
if(!codecNotStuck) {
delay(1000); // give it time to unshit itself to prevent explosions
}
if(activeCodec != nullptr) {
activeCodec->setOutput(tmp);
delete activeCodec;
activeCodec = nullptr;
}
} As you can see, everything is locked by mutexes, and nothing is released until the respective task's mutex has been taken. This method is then called in the destructor for the pipeline, to prevent any object being deleted while the task is still running: ~StreamingPipeline() {
if(running) stop();
vSemaphoreDelete(semaSnd);
vSemaphoreDelete(semaNet);
vSemaphoreDelete(semaCodec);
} However, it ultimately leaves me with a nullpointerexception:
It appears as if the HttpRequest is being ended twice or something. If I remove the call to […] [the rest of this sentence was lost in extraction]. Does HttpRequest create some sort of object that cannot be accessed cross-thread, even when it is locked behind a mutex? Or am I missing something simple?
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Not sure what is going wrong: the following test sketch is working perfectly #include "AudioTools.h"
/// One-time initialisation: opens the serial port and starts the AudioTools
/// logger on it at Info level.
void setup() {
    const unsigned long baudRate = 115200;
    Serial.begin(baudRate);
    AudioToolsLogger.begin(Serial, AudioToolsLogLevel::Info);
}
/// Each pass creates an ICYStream, opens the test URL, ends the stream and
/// destroys it again, exercising the full begin/end/delete cycle repeatedly.
void loop() {
    // Serial is already initialised in setup(); no need to re-open it here.
    Serial.println("---> loop:");
    // Heap-allocated so every pass runs the constructor and the destructor.
    ICYStream *p_url = new ICYStream("ssid","pwd"); // ICYStream also delivers metadata
    if (!p_url->begin("https://stream.srg-ssr.ch/m/rsj/mp3_128","audio/mp3")){
        // NOTE(review): stop() is not defined in this snippet - presumably the
        // AudioTools helper that halts the sketch; confirm.
        stop();
    }
    p_url->end();
    delete p_url;
}
I have the following comments though:
|
Beta Was this translation helpful? Give feedback.
Not sure what is going wrong: the following test sketch is working perfectly
I have the following comments though: