Skip to content

Commit 703b405

Browse files
authored
Add how_much_to_skip_ms option (#5)
* Add `how_much_to_skip_ms` option * Add `Client.get_how_much_truly_skipped_ms/1` * Fix timestamp calculation in MPEG-TS demuxing engine
1 parent 58a7ee0 commit 703b405

File tree

7 files changed

+142
-25
lines changed

7 files changed

+142
-25
lines changed

lib/ex_hls/client.ex

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ defmodule ExHLS.Client do
2121
:queues,
2222
:timestamp_offsets,
2323
:last_timestamps,
24+
:how_much_to_skip_ms,
25+
:how_much_truly_skipped_ms,
2426
:end_stream_executed?
2527
]
2628

@@ -44,8 +46,8 @@ defmodule ExHLS.Client do
4446
By default, it uses `DemuxingEngine.MPEGTS` as the demuxing engine implementation.
4547
"""
4648

47-
@spec new(String.t()) :: client()
48-
def new(url) do
49+
@spec new(String.t(), non_neg_integer()) :: client()
50+
def new(url, how_much_to_skip_ms \\ 0) do
4951
%{status: 200, body: request_body} = Req.get!(url)
5052
multivariant_playlist = request_body |> ExM3U8.deserialize_multivariant_playlist!([])
5153

@@ -61,6 +63,8 @@ defmodule ExHLS.Client do
6163
queues: %{audio: Qex.new(), video: Qex.new()},
6264
timestamp_offsets: %{audio: nil, video: nil},
6365
last_timestamps: %{audio: nil, video: nil},
66+
how_much_to_skip_ms: how_much_to_skip_ms,
67+
how_much_truly_skipped_ms: nil,
6468
end_stream_executed?: false
6569
}
6670
end
@@ -87,14 +91,16 @@ defmodule ExHLS.Client do
8791
defp ensure_media_playlist_loaded(client), do: client
8892

8993
defp read_media_playlist_without_variant(%__MODULE__{media_playlist: nil} = client) do
90-
deserialized_media_playlist =
94+
{deserialized_media_playlist, how_much_truly_skipped_ms} =
9195
client.root_playlist_string
9296
|> ExM3U8.deserialize_media_playlist!([])
97+
|> skip_to_how_much_to_skip(client.how_much_to_skip_ms)
9398

9499
%{
95100
client
96101
| media_playlist: deserialized_media_playlist,
97-
media_base_url: client.base_url
102+
media_base_url: client.base_url,
103+
how_much_truly_skipped_ms: how_much_truly_skipped_ms
98104
}
99105
end
100106

@@ -121,15 +127,17 @@ defmodule ExHLS.Client do
121127

122128
media_playlist = Path.join(client.base_url, chosen_variant.uri) |> Req.get!()
123129

124-
deserialized_media_playlist =
130+
{deserialized_media_playlist, how_much_truly_skipped_ms} =
125131
ExM3U8.deserialize_media_playlist!(media_playlist.body, [])
132+
|> skip_to_how_much_to_skip(client.how_much_to_skip_ms)
126133

127134
media_base_url = Path.join(client.base_url, Path.dirname(chosen_variant.uri))
128135

129136
%{
130137
client
131138
| media_playlist: deserialized_media_playlist,
132-
media_base_url: media_base_url
139+
media_base_url: media_base_url,
140+
how_much_truly_skipped_ms: how_much_truly_skipped_ms
133141
}
134142
end
135143

@@ -296,7 +304,8 @@ defmodule ExHLS.Client do
296304
%{
297305
client
298306
| demuxing_engine_impl: demuxing_engine_impl,
299-
demuxing_engine: demuxing_engine_impl.new()
307+
# how_much_truly_skipped_ms is the timestamps offset of the first non-discarded sample
308+
demuxing_engine: get_how_much_truly_skipped_ms(client) |> demuxing_engine_impl.new()
300309
}
301310
end
302311

@@ -316,4 +325,48 @@ defmodule ExHLS.Client do
316325
{:error, _reason} -> :error
317326
end
318327
end
328+
329+
defp skip_to_how_much_to_skip(media_playlist, how_much_to_skip_ms) do
330+
{discarded, timeline_with_cumulative_duration} =
331+
Enum.map_reduce(
332+
media_playlist.timeline,
333+
0,
334+
fn
335+
%ExM3U8.Tags.Segment{} = chunk, cumulative_duration_ms ->
336+
chunk_end_ms = cumulative_duration_ms + 1000 * chunk.duration
337+
{{chunk, chunk_end_ms}, chunk_end_ms}
338+
339+
other_tag, cumulative_duration_ms ->
340+
{{other_tag, cumulative_duration_ms}, cumulative_duration_ms}
341+
end
342+
)
343+
|> elem(0)
344+
|> Enum.split_with(fn
345+
{%ExM3U8.Tags.Segment{}, chunk_end_ms} -> chunk_end_ms < how_much_to_skip_ms
346+
_other -> false
347+
end)
348+
349+
how_much_truly_skipped_ms =
350+
case List.last(discarded) do
351+
nil -> 0
352+
{_discarded_timeline, cumulative_duration_ms} -> cumulative_duration_ms
353+
end
354+
|> round()
355+
356+
timeline = Enum.map(timeline_with_cumulative_duration, &elem(&1, 0))
357+
358+
{put_in(media_playlist.timeline, timeline), how_much_truly_skipped_ms}
359+
end
360+
361+
@spec get_how_much_truly_skipped_ms(client()) :: non_neg_integer() | no_return()
362+
def get_how_much_truly_skipped_ms(%{how_much_truly_skipped_ms: nil}) do
363+
raise """
364+
`how_much_truly_skipped_ms` is not yet available.
365+
Please call `read_audio_chunk/1`, `read_video_chunk/1`
366+
or `choose_variant/2` before calling this function.
367+
"""
368+
end
369+
370+
def get_how_much_truly_skipped_ms(%{how_much_truly_skipped_ms: how_much_truly_skipped_ms}),
371+
do: how_much_truly_skipped_ms
319372
end

lib/ex_hls/demuxing_engine.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule ExHLS.DemuxingEngine do
33

44
@type t :: any()
55

6-
@callback new() :: t()
6+
@callback new(base_timestamps_ms :: non_neg_integer()) :: t()
77

88
@callback feed!(t(), binary()) :: t()
99

lib/ex_hls/demuxing_engine/cmaf.ex

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@ defmodule ExHLS.DemuxingEngine.CMAF do
33
@behaviour ExHLS.DemuxingEngine
44

55
alias Membrane.MP4.Demuxer.CMAF
6+
alias Membrane.MP4.Demuxer.Sample
67

7-
@enforce_keys [:demuxer]
8+
@enforce_keys [:demuxer, :timestamps_offset_ms]
89
defstruct @enforce_keys ++ [tracks_to_chunks: %{}]
910

1011
@type t :: %__MODULE__{
1112
demuxer: CMAF.Engine.t(),
12-
tracks_to_chunks: map()
13+
tracks_to_chunks: map(),
14+
timestamps_offset_ms: non_neg_integer()
1315
}
1416

1517
@impl true
16-
def new() do
18+
def new(timestamps_offset_ms) do
1719
%__MODULE__{
18-
demuxer: CMAF.Engine.new()
20+
demuxer: CMAF.Engine.new(),
21+
timestamps_offset_ms: timestamps_offset_ms
1922
}
2023
end
2124

@@ -30,7 +33,7 @@ defmodule ExHLS.DemuxingEngine.CMAF do
3033
chunks
3134
|> Enum.group_by(
3235
fn chunk -> chunk.track_id end,
33-
fn %CMAF.Engine.Sample{} = chunk ->
36+
fn %Sample{} = chunk ->
3437
%ExHLS.Chunk{
3538
payload: chunk.payload,
3639
pts_ms: chunk.pts,
@@ -66,6 +69,7 @@ defmodule ExHLS.DemuxingEngine.CMAF do
6669
with qex when qex != nil <- demuxing_engine.tracks_to_chunks[track_id],
6770
{{:value, chunk}, popped_qex} <- Qex.pop(qex) do
6871
demuxing_engine = put_in(demuxing_engine.tracks_to_chunks[track_id], popped_qex)
72+
chunk = normalize_timestamps(chunk, demuxing_engine.timestamps_offset_ms)
6973
{:ok, chunk, demuxing_engine}
7074
else
7175
nil -> {:error, :unknown_track, demuxing_engine}
@@ -75,4 +79,12 @@ defmodule ExHLS.DemuxingEngine.CMAF do
7579

7680
@impl true
7781
def end_stream(demuxing_engine), do: demuxing_engine
82+
83+
defp normalize_timestamps(chunk, timestamps_offset_ms) do
84+
%{
85+
chunk
86+
| pts_ms: round(chunk.pts_ms + timestamps_offset_ms),
87+
dts_ms: round(chunk.dts_ms + timestamps_offset_ms)
88+
}
89+
end
7890
end

lib/ex_hls/demuxing_engine/mpeg_ts.ex

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
1414
}
1515

1616
@impl true
17-
def new() do
17+
# We can ignore handling timestamps_offset_ms since timestamps in the
18+
# MPEG-TS container already include global timestamps.
19+
def new(_timestamps_offset_ms) do
1820
demuxer = Demuxer.new()
1921

2022
# we need to explicitly override that `waiting_random_access_indicator` as otherwise Demuxer
@@ -44,10 +46,19 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
4446
[{id, %RemoteStream{content_format: H264}}]
4547

4648
{id, unsupported_stream_info} ->
47-
Logger.warning("""
48-
#{__MODULE__ |> inspect()}: dropping unsupported stream with id #{id |> inspect()}.\
49-
Stream info: #{unsupported_stream_info |> inspect(pretty: true)}
50-
""")
49+
unsupported_streams = Process.get(:unsupported_streams) || MapSet.new()
50+
51+
if unsupported_stream_info.stream_type not in unsupported_streams do
52+
Logger.debug("""
53+
#{__MODULE__ |> inspect()}: dropping unsupported stream with id #{id |> inspect()}.\
54+
Stream info: #{unsupported_stream_info |> inspect(pretty: true)}
55+
""")
56+
57+
unsupported_streams =
58+
MapSet.put(unsupported_streams, unsupported_stream_info.stream_type)
59+
60+
Process.put(:unsupported_streams, unsupported_streams)
61+
end
5162

5263
[]
5364
end)
@@ -80,8 +91,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
8091
end
8192
end
8293

83-
@mpegts_clock_rate 90
84-
defp packet_ts_to_millis(ts), do: div(ts, @mpegts_clock_rate)
94+
# value returned by Demuxer is represented in nanoseconds
95+
defp packet_ts_to_millis(ts), do: div(ts, 1_000_000)
8596

8697
@impl true
8798
def end_stream(%__MODULE__{} = demuxing_engine) do

mix.exs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ defmodule ExHLS.Mixfile do
4040
{:ex_m3u8, "~> 0.15.3"},
4141
{:req, "~> 0.5.10"},
4242
{:qex, "~> 0.5.1"},
43-
{:membrane_mp4_plugin, "~> 0.35.3"},
43+
{:membrane_mp4_plugin, "~> 0.36.0"},
4444
{:membrane_h26x_plugin, "~> 0.10.2"},
45-
# {:mpeg_ts, github: "kim-company/kim_mpeg_ts"},
46-
{:mpeg_ts, github: "membraneframework-labs/kim_mpeg_ts", branch: "backport-v1.0.3"},
45+
{:mpeg_ts, github: "kim-company/kim_mpeg_ts"},
4746
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
4847
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
4948
{:credo, ">= 0.0.0", only: :dev, runtime: false}

mix.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
"membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"},
2727
"membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.5", "e9fa1ee9cda944259c4d2728c8b279bfe0152a3a6c1af187b07fa8411ca4e25e", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "dd0287a6b6223e47bba30a8952d6ec53db35f6a3e33203b7ad786e995711f098"},
2828
"membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"},
29-
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.3", "80228f4332eeef4fce4d90184a82bd5869d184f78438419660da7dc91871a238", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "c6d8b20e49540329f246e9a9c69adae330d424802fdfa1e6485d76a5257e6169"},
29+
"membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.36.0", "ef4fc6fc72cc439974f2ae0ce579f3dbf8674f02ab00f7f524a6600b786a9ca8", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "84f55a42c69cb557b73d6272f958812f607abaaa6a3473f301d22393f2a62808"},
3030
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
3131
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
3232
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
3333
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
34-
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "8c036fca6558a4339033a5a8697ebf147728f36b", [branch: "backport-v1.0.3"]},
34+
"mpeg_ts": {:git, "https://github.com/kim-company/kim_mpeg_ts.git", "ebde017bf639a9d57d068f029f2e43562ae78746", []},
3535
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
3636
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
3737
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},

test/client_test.exs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,46 @@ defmodule Client.Test do
146146
72, 183, 150, 44, 216, 32, 217, 35, 238, 239, 120, 50, 54, 52, 32, 45, 32, 99, 111,
147147
114, 101, 32, 49, 54, 52, 32, 114>> <> _rest = video_chunk.payload
148148
end
149+
150+
test "(MPEGTS) stream with how_much_to_skip_ms" do
151+
how_much_to_skip_ms = 44_000
152+
client = Client.new(@mpegts_url, how_much_to_skip_ms)
153+
154+
variant_720 =
155+
Client.get_variants(client)
156+
|> Map.values()
157+
|> Enum.find(&(&1.resolution == {1280, 720}))
158+
159+
assert variant_720 != nil
160+
161+
client = client |> Client.choose_variant(variant_720.id)
162+
{:ok, tracks_info, client} = Client.get_tracks_info(client)
163+
164+
tracks_info = tracks_info |> Map.values()
165+
166+
assert tracks_info |> length() == 2
167+
assert %RemoteStream{content_format: AAC, type: :bytestream} in tracks_info
168+
assert %RemoteStream{content_format: H264, type: :bytestream} in tracks_info
169+
170+
{video_chunk, client} = client |> Client.read_video_chunk()
171+
172+
# segments in the fixture are 10s long and
173+
# the timestamps offset is 10s, so the first
174+
# video pts after skipping initial 44 seconds should be div(10+44, 10) = 50s
175+
assert %{pts_ms: 50_033, dts_ms: 50_000} = video_chunk
176+
assert byte_size(video_chunk.payload) == 135_298
177+
178+
assert <<0, 0, 0, 1, 9, 240, 0, 0, 0, 1, 103, 100, 0, 31, 172, 217, 128, 80, 5, 187, 1, 16, 0,
179+
0, 3, 0, 16, 0, 0, 7, 128, 241, 131, 25, 160, 0, 0, 0, 1, 104, 233, 121, 203, 34,
180+
192, 0, 0, 1, 101, 136>> <> _rest = video_chunk.payload
181+
182+
{audio_chunk, _client} = Client.read_audio_chunk(client)
183+
184+
assert %{pts_ms: 50_018, dts_ms: 50_018} = audio_chunk
185+
assert byte_size(audio_chunk.payload) == 6020
186+
187+
assert <<255, 241, 80, 128, 47, 63, 252, 33, 10, 204, 43, 253, 251, 213, 30, 152, 129, 48, 80,
188+
38, 22, 18, 5, 130, 129, 113, 34, 92, 36, 20, 25, 9, 2, 193, 64, 144, 68, 36, 17, 75,
189+
215, 198, 77, 184, 229, 170, 157, 115, 169, 223>> <> _rest = audio_chunk.payload
190+
end
149191
end

0 commit comments

Comments
 (0)