Skip to content

Initial effort to implement SSE #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 186 additions & 9 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,12 @@ function setupRest(app) {
token: token,
iceServers: iceServers,
subscribers: {},
enabled: false
enabled: false,
active: false,
};
whep.info('[' + id + '] Created new WHEP endpoint');
// Monitor the state of the mountpoint on a regular basis
monitorEndpoint(endpoints[id]);
// Done
res.sendStatus(200);
});
Expand Down Expand Up @@ -259,8 +262,19 @@ function setupRest(app) {
let subscriber = subscribers[uuid];
if(subscriber) {
whep.info('[' + subscriber.whepId + '][' + uuid + '] PeerConnection detected as closed');
janus.removeSession({ uuid: endpoint.uuid });
janus.removeSession({ uuid: uuid });
delete subscriber.sse;
delete subscribers[uuid];
let endpoint = endpoints[subscriber.whepId];
if(endpoint) {
delete endpoint.subscribers[uuid];
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
}
}
}
});
Expand All @@ -283,15 +297,21 @@ function setupRest(app) {
endpoint.subscribers[uuid] = true;
subscriber.resource = config.rest + '/resource/' + uuid;
subscriber.latestEtag = janus.generateRandomString(16);
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
// Done
res.setHeader('Access-Control-Expose-Headers', 'Location, Link');
res.setHeader('Accept-Patch', 'application/trickle-ice-sdpfrag');
res.setHeader('Location', subscriber.resource);
res.set('ETag', '"' + subscriber.latestEtag + '"');
let iceServers = endpoint.iceServers ? endpoint.iceServers : config.iceServers;
let links = [];
if(iceServers && iceServers.length > 0) {
// Add a Link header for each static ICE server
let links = [];
for(let server of iceServers) {
if(!server.uri || (server.uri.indexOf('stun:') !== 0 &&
server.uri.indexOf('turn:') !== 0 &&
Expand All @@ -306,8 +326,13 @@ function setupRest(app) {
}
links.push(link);
}
res.setHeader('Link', links);
}
// Advertise support for SSE
let link = '<' + config.rest + '/sse/' + uuid + '>; ' +
'rel="urn:ietf:params:whep:ext:core:server-sent-events"; ' +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads-up: I noticed there's some inconsistency about these rel values in the spec.

Opened wish-wg/webrtc-http-egress-protocol#13 to discuss that and come up with a canonical answer, but this line and its parallel in web/watch.js may need to change.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inconsistency has been resolved. No action is needed in this PR.

Sergio's updated the spec and reference lib to consistently use urn:ietf:params:whep:ext:core:server-sent-events (the value used in this PR).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up! (and sorry for the late feedback, I was busy at the IETF meeting).

'events="active,inactive,layers,viewercount"';
links.push(link);
res.setHeader('Link', links);
res.writeHeader(201, { 'Content-Type': 'application/sdp' });
res.write(result.jsep.sdp);
res.end();
Expand Down Expand Up @@ -358,8 +383,16 @@ function setupRest(app) {
janus.finalize(details, function(err, result) {
if(err) {
let endpoint = endpoints[subscriber.whepId];
if(endpoint)
if(endpoint) {
delete endpoint.subscribers[uuid];
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
}
delete subscriber.sse;
delete subscribers[uuid];
res.status(500);
res.send(err.error);
Expand Down Expand Up @@ -481,26 +514,119 @@ function setupRest(app) {
if(janus)
janus.removeSession({ uuid: uuid });
delete endpoint.subscribers[uuid];
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
delete subscriber.sse;
delete subscribers[uuid];
whep.info('[' + uuid + '] Terminating WHEP session');
// Done
res.sendStatus(200);
});

// GET, HEAD, POST and PUT on the resource must return a 405
router.get('/resource/:id', function(req, res) {
router.get('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});
router.head('/resource/:id', function(req, res) {
router.head('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});
router.post('/resource/:id', function(req, res) {
router.post('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});
router.put('/resource/:id', function(req, res) {
router.put('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});

// Create a SSE
router.post('/sse/:uuid', function(req, res) {
let uuid = req.params.uuid;
let subscriber = subscribers[uuid];
if(!uuid || !subscriber) {
res.status(404);
res.send('Invalid resource ID');
return;
}
let endpoint = endpoints[subscriber.whepId];
if(!endpoint) {
res.status(404);
res.send('Invalid WHEP endpoint');
return;
}
// Make sure we received a JSON array
if(req.headers['content-type'] !== 'application/json' || !Array.isArray(req.body)) {
res.status(406);
res.send('Unsupported content type');
return;
}
if(!subscriber.sse) {
subscriber.sse = {};
for(let ev of req.body)
subscriber.sse[ev] = true;
// FIXME
subscriber.events = [];
// Send a viewercount event right away
subscriber.events.push({
type: 'viewercount',
data: JSON.stringify({ viewercount: Object.keys(endpoint.subscribers).length })
});
}
res.setHeader('Location', config.rest + '/sse/' + uuid);
// Done
res.sendStatus(201);
});

// Helper function to wait some time (needed for long poll)
async function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
}).catch(function() {});
};

// Long poll associated with an existing SSE
router.get('/sse/:uuid', async function(req, res) {
let uuid = req.params.uuid;
let subscriber = subscribers[uuid];
if(!uuid || !subscriber || !subscriber.sse) {
res.status(404);
res.send('Invalid subscription');
return;
}
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Connection', 'keep-alive');
res.write('retry: 2000\n\n');
while(subscriber.events) {
if(subscriber.events.length > 0) {
let event = subscriber.events.shift();
if(event.type && subscriber.sse && subscriber.sse[event.type]) {
res.write('event: ' + event.type + '\n');
res.write('data: ' + event.data + '\n\n');
}
} else {
await sleep(200);
}
}
});

// Get rid of an existing SSE
router.delete('/sse/:uuid', async function(req, res) {
let uuid = req.params.uuid;
let subscriber = subscribers[uuid];
if(!uuid || !subscriber || !subscriber.sse) {
res.status(404);
res.send('Invalid subscription');
return;
}
delete subscriber.sse;
delete subscriber.events;
// Done
res.sendStatus(200);
});

// Simple, non-standard, interface to destroy existing endpoints
router.delete('/endpoint/:id', function(req, res) {
let id = req.params.id;
Expand Down Expand Up @@ -548,5 +674,56 @@ function setupRest(app) {
app.use(bodyParser.json());
app.use(bodyParser.text({ type: 'application/sdp' }));
app.use(bodyParser.text({ type: 'application/trickle-ice-sdpfrag' }));
app.use(bodyParser.text({ type: 'application/json' }));
app.use(config.rest, router);
}

// Helper fucntion to monitor endpoints/mountpoints
function monitorEndpoint(endpoint) {
if(!endpoint)
return;
let id = endpoint.id;
setTimeout(function() {
let endpoint = endpoints[id];
if(!endpoint)
return;
if(!janus || !janus.isReady() || janus.getState() !== "connected") {
// Try again later
monitorEndpoint(endpoint);
return;
}
let details = {
whepId: endpoint.id,
mountpoint: endpoint.mountpoint
};
janus.isMountpointActive(details, function(err, res) {
if(err) {
// Try again later
whep.err(err);
monitorEndpoint(endpoint);
return;
}
if(res.active !== endpoint.active) {
// Notify endpoint status
endpoint.active = res.active;
notifyEndpointSubscribers(endpoint, {
type: (endpoint.active ? 'active' : 'inactive'),
data: JSON.stringify({})
});
}
// Done, schedule a new check for later
monitorEndpoint(endpoint);
});
}, 2000);
}

// Helper function to notify events to all subscribers of an endpoint
function notifyEndpointSubscribers(endpoint, event) {
if(!endpoint || !event)
return;
for(let uuid in endpoint.subscribers) {
let s = subscribers[uuid];
if(s && s.sse && s.events)
s.events.push(event);
}
}
71 changes: 68 additions & 3 deletions src/whep-janus.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,22 @@ var whepJanus = function(janusConfig) {
that.config.janus.multistream = (response.version >= 1000);
whep.info("Janus instance version: " + response.version_string + " (" +
(that.config.janus.multistream ? "multistream" : "legacy") + ")");
// We're done
that.config.janus.state = "connected";
callback();
// Finally, create a manager handle we'll use for monitoring
let attach = {
janus: "attach",
session_id: that.config.janus.session.id,
plugin: "janus.plugin.streaming"
};
janusSend(attach, function(response) {
whep.debug("Attach response:", response);
// Unsubscribe from the transaction
delete that.config.janus.transactions[response["transaction"]];
// Take note of the handle ID
that.config.janus.manager = response["data"]["id"];
// We're done
that.config.janus.state = "connected";
callback();
});
});
});
});
Expand All @@ -195,6 +208,58 @@ var whepJanus = function(janusConfig) {
delete sessions[uuid];
};

// Public method to retrieve info on a specific mountpoint: we use this
// to let WHEP endpoints monitor when a mountpoint becomes active/inactive
this.isMountpointActive = function(details, callback) {
callback = (typeof callback === "function") ? callback : noop;
if(!details.mountpoint || !details.whepId) {
callback({ error: "Missing mandatory attribute(s)" });
return;
}
let mountpoint = details.mountpoint;
let whepId = details.whepId;
// Send a different request according to the medium we're setting up
let info = {
janus: "message",
session_id: that.config.janus.session.id,
handle_id: that.config.janus.manager,
body: {
request: "info",
id: mountpoint
}
};
janusSend(info, function(response) {
let event = response["janus"];
// Get the plugin data: is this a success or an error?
let data = response.plugindata.data;
if(data.error) {
// Unsubscribe from the call transaction
delete that.config.janus.transactions[response["transaction"]];
whep.err("Got an error querying mountpoint:", data.error);
callback({ error: data.error });
return;
}
let active = false;
let info = data.info;
if(that.config.janus.multistream) {
// Janus 1.x response, iterate on the media array
for(let m of info.media) {
if(m.age_ms && m.age_ms < 1000) {
active = true;
break;
}
}
} else {
// Janus 0.x response
if((info.audio_age_ms && info.audio_age_ms < 1000) ||
(info.video_age_ms && info.video_age_ms < 1000))
active = true;
}
// Done
callback(null, { whepId: whepId, active: active });
});
};

// Public method for subscribing to a Streaming plugin mountpoint
this.subscribe = function(details, callback) {
callback = (typeof callback === "function") ? callback : noop;
Expand Down
Loading