Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions slscore/SLSApiServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ bool CSLSApiServer::authenticateRequest(const httplib::Request& req, httplib::Re
return true;
}

/**
* @brief Register HTTP routes and associate them with their request handlers.
*
* Configures the server's endpoints including health check, CORS preflight,
* stream IDs (list, create, delete), stats retrieval, API key creation, and
* publisher disconnect.
*
* @details
* The following routes are registered:
* - GET /health
* - OPTIONS .* (CORS preflight)
* - GET /api/stream-ids
* - POST /api/stream-ids
* - DELETE /api/stream-ids/{id}
* - GET /stats/{publisher}
* - POST /api/keys
* - DELETE /api/disconnect/{publisher}
*/
void CSLSApiServer::setupEndpoints() {
// Health check endpoint (no auth required)
m_server.Get("/health", [this](const httplib::Request& req, httplib::Response& res) {
Expand Down Expand Up @@ -176,8 +194,22 @@ void CSLSApiServer::setupEndpoints() {
m_server.Post("/api/keys", [this](const httplib::Request& req, httplib::Response& res) {
handleApiKeys(req, res);
});

// Publisher disconnect endpoint
m_server.Delete(R"(/api/disconnect/(.+))", [this](const httplib::Request& req, httplib::Response& res) {
handleDisconnectPublisher(req, res);
});
}

/**
* @brief Responds to health-check requests with basic service information.
*
* Sets CORS headers and returns a JSON body containing service status, name,
* and version.
*
* @param req Incoming HTTP request.
* @param res Outgoing HTTP response (will be populated with JSON content).
*/
void CSLSApiServer::handleHealth(const httplib::Request& req, httplib::Response& res) {
setCorsHeaders(res);

Expand Down Expand Up @@ -381,6 +413,25 @@ void CSLSApiServer::handleStats(const httplib::Request& req, httplib::Response&
res.set_content(ret.dump(), "application/json");
}

/**
* @brief Handle creation of a new API key via the /api/keys endpoint.
*
* Validates rate limits for configuration endpoints, requires a caller with
* admin permissions, accepts a JSON body with optional `name` and
* `permissions` fields (defaults: "New API Key" and "read"), and creates a new
* API key in the database. On success, returns a JSON object containing the
* newly generated `api_key` and a message advising to save it securely.
*
* Possible response statuses:
* - 200: API key created successfully (response contains `api_key` and `message`).
* - 400: Invalid JSON body.
* - 403: Caller lacks admin permissions.
* - 429: Rate limit exceeded for configuration endpoints.
* - 500: Failed to create API key.
*
* @param req Incoming HTTP request (expects JSON body with optional `name` and `permissions`).
* @param res HTTP response to populate with JSON result and appropriate status code.
*/
void CSLSApiServer::handleApiKeys(const httplib::Request& req, httplib::Response& res) {
setCorsHeaders(res);

Expand Down Expand Up @@ -440,4 +491,84 @@ void CSLSApiServer::handleApiKeys(const httplib::Request& req, httplib::Response
error["message"] = "Failed to create API key";
res.set_content(error.dump(), "application/json");
}
}

/**
* @brief Handles DELETE /api/disconnect/{publisher} requests to force-disconnect a publisher.
*
* Validates rate limits and API key authentication, requires `admin` or `write` permissions,
* invokes the SLS manager to disconnect the specified publisher, and returns a JSON response
* indicating success or the appropriate error.
*
* @param req Incoming HTTP request; expects the publisher id in the first route match (req.matches[1])
* and an Authorization header with a Bearer API key.
* @param res HTTP response object which will be populated with a JSON body and appropriate status code.
*
* Possible response status codes:
* - 200: Publisher disconnected successfully.
* - 403: Authentication succeeded but caller lacks `admin` or `write` permissions.
* - 404: Publisher not found or not currently streaming.
* - 429: Rate limit exceeded for the API endpoint.
* - 500: SLS manager is not available.
*/
void CSLSApiServer::handleDisconnectPublisher(const httplib::Request& req, httplib::Response& res) {
setCorsHeaders(res);

// Rate limiting
if (!checkRateLimit(req.remote_addr, "api")) {
res.status = 429;
json error;
error["status"] = "error";
error["message"] = "Rate limit exceeded";
res.set_content(error.dump(), "application/json");
return;
}

// Authentication with admin or write permissions check
std::string permissions;
if (!authenticateRequest(req, res, permissions)) {
return;
}

if (permissions != "admin" && permissions != "write") {
res.status = 403;
json error;
error["status"] = "error";
error["message"] = "Admin or write permissions required";
res.set_content(error.dump(), "application/json");
CSLSDatabase::getInstance().logAccess(req.get_header_value("Authorization").substr(7),
req.path, req.method, req.remote_addr, 403);
return;
}

std::string publisher_id = req.matches[1];

if (!m_sls_manager) {
res.status = 500;
json error;
error["status"] = "error";
error["message"] = "SLS manager not available";
res.set_content(error.dump(), "application/json");
return;
}

// Attempt to disconnect the publisher
if (m_sls_manager->disconnect_publisher(publisher_id)) {
json response;
response["status"] = "success";
response["message"] = "Publisher disconnected successfully";
res.set_content(response.dump(), "application/json");

CSLSDatabase::getInstance().logAccess(req.get_header_value("Authorization").substr(7),
req.path, req.method, req.remote_addr, 200);
} else {
res.status = 404;
json error;
error["status"] = "error";
error["message"] = "Publisher not found or not currently streaming";
res.set_content(error.dump(), "application/json");

CSLSDatabase::getInstance().logAccess(req.get_header_value("Authorization").substr(7),
req.path, req.method, req.remote_addr, 404);
}
}
133 changes: 127 additions & 6 deletions slscore/SLSManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ SLS_CONF_DYNAMIC_IMPLEMENT(srt)
*/
#define DEFAULT_GROUP 1

/**
* @brief Initializes CSLSManager with default member values.
*
* Sets the worker thread count to DEFAULT_GROUP, server count to 1,
* and initializes pointer members (role list, single group, and per-server maps)
* to NULL.
*/
CSLSManager::CSLSManager()
{
m_worker_threads = DEFAULT_GROUP;
Expand All @@ -61,14 +68,28 @@ CSLSManager::CSLSManager()
m_map_data = NULL;
m_map_publisher = NULL;
m_map_puller = NULL;
m_map_pusher = NULL;

m_map_pusher = NULL;
}

/**
* @brief Releases resources used by CSLSManager.
*
* Performs cleanup when a CSLSManager instance is destroyed.
*/
CSLSManager::~CSLSManager()
{
}

/**
* @brief Initialize the manager: load configuration, create listeners and worker groups.
*
* Reads SRT configuration, applies log settings, allocates per-server maps and a shared role list,
* creates and starts publisher and player listeners for each configured server, and initializes
* worker group(s) (either a single group or multiple worker threads) with epoll and worker settings.
*
* @return int 0 on success; SLS_ERROR on configuration or initialization failure.
*/
int CSLSManager::start()
{
int ret = 0;
Expand Down Expand Up @@ -195,7 +216,26 @@ int CSLSManager::start()

}

/**
* @brief Resolve a publisher key corresponding to a given player key.
*
* Checks the internal stream database for a mapping from the provided player key
* to a publisher ID; if none is found, treats the player key as a potential
* publisher key and searches active publishers across servers.
*
* @param player_key Null-terminated C string containing the player key to resolve.
* Must not be NULL or empty.
* @return char* Pointer to a null-terminated publisher key if found, `NULL` otherwise.
* If the mapping is returned from the database, the pointer refers
* to thread-local storage; if the provided player_key is itself a
* publisher key, the original player_key pointer is returned.
*/
char* CSLSManager::find_publisher_by_player_key(char *player_key) {
if (player_key == NULL || strlen(player_key) == 0) {
sls_log(SLS_LOG_WARNING, "[%p]CSLSManager::find_publisher_by_player_key, empty player key provided", this);
return NULL;
}

// First check stream ID database
std::string publisher_id = CSLSDatabase::getInstance().getPublisherFromPlayer(player_key);
if (!publisher_id.empty()) {
Expand All @@ -205,10 +245,10 @@ char* CSLSManager::find_publisher_by_player_key(char *player_key) {

return mapped_publisher;
}

sls_log(SLS_LOG_WARNING, "[%p]CSLSManager::find_publisher_by_player_key, player key '%s' not found in database",
this, player_key);

// If not found in database, check if it's a direct publisher key
CSLSRole* role = nullptr;
for (int i = 0; i < m_server_count; i++) {
Expand All @@ -217,18 +257,32 @@ char* CSLSManager::find_publisher_by_player_key(char *player_key) {
break;
}
}

if (role != NULL) {
sls_log(SLS_LOG_INFO, "[%p]CSLSManager::find_publisher_by_player_key, player key '%s' is a publisher key",
this, player_key);
return player_key;
}

sls_log(SLS_LOG_WARNING, "[%p]CSLSManager::find_publisher_by_player_key, no publisher found for player key '%s'",
this, player_key);
return NULL;
}

/**
* @brief Produce a JSON object describing the publisher associated with the given player key.
*
* Looks up the publisher mapped to the provided player key and returns its current statistics
* in either legacy or modern format, or an error message if the key is invalid or the
* publisher is not currently streaming.
*
* @param playerKey Player key used to resolve the mapped publisher; must not be empty.
* @param clear If non-zero, clear the publisher's collected statistics after reading.
* @param legacy If true, return statistics under a legacy `publishers` object; otherwise use `publisher`.
* @return json A JSON object with a `status` field set to `"ok"` on success or `"error"` on failure.
* On success includes either `publisher` (modern) or `publishers` (legacy) containing stats.
* On error includes a `message` explaining the failure.
*/
json CSLSManager::generate_json_for_publisher(std::string playerKey, int clear, bool legacy) {
json ret;
ret["status"] = "error";
Expand Down Expand Up @@ -288,6 +342,74 @@ json CSLSManager::generate_json_for_publisher(std::string playerKey, int clear,
return ret;
}

/**
* @brief Disconnects the publisher associated with the given player or publisher key.
*
* Resolves the provided key (player or publisher key) to the active publisher instance,
* calls the publisher's on_close callback, and marks it invalid so it will be cleaned up.
*
* @param key Player key or publisher key used to locate the publisher; must not be empty.
* @return true if a publisher was found and scheduled for disconnection, false otherwise.
*/
bool CSLSManager::disconnect_publisher(const std::string& key) {
if (key.empty()) {
sls_log(SLS_LOG_WARNING, "[%p]CSLSManager::disconnect_publisher, empty key provided", this);
return false;
}

char* mapped_publisher = find_publisher_by_player_key(const_cast<char*>(key.c_str()));
if (mapped_publisher == NULL) {
sls_log(SLS_LOG_WARNING, "[%p]CSLSManager::disconnect_publisher, unable to resolve publisher for key: %s", this, key.c_str());
return false;
}

std::string publisher_key(mapped_publisher);

// Search for the publisher in all server instances using resolved publisher key
CSLSRole *role = nullptr;
for (int i = 0; i < m_server_count; i++) {
CSLSMapPublisher *publisher_map = &m_map_publisher[i];
role = publisher_map->get_publisher(publisher_key.c_str());
if (role != nullptr) {
break;
}
}
if (role == nullptr) {
sls_log(SLS_LOG_WARNING, "[%p]CSLSManager::disconnect_publisher, publisher not found for key: %s (resolved publisher: %s)",
this, key.c_str(), publisher_key.c_str());
return false;
}

// Disconnect the publisher
sls_log(SLS_LOG_INFO, "[%p]CSLSManager::disconnect_publisher, disconnecting publisher: %s (requested key: %s)",
this, publisher_key.c_str(), key.c_str());
// Call on_close to notify any HTTP callbacks
role->on_close();
// Mark the role as invalid to trigger cleanup in the next cycle
role->invalid_srt();
return true;
}

/**
* @brief Build a legacy-formatted JSON object containing publisher statistics.
*
* Produces a JSON object with interval and instant metrics for the given publisher role.
*
* @param role Pointer to the publisher role from which statistics are retrieved.
* @param clear If non-zero, clear the role's internal counters after reading.
* @return json JSON object containing fields:
* - pktRcvLoss: packet receive loss count (interval)
* - pktRcvDrop: packet receive drop count (interval)
* - bytesRcvLoss: bytes lost while receiving (interval)
* - bytesRcvDrop: bytes dropped while receiving (interval)
* - mbpsRecvRate: receive rate in Mbps (interval)
* - rtt: round-trip time in milliseconds (instant)
* - msRcvBuf: receive buffer size in milliseconds (instant)
* - mbpsBandwidth: measured bandwidth in Mbps (instant)
* - bitrate: current bitrate in kbps (instant)
* - uptime: publisher uptime in seconds (instant)
* - latency: publisher latency in milliseconds (instant)
*/
json CSLSManager::create_legacy_json_stats_for_publisher(CSLSRole *role, int clear) {
json ret = json::object();
SRT_TRACEBSTATS stats;
Expand Down Expand Up @@ -479,4 +601,3 @@ int CSLSManager::stat_client_callback(void *p, HTTP_CALLBACK_TYPE type, void *v
}



Loading