Skip to content

Commit 4023ad9

Browse files
authored
Merge pull request #1 from membraneframework-labs/mpeg_ts_client_poc
Create HLS client
2 parents c0b8caa + 0746339 commit 4023ad9

File tree

13 files changed

+663
-36
lines changed

13 files changed

+663
-36
lines changed

README.md

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
1-
# Membrane Template Plugin
1+
# ExHLS
22

3-
[![Hex.pm](https://img.shields.io/hexpm/v/membrane_template_plugin.svg)](https://hex.pm/packages/membrane_template_plugin)
4-
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_template_plugin)
5-
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_template_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_template_plugin)
3+
[![Hex.pm](https://img.shields.io/hexpm/v/ex_hls.svg)](https://hex.pm/packages/ex_hls)
4+
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/ex_hls)
5+
[![CircleCI](https://circleci.com/gh/membraneframework/ex_hls.svg?style=svg)](https://circleci.com/gh/membraneframework/ex_hls)
66

7-
This repository contains a template for new plugins.
8-
9-
Check out different branches for other flavors of this template.
7+
This repository contains ExHLS - an Elixir package for handling HLS streams
108

119
It's a part of the [Membrane Framework](https://membrane.stream).
1210

1311
## Installation
1412

15-
The package can be installed by adding `membrane_template_plugin` to your list of dependencies in `mix.exs`:
13+
The package can be installed by adding `ex_hls` to your list of dependencies in `mix.exs`:
1614

1715
```elixir
1816
def deps do
1917
[
20-
{:membrane_template_plugin, "~> 0.1.0"}
18+
{:ex_hls, "~> 0.1.0"}
2119
]
2220
end
2321
```
@@ -28,8 +26,8 @@ TODO
2826

2927
## Copyright and License
3028

31-
Copyright 2020, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin)
29+
Copyright 2025, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=ex_hls)
3230

33-
[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_template_plugin)
31+
[![Software Mansion](https://logo.swmansion.com/logo?color=white&variant=desktop&width=200&tag=membrane-github)](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=ex_hls)
3432

3533
Licensed under the [Apache License, Version 2.0](LICENSE)

fixture/fileSequence0.m4s

686 KB
Binary file not shown.

fixture/init.mp4

1.34 KB
Binary file not shown.

fixture/output.m3u8

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#EXTM3U
2+
#EXT-X-VERSION:7
3+
#EXT-X-TARGETDURATION:10
4+
#EXT-X-MEDIA-SEQUENCE:0
5+
#EXT-X-PLAYLIST-TYPE:VOD
6+
#EXT-X-MAP:URI="init.mp4"
7+
#EXTINF:10.001628,
8+
fileSequence0.m4s
9+
#EXT-X-ENDLIST

lib/ex_hls/chunk.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
defmodule ExHLS.Chunk do
2+
@moduledoc """
3+
A struct representing a media chunk in the ExHLS demuxing engine.
4+
"""
5+
@enforce_keys [:payload, :pts_ms, :dts_ms, :track_id]
6+
defstruct @enforce_keys ++ [metadata: %{}]
7+
8+
@type t :: %__MODULE__{
9+
payload: binary(),
10+
pts_ms: integer(),
11+
dts_ms: integer(),
12+
track_id: term(),
13+
metadata: map()
14+
}
15+
16+
# timestamps need to be represented in milliseconds
17+
@time_base 1000
18+
19+
@spec time_base() :: integer()
20+
def time_base(), do: @time_base
21+
end

lib/ex_hls/client.ex

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
defmodule ExHLS.Client do
2+
@moduledoc """
3+
Module providing functionality to read and demux HLS streams.
4+
It allows reading chunks from the stream, choosing variants, and managing media playlists.
5+
"""
6+
7+
alias ExHLS.DemuxingEngine
8+
alias Membrane.{AAC, H264, RemoteStream}
9+
10+
@opaque client :: map()
11+
@type chunk :: any()
12+
13+
@type variant_description :: %{
14+
id: integer(),
15+
name: String.t() | nil,
16+
frame_rate: number() | nil,
17+
resolution: {integer(), integer()} | nil,
18+
codecs: String.t() | nil,
19+
bandwidth: integer() | nil,
20+
uri: String.t() | nil
21+
}
22+
23+
@doc """
24+
Starts the ExHLS client with the given URL and demuxing engine implementation.
25+
26+
By default, it uses `DemuxingEngine.MPEGTS` as the demuxing engine implementation.
27+
"""
28+
29+
@spec new(String.t()) :: client()
30+
def new(url) do
31+
%{status: 200, body: request_body} = Req.get!(url)
32+
multivariant_playlist = request_body |> ExM3U8.deserialize_multivariant_playlist!([])
33+
34+
%{
35+
media_playlist: nil,
36+
media_base_url: nil,
37+
multivariant_playlist: multivariant_playlist,
38+
base_url: Path.dirname(url),
39+
video_chunks: [],
40+
demuxing_engine_impl: nil,
41+
demuxing_engine: nil,
42+
queues: %{audio: Qex.new(), video: Qex.new()},
43+
timestamp_offsets: %{audio: nil, video: nil},
44+
last_timestamps: %{audio: nil, video: nil}
45+
}
46+
end
47+
48+
defp ensure_media_playlist_loaded(%{media_playlist: nil} = client) do
49+
get_variants(client)
50+
|> Map.to_list()
51+
|> case do
52+
[] ->
53+
read_media_playlist_without_variant(client)
54+
55+
[{variant_id, _variant}] ->
56+
choose_variant(client, variant_id)
57+
58+
_many_variants ->
59+
raise """
60+
If there are available variants, you have to choose one of them using \
61+
`choose_variant/2` function before reading chunks. Available variants:
62+
#{get_variants(client) |> inspect(limit: :infinity, pretty: true)}
63+
"""
64+
end
65+
end
66+
67+
defp ensure_media_playlist_loaded(client), do: client
68+
69+
defp read_media_playlist_without_variant(%{media_playlist: nil} = client) do
70+
media_playlist =
71+
client.base_url
72+
|> Path.join("output.m3u8")
73+
|> Req.get!()
74+
75+
deserialized_media_playlist =
76+
ExM3U8.deserialize_media_playlist!(media_playlist.body, [])
77+
78+
%{
79+
client
80+
| media_playlist: deserialized_media_playlist,
81+
media_base_url: client.base_url
82+
}
83+
end
84+
85+
@spec get_variants(client()) :: %{optional(integer()) => variant_description()}
86+
def get_variants(client) do
87+
client.multivariant_playlist.items
88+
|> Enum.filter(&match?(%ExM3U8.Tags.Stream{}, &1))
89+
|> Enum.with_index(fn variant, index ->
90+
variant_description =
91+
variant
92+
|> Map.take([:name, :frame_rate, :resolution, :codecs, :bandwidth, :uri])
93+
|> Map.put(:id, index)
94+
95+
{index, variant_description}
96+
end)
97+
|> Map.new()
98+
end
99+
100+
@spec choose_variant(client(), String.t()) :: client()
101+
def choose_variant(client, variant_id) do
102+
chosen_variant =
103+
get_variants(client)
104+
|> Map.fetch!(variant_id)
105+
106+
media_playlist = Path.join(client.base_url, chosen_variant.uri) |> Req.get!()
107+
108+
deserialized_media_playlist =
109+
ExM3U8.deserialize_media_playlist!(media_playlist.body, [])
110+
111+
media_base_url = Path.join(client.base_url, Path.dirname(chosen_variant.uri))
112+
113+
%{
114+
client
115+
| media_playlist: deserialized_media_playlist,
116+
media_base_url: media_base_url
117+
}
118+
end
119+
120+
@spec read_video_chunk(client()) :: chunk() | :end_of_stream
121+
def read_video_chunk(client), do: pop_queue_or_do_read_chunk(client, :video)
122+
123+
@spec read_audio_chunk(client()) :: chunk() | :end_of_stream
124+
def read_audio_chunk(client), do: pop_queue_or_do_read_chunk(client, :audio)
125+
126+
defp pop_queue_or_do_read_chunk(client, media_type) do
127+
client.queues[media_type]
128+
|> Qex.pop()
129+
|> case do
130+
{{:value, chunk}, queue} ->
131+
client = client |> put_in([:queues, media_type], queue)
132+
{chunk, client}
133+
134+
{:empty, _queue} ->
135+
do_read_chunk(client, media_type)
136+
end
137+
end
138+
139+
@spec do_read_chunk(client(), :audio | :video) :: {chunk() | :end_of_stream, client()}
140+
defp do_read_chunk(client, media_type) do
141+
client = ensure_media_playlist_loaded(client)
142+
143+
with impl when impl != nil <- client.demuxing_engine_impl,
144+
track_id <- get_track_id!(client, media_type),
145+
{:ok, chunk, demuxing_engine} <- client.demuxing_engine |> impl.pop_chunk(track_id) do
146+
client =
147+
with %{timestamp_offsets: %{^media_type => nil}} <- client do
148+
client |> put_in([:timestamp_offsets, media_type], chunk.dts_ms)
149+
end
150+
|> put_in([:last_timestamps, media_type], chunk.dts_ms)
151+
|> put_in([:demuxing_engine], demuxing_engine)
152+
153+
{chunk, client}
154+
else
155+
other ->
156+
case other do
157+
{:error, _reason, demuxing_engine} -> %{client | demuxing_engine: demuxing_engine}
158+
nil -> client
159+
end
160+
|> download_chunk()
161+
|> case do
162+
{:ok, client} -> do_read_chunk(client, media_type)
163+
{:end_of_stream, client} -> {:end_of_stream, client}
164+
end
165+
end
166+
end
167+
168+
@spec get_tracks_info(client()) ::
169+
{:ok, %{optional(integer()) => struct()}, client()}
170+
| {:error, reason :: any(), client()}
171+
def get_tracks_info(client) do
172+
with impl when impl != nil <- client.demuxing_engine_impl,
173+
{:ok, tracks_info} <- client.demuxing_engine |> impl.get_tracks_info() do
174+
{:ok, tracks_info, client}
175+
else
176+
_other ->
177+
media_type = media_type_with_lower_ts(client)
178+
{chunk_or_eos, client} = do_read_chunk(client, media_type)
179+
180+
with %ExHLS.Chunk{} <- chunk_or_eos do
181+
client
182+
|> update_in([:queues, media_type], &Qex.push(&1, chunk_or_eos))
183+
|> get_tracks_info()
184+
else
185+
:end_of_stream ->
186+
{:error, "end of stream reached, but tracks info is not available", client}
187+
end
188+
end
189+
end
190+
191+
defp media_type_with_lower_ts(client) do
192+
cond do
193+
client.timestamp_offsets.audio == nil ->
194+
:audio
195+
196+
client.timestamp_offsets.video == nil ->
197+
:video
198+
199+
true ->
200+
[:audio, :video]
201+
|> Enum.min_by(fn media_type ->
202+
client.last_timestamps[media_type] - client.timestamp_offsets[media_type]
203+
end)
204+
end
205+
end
206+
207+
defp download_chunk(client) do
208+
client = ensure_media_playlist_loaded(client)
209+
210+
case client.media_playlist.timeline do
211+
[%{uri: segment_uri} | rest] ->
212+
client =
213+
with %{demuxing_engine: nil} <- client do
214+
resolve_demuxing_engine(segment_uri, client)
215+
end
216+
217+
request_result =
218+
Path.join(client.media_base_url, segment_uri)
219+
|> Req.get!()
220+
221+
demuxing_engine =
222+
client.demuxing_engine
223+
|> client.demuxing_engine_impl.feed!(request_result.body)
224+
225+
client =
226+
%{
227+
client
228+
| demuxing_engine: demuxing_engine,
229+
media_playlist: %{client.media_playlist | timeline: rest}
230+
}
231+
232+
{:ok, client}
233+
234+
[_other_tag | rest] ->
235+
%{client | media_playlist: %{client.media_playlist | timeline: rest}}
236+
|> download_chunk()
237+
238+
[] ->
239+
client =
240+
client
241+
|> Map.update!(:demuxing_engine, &client.demuxing_engine_impl.end_stream/1)
242+
243+
{:end_of_stream, client}
244+
end
245+
end
246+
247+
defp resolve_demuxing_engine(segment_uri, %{demuxing_engine: nil} = client) do
248+
demuxing_engine_impl =
249+
case Path.extname(segment_uri) do
250+
".ts" -> DemuxingEngine.MPEGTS
251+
".m4s" -> DemuxingEngine.CMAF
252+
".mp4" -> DemuxingEngine.CMAF
253+
_other -> raise "Unsupported segment URI extension: #{segment_uri |> inspect()}"
254+
end
255+
256+
%{
257+
client
258+
| demuxing_engine_impl: demuxing_engine_impl,
259+
demuxing_engine: demuxing_engine_impl.new()
260+
}
261+
end
262+
263+
defp get_track_id!(client, type) when type in [:audio, :video] do
264+
case get_track_id(client, type) do
265+
{:ok, track_id} -> track_id
266+
:error -> raise "Track ID for #{type} not found in client #{inspect(client, pretty: true)}"
267+
end
268+
end
269+
270+
defp get_track_id(client, type) when type in [:audio, :video] do
271+
impl = client.demuxing_engine_impl
272+
273+
with {:ok, tracks_info} <- client.demuxing_engine |> impl.get_tracks_info() do
274+
tracks_info
275+
|> Enum.find_value(:error, fn
276+
{id, %AAC{}} when type == :audio -> {:ok, id}
277+
{id, %RemoteStream{content_format: AAC}} when type == :audio -> {:ok, id}
278+
{id, %H264{}} when type == :video -> {:ok, id}
279+
{id, %RemoteStream{content_format: H264}} when type == :video -> {:ok, id}
280+
_different_type -> false
281+
end)
282+
else
283+
{:error, _reason} -> :error
284+
end
285+
end
286+
end

lib/ex_hls/demuxing_engine.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
defmodule ExHLS.DemuxingEngine do
2+
@moduledoc false
3+
4+
@type t :: any()
5+
6+
@callback new() :: t()
7+
8+
@callback feed!(t(), binary()) :: t()
9+
10+
@callback get_tracks_info(t()) :: {:ok, %{optional(integer()) => struct()}} | {:error, any()}
11+
12+
@callback pop_chunk(t(), track_id :: any()) ::
13+
{:ok, ExHLS.Chunk.t(), t()} | {:error, :empty_track_data, t()}
14+
15+
@callback end_stream(t()) :: t()
16+
end

0 commit comments

Comments
 (0)