Skip to content

Commit e5a7bf7

Browse files
varsillmat-hek
andauthored
Adjust to kim_mpeg_ts v3. Release v0.2.0 (#20)
Co-authored-by: Mateusz Front <[email protected]>
1 parent 1beb189 commit e5a7bf7

File tree

5 files changed

+92
-99
lines changed

5 files changed

+92
-99
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ The package can be installed by adding `ex_hls` to your list of dependencies in
1515
```elixir
1616
def deps do
1717
[
18-
{:ex_hls, "~> 0.1.5"}
18+
{:ex_hls, "~> 0.2.0"}
1919
]
2020
end
2121
```
@@ -55,7 +55,7 @@ Now you can get the Elixir stream containing media chunks:
5555
```elixir
5656
stream = ExHLS.Client.generate_stream(client)
5757
Enum.take(stream, 5)
58-
# Returns:
58+
# Returns:
5959
# [
6060
# %ExHLS.Chunk{
6161
# payload: <<220, 0, 76, 97, 118, 99, 54, 49, 46, 51, 46, 49, 48, 48, 0, 66,

lib/ex_hls/demuxing_engine/mpeg_ts.ex

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
99
alias Membrane.{AAC, H264, RemoteStream}
1010
alias MPEG.TS.Demuxer
1111

12-
@enforce_keys [:demuxer, :last_tden_tag]
13-
defstruct @enforce_keys ++ [track_timestamps_data: %{}]
12+
@enforce_keys [:demuxer]
13+
defstruct @enforce_keys ++ [track_timestamps_data: %{}, last_tden_tag: nil, packets_map: %{}]
1414

1515
# using it a boundary expressed in nanoseconds, instead of the usual 90kHz clock ticks,
1616
# generates up to 1/10th of ms error per 26.5 hours of stream which is acceptable in
@@ -19,7 +19,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
1919

2020
@type t :: %__MODULE__{
2121
demuxer: Demuxer.t(),
22-
last_tden_tag: String.t() | nil
22+
last_tden_tag: String.t() | nil,
23+
packets_map: %{(track_id :: non_neg_integer()) => Qex.t(MPEG.TS.Demuxer.Container.t())}
2324
}
2425

2526
@impl true
@@ -28,31 +29,31 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
2829
# different demuxing engines
2930
def new(_timestamp_offset_ms) do
3031
demuxer = Demuxer.new()
31-
32-
# we need to explicitly override that `waiting_random_access_indicator` as otherwise Demuxer
33-
# discards all the input data
34-
# TODO - figure out how to do it properly
35-
demuxer = %{demuxer | waiting_random_access_indicator: false}
36-
37-
%__MODULE__{demuxer: demuxer, last_tden_tag: nil}
32+
%__MODULE__{demuxer: demuxer}
3833
end
3934

4035
@impl true
4136
def feed!(%__MODULE__{} = demuxing_engine, binary) do
42-
demuxing_engine
43-
|> Map.update!(:demuxer, &Demuxer.push_buffer(&1, binary))
37+
{new_packets, demuxer} = Demuxer.demux(demuxing_engine.demuxer, binary)
38+
39+
packets_map =
40+
Enum.reduce(new_packets, demuxing_engine.packets_map, fn new_packet, packets_map ->
41+
Map.update(packets_map, new_packet.pid, Qex.new([new_packet]), &Qex.push(&1, new_packet))
42+
end)
43+
44+
%{demuxing_engine | demuxer: demuxer, packets_map: packets_map}
4445
end
4546

4647
@impl true
4748
def get_tracks_info(%__MODULE__{} = demuxing_engine) do
48-
with %{streams: streams} <- demuxing_engine.demuxer.pmt do
49+
with %{streams: streams} <- demuxing_engine.demuxer do
4950
tracks_info =
5051
streams
5152
|> Enum.flat_map(fn
52-
{id, %{stream_type: :AAC}} ->
53+
{id, %{stream_type: :AAC_ADTS}} ->
5354
[{id, %RemoteStream{content_format: AAC}}]
5455

55-
{id, %{stream_type: :H264}} ->
56+
{id, %{stream_type: :H264_AVC}} ->
5657
[{id, %RemoteStream{content_format: H264}}]
5758

5859
{id, unsupported_stream_info} ->
@@ -80,47 +81,56 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
8081

8182
@impl true
8283
def pop_chunk(%__MODULE__{} = demuxing_engine, track_id) do
83-
with {[packet], demuxer} <- Demuxer.take(demuxing_engine.demuxer, track_id) do
84-
{maybe_tden_tag, demuxer} = maybe_read_tden_tag(demuxer, packet.pts)
85-
tden_tag = maybe_tden_tag || demuxing_engine.last_tden_tag
84+
with {[packet], demuxing_engine} <- take_packets(demuxing_engine, track_id) do
85+
demuxing_engine = maybe_read_tden_tag(demuxing_engine, packet.payload.pts)
8686

8787
{demuxing_engine, packet} =
88-
%{demuxing_engine | demuxer: demuxer, last_tden_tag: tden_tag}
89-
|> handle_possible_timestamps_rollover(track_id, packet)
88+
demuxing_engine |> handle_possible_timestamps_rollover(track_id, packet)
9089

9190
chunk = %ExHLS.Chunk{
92-
payload: packet.data,
93-
pts_ms: packet.pts |> packet_ts_to_millis(),
94-
dts_ms: packet.dts |> packet_ts_to_millis(),
91+
payload: packet.payload.data,
92+
pts_ms: packet.payload.pts |> packet_ts_to_millis(),
93+
dts_ms: packet.payload.dts |> packet_ts_to_millis(),
9594
track_id: track_id,
9695
metadata: %{
97-
discontinuity: packet.discontinuity,
98-
is_aligned: packet.is_aligned,
99-
tden_tag: tden_tag
96+
discontinuity: packet.payload.discontinuity,
97+
is_aligned: packet.payload.is_aligned,
98+
tden_tag: demuxing_engine.last_tden_tag
10099
}
101100
}
102101

103102
{:ok, chunk, demuxing_engine}
104103
else
105-
{[], demuxer} ->
106-
{:error, :empty_track_data, %{demuxing_engine | demuxer: demuxer}}
104+
{[], demuxing_engine} ->
105+
{:error, :empty_track_data, demuxing_engine}
107106
end
108107
end
109108

110-
defp maybe_read_tden_tag(demuxer, packet_pts) do
109+
defp take_packets(demuxing_engine, track_id) do
110+
with {:ok, packets} <- Map.fetch(demuxing_engine.packets_map, track_id),
111+
{{:value, packet}, rest} <- Qex.pop(packets) do
112+
demuxing_engine = put_in(demuxing_engine.packets_map[track_id], rest)
113+
{[packet], demuxing_engine}
114+
else
115+
_other -> {[], demuxing_engine}
116+
end
117+
end
118+
119+
defp maybe_read_tden_tag(demuxing_engine, packet_pts) do
111120
withl no_id3_stream:
112121
{id3_track_id, _stream_description} <-
113-
demuxer.pmt.streams
122+
demuxing_engine.demuxer.streams
114123
|> Enum.find(fn {_pid, stream_description} ->
115124
stream_description.stream_type == :METADATA_IN_PES
116125
end),
117-
no_id3_data: {[id3], demuxer} <- Demuxer.take(demuxer, id3_track_id),
118-
id3_not_in_timerange: true <- id3.pts <= packet_pts do
119-
{parse_tden_tag(id3.data), demuxer}
126+
no_id3_data: {[id3], demuxing_engine} <- take_packets(demuxing_engine, id3_track_id),
127+
id3_not_in_timerange: true <- id3.payload.pts <= packet_pts do
128+
tden_tag = parse_tden_tag(id3.payload.data) || demuxing_engine.last_tden_tag
129+
%{demuxing_engine | last_tden_tag: tden_tag}
120130
else
121-
no_id3_stream: nil -> {nil, demuxer}
122-
no_id3_data: {[], updated_demuxer} -> {nil, updated_demuxer}
123-
id3_not_in_timerange: false -> {nil, demuxer}
131+
no_id3_stream: nil -> demuxing_engine
132+
no_id3_data: {[], updated_demuxing_engine} -> updated_demuxing_engine
133+
id3_not_in_timerange: false -> demuxing_engine
124134
end
125135
end
126136

@@ -143,7 +153,7 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
143153

144154
@impl true
145155
def end_stream(%__MODULE__{} = demuxing_engine) do
146-
demuxer = Demuxer.end_of_stream(demuxing_engine.demuxer)
156+
{_flushed, demuxer} = Demuxer.flush(demuxing_engine.demuxer)
147157
%{demuxing_engine | demuxer: demuxer}
148158
end
149159

@@ -155,32 +165,34 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
155165

156166
rollovers_offset = rollovers_count * @timestamp_range_size_ns
157167

158-
packet =
159-
packet
168+
payload =
169+
packet.payload
160170
|> Map.update!(:pts, &add_offset_if_not_nil(&1, rollovers_offset))
161171
|> Map.update!(:dts, &add_offset_if_not_nil(&1, rollovers_offset))
162172

163-
{demuxing_engine, packet} =
173+
{demuxing_engine, payload} =
164174
with last_ts when last_ts != nil <- last_dts || last_pts,
165-
true <- last_ts > (packet.dts || packet.pts) do
175+
true <- last_ts > (payload.dts || payload.pts) do
166176
demuxing_engine =
167177
demuxing_engine
168178
|> update_in([:track_timestamps_data, track_id, :rollovers_count], &(&1 + 1))
169179

170-
packet =
171-
packet
180+
payload =
181+
payload
172182
|> Map.update!(:pts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns))
173183
|> Map.update!(:dts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns))
174184

175-
{demuxing_engine, packet}
185+
{demuxing_engine, payload}
176186
else
177-
_other -> {demuxing_engine, packet}
187+
_other -> {demuxing_engine, payload}
178188
end
179189

180190
demuxing_engine =
181191
demuxing_engine
182-
|> put_in([:track_timestamps_data, track_id, :last_pts], packet.pts)
183-
|> put_in([:track_timestamps_data, track_id, :last_dts], packet.dts)
192+
|> put_in([:track_timestamps_data, track_id, :last_pts], payload.pts)
193+
|> put_in([:track_timestamps_data, track_id, :last_dts], payload.dts)
194+
195+
packet = %{packet | payload: payload}
184196

185197
{demuxing_engine, packet}
186198
end

mix.exs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule ExHLS.Mixfile do
22
use Mix.Project
33

4-
@version "0.1.5"
4+
@version "0.2.0"
55
@github_url "https://github.com/membraneframework/ex_hls"
66

77
def project do
@@ -43,13 +43,10 @@ defmodule ExHLS.Mixfile do
4343
{:bunch, "~> 1.6"},
4444
{:membrane_mp4_plugin, "~> 0.36.0"},
4545
{:membrane_h26x_plugin, "~> 0.10.2"},
46-
{:mpeg_ts,
47-
github: "membraneframework-labs/kim_mpeg_ts",
48-
branch: "varsill/fix_pes_optional_header_resolving"},
46+
{:mpeg_ts, "~> 3.3.5"},
4947
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
5048
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
51-
{:credo, ">= 0.0.0", only: :dev, runtime: false},
52-
{:mock, "~> 0.3.0", only: :test}
49+
{:credo, ">= 0.0.0", only: :dev, runtime: false}
5350
]
5451
end
5552

mix.lock

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
1919
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
2020
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
21-
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
2221
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
2322
"membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"},
2423
"membrane_core": {:hex, :membrane_core, "1.2.4", "3f9fc78cef29b69acadd4f959c8ec23cbb1544c26c8e8474589b143ada9a0da2", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ec7a77b7ab457267c0243338383365f6ef5ace2686ddc129939e502a58eba546"},
@@ -32,8 +31,7 @@
3231
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
3332
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
3433
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
35-
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "c8c770e0e7714c72b3faa7f20088fdbd76f5bade", [branch: "varsill/fix_pes_optional_header_resolving"]},
36-
"mock": {:hex, :mock, "0.3.9", "10e44ad1f5962480c5c9b9fa779c6c63de9bd31997c8e04a853ec990a9d841af", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "9e1b244c4ca2551bb17bb8415eed89e40ee1308e0fbaed0a4fdfe3ec8a4adbd3"},
34+
"mpeg_ts": {:hex, :mpeg_ts, "3.3.5", "b0fd6714753da5ad51e686ddbdb4cf4a4480f43dd6df311c7e2da4359df1960f", [:mix], [], "hexpm", "503bf4f557057efb35433893ffdb8da2be4643b2be161e8721a0daec9825b600"},
3735
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
3836
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
3937
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
defmodule ExHLS.DemuxingEngine.MPEGTS.Test do
22
use ExUnit.Case, async: false
3-
import Mock
4-
5-
alias MPEG.TS.Demuxer
63

74
test "handling timestamp rollovers" do
85
timestamp_range = (2 ** 33 * 1_000_000_000) |> div(90_000)
@@ -16,52 +13,41 @@ defmodule ExHLS.DemuxingEngine.MPEGTS.Test do
1613

1714
%{
1815
og_timestamp: og_timestamp,
19-
pts: rolled_timestamp,
20-
dts: rolled_timestamp,
21-
data: <<>>,
22-
discontinuity: false,
23-
is_aligned: true
16+
payload: %{
17+
pts: rolled_timestamp,
18+
dts: rolled_timestamp,
19+
data: <<>>,
20+
discontinuity: false,
21+
is_aligned: true
22+
}
2423
}
2524
end)
2625

27-
demuxer = %{
28-
waiting_random_access_indicator: nil,
29-
packet_buffers: %{1 => packets, 2 => packets},
30-
pmt: %{streams: %{}}
31-
}
32-
33-
new = fn -> demuxer end
34-
35-
take = fn demuxer, track_id ->
36-
demuxer
37-
|> get_and_update_in(
38-
[:packet_buffers, track_id],
39-
fn [head | tail] -> {[head], tail} end
40-
)
41-
end
26+
demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0)
4227

43-
with_mock Demuxer, new: new, take: take do
44-
demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0)
45-
46-
[1, 2]
47-
|> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine ->
48-
{chunks, demuxing_engine} =
49-
1..200
50-
|> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine ->
51-
{:ok, chunk, demuxing_engine} =
52-
ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id)
28+
demuxing_engine = %{
29+
demuxing_engine
30+
| packets_map: %{1 => Qex.new(packets), 2 => Qex.new(packets)}
31+
}
5332

54-
{chunk, demuxing_engine}
55-
end)
33+
[1, 2]
34+
|> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine ->
35+
{chunks, demuxing_engine} =
36+
1..200
37+
|> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine ->
38+
{:ok, chunk, demuxing_engine} =
39+
ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id)
5640

57-
Enum.zip(chunks, packets)
58-
|> Enum.each(fn {chunk, packet} ->
59-
assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000)
60-
assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000)
41+
{chunk, demuxing_engine}
6142
end)
6243

63-
demuxing_engine
44+
Enum.zip(chunks, packets)
45+
|> Enum.each(fn {chunk, packet} ->
46+
assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000)
47+
assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000)
6448
end)
65-
end
49+
50+
demuxing_engine
51+
end)
6652
end
6753
end

0 commit comments

Comments
 (0)