From 78762233d0f09b4ad19f55774f71a5bcfb6a8209 Mon Sep 17 00:00:00 2001
From: Lorenzo Miniero <lminiero@gmail.com>
Date: Tue, 11 Apr 2023 16:16:17 +0200
Subject: [PATCH 1/2] Add support for subscriber offer (requires specific
 version of Janus)

---
 src/server.js     |  19 +++--
 src/whep-janus.js |   7 +-
 web/watch.js      | 197 ++++++++++++++++++++++++++++++++--------------
 3 files changed, 154 insertions(+), 69 deletions(-)

diff --git a/src/server.js b/src/server.js
index c433e33..cff1513 100644
--- a/src/server.js
+++ b/src/server.js
@@ -212,12 +212,16 @@ function setupRest(app) {
 			return;
 		}
 		whep.debug("/endpoint/:", id);
-		// If we received an SDP, the client is providing an offer
+		// If we received a payload, make sure it's an SDP
 		whep.debug(req.body);
-		if(req.headers["content-type"] === "application/sdp" && req.body.indexOf('v=0') >= 0) {
-			res.status(403);
-			res.send('Client offers unsupported');
-			return;
+		let offer = null;
+		if(req.headers["content-type"]) {
+			if(req.headers["content-type"] !== "application/sdp" || req.body.indexOf('v=0') < 0) {
+				res.status(406);
+				res.send('Unsupported content type');
+				return;
+			}
+			offer = req.body;
 		}
 		// Check the Bearer token
 		let auth = req.headers["authorization"];
@@ -264,11 +268,12 @@ function setupRest(app) {
 		let details = {
 			uuid: uuid,
 			mountpoint: endpoint.mountpoint,
-			pin: endpoint.pin
+			pin: endpoint.pin,
+			sdp: offer
 		};
 		subscriber.enabled = true;
 		janus.subscribe(details, function(err, result) {
-			// Make sure we got an OFFER back
+			// Make sure we got an SDP back
 			if(err) {
 				delete subscribers[uuid];
 				res.status(500);
diff --git a/src/whep-janus.js b/src/whep-janus.js
index 3f11253..6123db2 100644
--- a/src/whep-janus.js
+++ b/src/whep-janus.js
@@ -205,6 +205,7 @@ var whepJanus = function(janusConfig) {
 		}
 		let mountpoint = details.mountpoint;
 		let pin = details.pin;
+		let sdp = details.sdp;
 		let uuid = details.uuid;
 		let session = sessions[uuid];
 		if(!session) {
@@ -263,6 +264,10 @@ var whepJanus = function(janusConfig) {
 					pin: pin
 				}
 			};
+			if(sdp) {
+				// We're going to let the user provide the SDP offer
+				subscribe.jsep = { type: 'offer', sdp: sdp };
+			}
 			janusSend(subscribe, function(response) {
 				let event = response["janus"];
 				if(event === "error") {
@@ -284,7 +289,7 @@ var whepJanus = function(janusConfig) {
 					callback({ error: data.error });
 					return;
 				}
-				whep.debug("Got an offer for session " + uuid + ":", data);
+				whep.debug("Got an SDP for session " + uuid + ":", data);
 				if(data["reason"]) {
 					// Unsubscribe from the transaction
 					delete that.config.janus.transactions[response["transaction"]];
diff --git a/web/watch.js b/web/watch.js
index 0481eb2..b81bd05 100644
--- a/web/watch.js
+++ b/web/watch.js
@@ -1,10 +1,10 @@
 // Base path for the REST WHEP API
 var rest = '/whep';
-var resource = null;
+var resource = null, token = null;
 
 // PeerConnection
 var pc = null;
-var iceUfrag = null, icePwd = null;
+var iceUfrag = null, icePwd = null, candidates = [];
 
 // Helper function to get query string arguments
 function getQueryStringValue(name) {
@@ -15,6 +15,8 @@ function getQueryStringValue(name) {
 }
 // Get the endpoint ID to subscribe to
 var id = getQueryStringValue('id');
+// Check if we should let the endpoint send the offer
+var sendOffer = (getQueryStringValue('offer') === 'true')
 
 $(document).ready(function() {
 	// Make sure WebRTC is supported by the browser
@@ -30,88 +32,93 @@ $(document).ready(function() {
 		title: 'Insert the endpoint token (leave it empty if not needed)',
 		inputType: 'password',
 		callback: function(result) {
-			subscribeToEndpoint(result);
+			token = result;
+			subscribeToEndpoint();
 		}
 	});
 });
 
 // Function to subscribe to the WHEP endpoint
-function subscribeToEndpoint(token) {
-	let headers = null;
+async function subscribeToEndpoint() {
+	let headers = null, offer = null;
 	if(token)
 		headers = { Authorization: 'Bearer ' + token };
+	if(sendOffer) {
+		// We need to prepare an offer ourselves, do it now
+		let iceServers = [{urls: "stun:stun.l.google.com:19302"}];
+		createPeerConnectionIfNeeded(iceServers);
+		let transceiver = await pc.addTransceiver('audio');
+		if(transceiver.setDirection)
+			transceiver.setDirection('recvonly');
+		else
+			transceiver.direction = 'recvonly';
+		transceiver = await pc.addTransceiver('video');
+		if(transceiver.setDirection)
+			transceiver.setDirection('recvonly');
+		else
+			transceiver.direction = 'recvonly';
+		offer = await pc.createOffer({});
+		await pc.setLocalDescription(offer);
+		// Extract ICE ufrag and pwd (for trickle)
+		iceUfrag = offer.sdp.match(/a=ice-ufrag:(.*)\r\n/)[1];
+		icePwd = offer.sdp.match(/a=ice-pwd:(.*)\r\n/)[1];
+	}
+	// Contact the WHEP endpoint
 	$.ajax({
 		url: rest + '/endpoint/' + id,
 		type: 'POST',
 		headers: headers,
-		data: {}
+		contentType: offer ? 'application/sdp' : null,
+		data: offer ? offer.sdp : {}
 	}).error(function(xhr, textStatus, errorThrown) {
 		bootbox.alert(xhr.status + ": " + xhr.responseText);
 	}).success(function(sdp, textStatus, request) {
-		console.log('Got offer:', sdp);
+		console.log('Got SDP:', sdp);
 		resource = request.getResponseHeader('Location');
 		console.log('WHEP resource:', resource);
 		// TODO Parse ICE servers
 		// let ice = request.getResponseHeader('Link');
 		let iceServers = [{urls: "stun:stun.l.google.com:19302"}];
-		// Create PeerConnection
-		let pc_config = {
-			sdpSemantics: 'unified-plan',
-			iceServers: iceServers
+		// Create PeerConnection, if needed
+		createPeerConnectionIfNeeded(iceServers);
+		// Pass the SDP to the PeerConnection
+		let jsep = {
+			type: sendOffer ? 'answer' : 'offer',
+			sdp: sdp
 		};
-		pc = new RTCPeerConnection(pc_config);
-		pc.oniceconnectionstatechange = function() {
-			console.log('[ICE] ', pc.iceConnectionState);
-		};
-		pc.onicecandidate = function(event) {
-			let end = false;
-			if(!event.candidate || (event.candidate.candidate && event.candidate.candidate.indexOf('endOfCandidates') > 0)) {
-				console.log('End of candidates');
-				end = true;
-			} else {
-				console.log('Got candidate:', event.candidate.candidate);
-			}
-			if(!resource) {
-				console.warn('No resource URL, ignoring candidate');
-				return;
-			}
-			if(!iceUfrag || !icePwd) {
-				console.warn('No ICE credentials, ignoring candidate');
-				return;
-			}
-			// FIXME Trickle candidate
-			let candidate =
-				'a=ice-ufrag:' + iceUfrag + '\r\n' +
-				'a=ice-pwd:' + icePwd + '\r\n' +
-				'm=audio 9 RTP/AVP 0\r\n' +
-				'a=' + (end ? 'end-of-candidates' : event.candidate.candidate) + '\r\n';
-			$.ajax({
-				url: resource,
-				type: 'PATCH',
-				headers: headers,
-				contentType: 'application/trickle-ice-sdpfrag',
-				data: candidate
-			}).error(function(xhr, textStatus, errorThrown) {
-				bootbox.alert(xhr.status + ": " + xhr.responseText);
-			}).done(function(response) {
-				console.log('Candidate sent');
-			});
-		};
-		pc.ontrack = function(event) {
-			console.log('Handling Remote Track', event);
-			if(!event.streams)
-				return;
-			if($('#whepvideo').length === 0) {
-				$('#video').removeClass('hide').show();
-				$('#videoremote').append('<video class="rounded centered" id="whepvideo" width="100%" height="100%" autoplay playsinline/>');
-			}
-			attachMediaStream($('#whepvideo').get(0), event.streams[0]);
-		};
-		// Pass the offer to the PeerConnection
-		let jsep = { type: 'offer', sdp: sdp };
 		pc.setRemoteDescription(jsep)
 			.then(function() {
 				console.log('Remote description accepted');
+				if(sendOffer) {
+					// We're done: just check if we have candidates to send
+					if(candidates.length > 0) {
+						// FIXME Trickle candidate
+						let headers = null;
+						if(token)
+							headers = { Authorization: 'Bearer ' + token };
+						let candidate =
+							'a=ice-ufrag:' + iceUfrag + '\r\n' +
+							'a=ice-pwd:' + icePwd + '\r\n' +
+							'm=audio 9 RTP/AVP 0\r\n';
+						for(let c of candidates)
+							candidate += 'a=' + c + '\r\n';
+						candidates = [];
+						$.ajax({
+							url: resource,
+							type: 'PATCH',
+							headers: headers,
+							contentType: 'application/trickle-ice-sdpfrag',
+							data: candidate
+						}).error(function(xhr, textStatus, errorThrown) {
+							bootbox.alert(xhr.status + ": " + xhr.responseText);
+						}).done(function(response) {
+							console.log('Candidate sent');
+						});
+					}
+					return;
+				}
+				// If we got here, we're in the "WHIP server sends offer" mode,
+				// so we have to prepare an answer to send back via a PATCH
 				pc.createAnswer({})
 					.then(function(answer) {
 						console.log('Prepared answer:', answer.sdp);
@@ -157,3 +164,71 @@ function attachMediaStream(element, stream) {
 		}
 	}
 };
+
+// Helper function to create a PeerConnection, if needed, since we can either
+// expect an offer from the WHEP server, or provide one ourselves
+function createPeerConnectionIfNeeded(iceServers) {
+	if(pc)
+		return;
+	let pc_config = {
+		sdpSemantics: 'unified-plan',
+		iceServers: iceServers
+	};
+	pc = new RTCPeerConnection(pc_config);
+	pc.oniceconnectionstatechange = function() {
+		console.log('[ICE] ', pc.iceConnectionState);
+	};
+	pc.onicecandidate = function(event) {
+		let end = false;
+		if(!event.candidate || (event.candidate.candidate && event.candidate.candidate.indexOf('endOfCandidates') > 0)) {
+			console.log('End of candidates');
+			end = true;
+		} else {
+			console.log('Got candidate:', event.candidate.candidate);
+		}
+		if(!resource) {
+			console.log('No resource URL yet, queueing candidate');
+			candidates.push(end ? 'end-of-candidates' : event.candidate.candidate);
+			return;
+		}
+		if(!iceUfrag || !icePwd) {
+			console.log('No ICE credentials yet, queueing candidate');
+			candidates.push(end ? 'end-of-candidates' : event.candidate.candidate);
+			return;
+		}
+		// FIXME Trickle candidate
+		let headers = null;
+		if(token)
+			headers = { Authorization: 'Bearer ' + token };
+		let candidate =
+			'a=ice-ufrag:' + iceUfrag + '\r\n' +
+			'a=ice-pwd:' + icePwd + '\r\n' +
+			'm=audio 9 RTP/AVP 0\r\n' +
+			'a=' + (end ? 'end-of-candidates' : event.candidate.candidate) + '\r\n';
+		$.ajax({
+			url: resource,
+			type: 'PATCH',
+			headers: headers,
+			contentType: 'application/trickle-ice-sdpfrag',
+			data: candidate
+		}).error(function(xhr, textStatus, errorThrown) {
+			bootbox.alert(xhr.status + ": " + xhr.responseText);
+		}).done(function(response) {
+			console.log('Candidate sent');
+		});
+	};
+	pc.ontrack = function(event) {
+		console.log('Handling Remote Track', event);
+		if(!event.streams)
+			return;
+		console.warn(event.streams[0].getTracks());
+		if($('#whepvideo').length === 0) {
+			$('#video').removeClass('hide').show();
+			$('#videoremote').append('<video class="rounded centered" id="whepvideo" width="100%" height="100%" autoplay playsinline/>');
+			$('#whenvideo').get(0).volume = 0;
+		}
+		attachMediaStream($('#whepvideo').get(0), event.streams[0]);
+		$('#whepvideo').get(0).play();
+		$('#whepvideo').get(0).volume = 1;
+	};
+}

From 4de9cc46e6cec36049b133101410111114291f58 Mon Sep 17 00:00:00 2001
From: Lorenzo Miniero <lminiero@gmail.com>
Date: Mon, 17 Apr 2023 14:55:27 +0200
Subject: [PATCH 2/2] Initial effort to implement SSE

---
 src/server.js     | 195 +++++++++++++++++++++++++++++++++++++++++++---
 src/whep-janus.js |  71 ++++++++++++++++-
 web/watch.js      |  54 +++++++++++--
 3 files changed, 303 insertions(+), 17 deletions(-)

diff --git a/src/server.js b/src/server.js
index cff1513..9935dfc 100644
--- a/src/server.js
+++ b/src/server.js
@@ -190,9 +190,12 @@ function setupRest(app) {
 			token: token,
 			iceServers: iceServers,
 			subscribers: {},
-			enabled: false
+			enabled: false,
+			active: false,
 		};
 		whep.info('[' + id + '] Created new WHEP endpoint');
+		// Monitor the state of the mountpoint on a regular basis
+		monitorEndpoint(endpoints[id]);
 		// Done
 		res.sendStatus(200);
 	});
@@ -259,8 +262,19 @@ function setupRest(app) {
 				let subscriber = subscribers[uuid];
 				if(subscriber) {
 					whep.info('[' + subscriber.whepId + '][' + uuid + '] PeerConnection detected as closed');
-					janus.removeSession({ uuid: endpoint.uuid });
+					janus.removeSession({ uuid: uuid });
+					delete subscriber.sse;
 					delete subscribers[uuid];
+					let endpoint = endpoints[subscriber.whepId];
+					if(endpoint) {
+						delete endpoint.subscribers[uuid];
+						// Notify updated viewers count
+						let count = Object.keys(endpoint.subscribers).length;
+						notifyEndpointSubscribers(endpoint, {
+							type: 'viewercount',
+							data: JSON.stringify({ viewercount: count })
+						});
+					}
 				}
 			}
 		});
@@ -283,15 +297,21 @@ function setupRest(app) {
 				endpoint.subscribers[uuid] = true;
 				subscriber.resource = config.rest + '/resource/' + uuid;
 				subscriber.latestEtag = janus.generateRandomString(16);
+				// Notify updated viewers count
+				let count = Object.keys(endpoint.subscribers).length;
+				notifyEndpointSubscribers(endpoint, {
+					type: 'viewercount',
+					data: JSON.stringify({ viewercount: count })
+				});
 				// Done
 				res.setHeader('Access-Control-Expose-Headers', 'Location, Link');
 				res.setHeader('Accept-Patch', 'application/trickle-ice-sdpfrag');
 				res.setHeader('Location', subscriber.resource);
 				res.set('ETag', '"' + subscriber.latestEtag + '"');
 				let iceServers = endpoint.iceServers ? endpoint.iceServers : config.iceServers;
+				let links = [];
 				if(iceServers && iceServers.length > 0) {
 					// Add a Link header for each static ICE server
-					let links = [];
 					for(let server of iceServers) {
 						if(!server.uri || (server.uri.indexOf('stun:') !== 0 &&
 								server.uri.indexOf('turn:') !== 0 &&
@@ -306,8 +326,13 @@ function setupRest(app) {
 						}
 						links.push(link);
 					}
-					res.setHeader('Link', links);
 				}
+				// Advertise support for SSE
+				let link = '<' + config.rest + '/sse/' + uuid + '>; ' +
+					'rel="urn:ietf:params:whep:ext:core:server-sent-events"; ' +
+					'events="active,inactive,layers,viewercount"';
+				links.push(link);
+				res.setHeader('Link', links);
 				res.writeHeader(201, { 'Content-Type': 'application/sdp' });
 				res.write(result.jsep.sdp);
 				res.end();
@@ -358,8 +383,16 @@ function setupRest(app) {
 			janus.finalize(details, function(err, result) {
 				if(err) {
 					let endpoint = endpoints[subscriber.whepId];
-					if(endpoint)
+					if(endpoint) {
 						delete endpoint.subscribers[uuid];
+						// Notify updated viewers count
+						let count = Object.keys(endpoint.subscribers).length;
+						notifyEndpointSubscribers(endpoint, {
+							type: 'viewercount',
+							data: JSON.stringify({ viewercount: count })
+						});
+					}
+					delete subscriber.sse;
 					delete subscribers[uuid];
 					res.status(500);
 					res.send(err.error);
@@ -481,6 +514,13 @@ function setupRest(app) {
 		if(janus)
 			janus.removeSession({ uuid: uuid });
 		delete endpoint.subscribers[uuid];
+		// Notify updated viewers count
+		let count = Object.keys(endpoint.subscribers).length;
+		notifyEndpointSubscribers(endpoint, {
+			type: 'viewercount',
+			data: JSON.stringify({ viewercount: count })
+		});
+		delete subscriber.sse;
 		delete subscribers[uuid];
 		whep.info('[' + uuid + '] Terminating WHEP session');
 		// Done
@@ -488,19 +528,105 @@ function setupRest(app) {
 	});
 
 	// GET, HEAD, POST and PUT on the resource must return a 405
-	router.get('/resource/:id', function(req, res) {
+	router.get('/resource/:uuid', function(req, res) {
 		res.sendStatus(405);
 	});
-	router.head('/resource/:id', function(req, res) {
+	router.head('/resource/:uuid', function(req, res) {
 		res.sendStatus(405);
 	});
-	router.post('/resource/:id', function(req, res) {
+	router.post('/resource/:uuid', function(req, res) {
 		res.sendStatus(405);
 	});
-	router.put('/resource/:id', function(req, res) {
+	router.put('/resource/:uuid', function(req, res) {
 		res.sendStatus(405);
 	});
 
+	// Create a SSE
+	router.post('/sse/:uuid', function(req, res) {
+		let uuid = req.params.uuid;
+		let subscriber = subscribers[uuid];
+		if(!uuid || !subscriber) {
+			res.status(404);
+			res.send('Invalid resource ID');
+			return;
+		}
+		let endpoint = endpoints[subscriber.whepId];
+		if(!endpoint) {
+			res.status(404);
+			res.send('Invalid WHEP endpoint');
+			return;
+		}
+		// Make sure we received a JSON array
+		if(req.headers['content-type'] !== 'application/json' || !Array.isArray(req.body)) {
+			res.status(406);
+			res.send('Unsupported content type');
+			return;
+		}
+		if(!subscriber.sse) {
+			subscriber.sse = {};
+			for(let ev of req.body)
+				subscriber.sse[ev] = true;
+			// FIXME
+			subscriber.events = [];
+			// Send a viewercount event right away
+			subscriber.events.push({
+				type: 'viewercount',
+				data: JSON.stringify({ viewercount: Object.keys(endpoint.subscribers).length })
+			});
+		}
+		res.setHeader('Location', config.rest + '/sse/' + uuid);
+		// Done
+		res.sendStatus(201);
+	});
+
+	// Helper function to wait some time (needed for long poll)
+	async function sleep(ms) {
+		return new Promise((resolve) => {
+			setTimeout(resolve, ms);
+		}).catch(function() {});
+	};
+
+	// Long poll associated with an existing SSE
+	router.get('/sse/:uuid', async function(req, res) {
+		let uuid = req.params.uuid;
+		let subscriber = subscribers[uuid];
+		if(!uuid || !subscriber || !subscriber.sse) {
+			res.status(404);
+			res.send('Invalid subscription');
+			return;
+		}
+		res.setHeader('Cache-Control', 'no-cache');
+		res.setHeader('Content-Type', 'text/event-stream');
+		res.setHeader('Connection', 'keep-alive');
+		res.write('retry: 2000\n\n');
+		while(subscriber.events) {
+			if(subscriber.events.length > 0) {
+				let event = subscriber.events.shift();
+				if(event.type && subscriber.sse && subscriber.sse[event.type]) {
+					res.write('event: ' + event.type + '\n');
+					res.write('data: ' + event.data + '\n\n');
+				}
+			} else {
+				await sleep(200);
+			}
+		}
+	});
+
+	// Get rid of an existing SSE
+	router.delete('/sse/:uuid', async function(req, res) {
+		let uuid = req.params.uuid;
+		let subscriber = subscribers[uuid];
+		if(!uuid || !subscriber || !subscriber.sse) {
+			res.status(404);
+			res.send('Invalid subscription');
+			return;
+		}
+		delete subscriber.sse;
+		delete subscriber.events;
+		// Done
+		res.sendStatus(200);
+	});
+
 	// Simple, non-standard, interface to destroy existing endpoints
 	router.delete('/endpoint/:id', function(req, res) {
 		let id = req.params.id;
@@ -548,5 +674,56 @@ function setupRest(app) {
 	app.use(bodyParser.json());
 	app.use(bodyParser.text({ type: 'application/sdp' }));
 	app.use(bodyParser.text({ type: 'application/trickle-ice-sdpfrag' }));
+	app.use(bodyParser.text({ type: 'application/json' }));
 	app.use(config.rest, router);
 }
+
+// Helper fucntion to monitor endpoints/mountpoints
+function monitorEndpoint(endpoint) {
+	if(!endpoint)
+		return;
+	let id = endpoint.id;
+	setTimeout(function() {
+		let endpoint = endpoints[id];
+		if(!endpoint)
+			return;
+		if(!janus || !janus.isReady() || janus.getState() !== "connected") {
+			// Try again later
+			monitorEndpoint(endpoint);
+			return;
+		}
+		let details = {
+			whepId: endpoint.id,
+			mountpoint: endpoint.mountpoint
+		};
+		janus.isMountpointActive(details, function(err, res) {
+			if(err) {
+				// Try again later
+				whep.err(err);
+				monitorEndpoint(endpoint);
+				return;
+			}
+			if(res.active !== endpoint.active) {
+				// Notify endpoint status
+				endpoint.active = res.active;
+				notifyEndpointSubscribers(endpoint, {
+					type: (endpoint.active ? 'active' : 'inactive'),
+					data: JSON.stringify({})
+				});
+			}
+			// Done, schedule a new check for later
+			monitorEndpoint(endpoint);
+		});
+	}, 2000);
+}
+
+// Helper function to notify events to all subscribers of an endpoint
+function notifyEndpointSubscribers(endpoint, event) {
+	if(!endpoint || !event)
+		return;
+	for(let uuid in endpoint.subscribers) {
+		let s = subscribers[uuid];
+		if(s && s.sse && s.events)
+			s.events.push(event);
+	}
+}
diff --git a/src/whep-janus.js b/src/whep-janus.js
index 6123db2..27c6047 100644
--- a/src/whep-janus.js
+++ b/src/whep-janus.js
@@ -170,9 +170,22 @@ var whepJanus = function(janusConfig) {
 					that.config.janus.multistream = (response.version >= 1000);
 					whep.info("Janus instance version: " + response.version_string + " (" +
 						(that.config.janus.multistream ? "multistream" : "legacy") + ")");
-					// We're done
-					that.config.janus.state = "connected";
-					callback();
+					// Finally, create a manager handle we'll use for monitoring
+					let attach = {
+						janus: "attach",
+						session_id: that.config.janus.session.id,
+						plugin: "janus.plugin.streaming"
+					};
+					janusSend(attach, function(response) {
+						whep.debug("Attach response:", response);
+						// Unsubscribe from the transaction
+						delete that.config.janus.transactions[response["transaction"]];
+						// Take note of the handle ID
+						that.config.janus.manager = response["data"]["id"];
+						// We're done
+						that.config.janus.state = "connected";
+						callback();
+					});
 				});
 			});
 		});
@@ -195,6 +208,58 @@ var whepJanus = function(janusConfig) {
 		delete sessions[uuid];
 	};
 
+	// Public method to retrieve info on a specific mountpoint: we use this
+	// to let WHEP endpoints monitor when a mountpoint becomes active/inactive
+	this.isMountpointActive = function(details, callback) {
+		callback = (typeof callback === "function") ? callback : noop;
+		if(!details.mountpoint || !details.whepId) {
+			callback({ error: "Missing mandatory attribute(s)" });
+			return;
+		}
+		let mountpoint = details.mountpoint;
+		let whepId = details.whepId;
+		// Send a different request according to the medium we're setting up
+		let info = {
+			janus: "message",
+			session_id: that.config.janus.session.id,
+			handle_id: that.config.janus.manager,
+			body: {
+				request: "info",
+				id: mountpoint
+			}
+		};
+		janusSend(info, function(response) {
+			let event = response["janus"];
+			// Get the plugin data: is this a success or an error?
+			let data = response.plugindata.data;
+			if(data.error) {
+				// Unsubscribe from the call transaction
+				delete that.config.janus.transactions[response["transaction"]];
+				whep.err("Got an error querying mountpoint:", data.error);
+				callback({ error: data.error });
+				return;
+			}
+			let active = false;
+			let info = data.info;
+			if(that.config.janus.multistream) {
+				// Janus 1.x response, iterate on the media array
+				for(let m of info.media) {
+					if(m.age_ms && m.age_ms < 1000) {
+						active = true;
+						break;
+					}
+				}
+			} else {
+				// Janus 0.x response
+				if((info.audio_age_ms && info.audio_age_ms < 1000) ||
+						(info.video_age_ms && info.video_age_ms < 1000))
+					active = true;
+			}
+			// Done
+			callback(null, { whepId: whepId, active: active });
+		});
+	};
+
 	// Public method for subscribing to a Streaming plugin mountpoint
 	this.subscribe = function(details, callback) {
 		callback = (typeof callback === "function") ? callback : noop;
diff --git a/web/watch.js b/web/watch.js
index b81bd05..e601691 100644
--- a/web/watch.js
+++ b/web/watch.js
@@ -76,9 +76,24 @@ async function subscribeToEndpoint() {
 		console.log('Got SDP:', sdp);
 		resource = request.getResponseHeader('Location');
 		console.log('WHEP resource:', resource);
-		// TODO Parse ICE servers
-		// let ice = request.getResponseHeader('Link');
-		let iceServers = [{urls: "stun:stun.l.google.com:19302"}];
+		// FIXME Parse Link headers (for ICE servers and/or SSE)
+		let iceServers = [];
+		let links = request.getResponseHeader('Link');
+		let l = links.split('<');
+		for(let i of l) {
+			if(!i || i.length === 0)
+				continue;
+			if(i.indexOf('ice-server') !== -1) {
+				// TODO Parse TURN attributes
+				let url = i.split('>')[0];
+				iceServers.push({ urls: url });
+			} else if(i.indexOf('urn:ietf:params:whep:ext:core:server-sent-events') !== -1) {
+				// TODO Parse event attribute
+				let url = i.split('>')[0];
+				let events = [ 'active', 'inactive', 'layers', 'viewercount' ];
+				startSSE(url, events);
+			}
+		}
 		// Create PeerConnection, if needed
 		createPeerConnectionIfNeeded(iceServers);
 		// Pass the SDP to the PeerConnection
@@ -221,14 +236,43 @@ function createPeerConnectionIfNeeded(iceServers) {
 		console.log('Handling Remote Track', event);
 		if(!event.streams)
 			return;
-		console.warn(event.streams[0].getTracks());
 		if($('#whepvideo').length === 0) {
 			$('#video').removeClass('hide').show();
 			$('#videoremote').append('<video class="rounded centered" id="whepvideo" width="100%" height="100%" autoplay playsinline/>');
-			$('#whenvideo').get(0).volume = 0;
+			$('#whepvideo').get(0).volume = 0;
 		}
 		attachMediaStream($('#whepvideo').get(0), event.streams[0]);
 		$('#whepvideo').get(0).play();
 		$('#whepvideo').get(0).volume = 1;
 	};
 }
+
+// Helper function to subscribe to events via SSE
+function startSSE(url, events) {
+	console.warn('Starting SSE:', url);
+	$.ajax({
+		url: url,
+		type: 'POST',
+		contentType: 'application/json',
+		data: JSON.stringify(events)
+	}).error(function(xhr, textStatus, errorThrown) {
+		bootbox.alert(xhr.status + ": " + xhr.responseText);
+	}).success(function(res, textStatus, request) {
+		// Done, access the Location header
+		let sse = request.getResponseHeader('Location');
+		console.warn('SSE Location:', sse);
+		let source = new EventSource(sse);
+		source.addEventListener('active', message => {
+			console.warn('Got', message);
+		});
+		source.addEventListener('inactive', message => {
+			console.warn('Got', message);
+		});
+		source.addEventListener('viewercount', message => {
+			console.warn('Got', message);
+		});
+		source.addEventListener('layer', message => {
+			console.warn('Got', message);
+		});
+	});
+}