Skip to content

Commit 58a7ee0

Browse files
authored
Merge pull request #4 from membraneframework-labs/fix-end-stream-bug
Fix end stream bug
2 parents 88847ba + 29e06bd commit 58a7ee0

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

lib/ex_hls/client.ex

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,29 @@ defmodule ExHLS.Client do
44
It allows reading chunks from the stream, choosing variants, and managing media playlists.
55
"""
66

7+
use Bunch.Access
8+
79
alias ExHLS.DemuxingEngine
810
alias Membrane.{AAC, H264, RemoteStream}
911

10-
@opaque client :: map()
11-
@type chunk :: any()
12+
@enforce_keys [
13+
:media_playlist,
14+
:media_base_url,
15+
:multivariant_playlist,
16+
:root_playlist_string,
17+
:base_url,
18+
:demuxing_engine_impl,
19+
:demuxing_engine,
20+
:media_types,
21+
:queues,
22+
:timestamp_offsets,
23+
:last_timestamps,
24+
:end_stream_executed?
25+
]
26+
27+
defstruct @enforce_keys
28+
29+
@opaque client :: %__MODULE__{}
1230

1331
@type variant_description :: %{
1432
id: integer(),
@@ -31,23 +49,23 @@ defmodule ExHLS.Client do
3149
%{status: 200, body: request_body} = Req.get!(url)
3250
multivariant_playlist = request_body |> ExM3U8.deserialize_multivariant_playlist!([])
3351

34-
%{
52+
%__MODULE__{
3553
media_playlist: nil,
3654
media_base_url: nil,
3755
multivariant_playlist: multivariant_playlist,
3856
root_playlist_string: request_body,
3957
base_url: Path.dirname(url),
40-
video_chunks: [],
4158
demuxing_engine_impl: nil,
4259
demuxing_engine: nil,
4360
media_types: [:audio, :video],
4461
queues: %{audio: Qex.new(), video: Qex.new()},
4562
timestamp_offsets: %{audio: nil, video: nil},
46-
last_timestamps: %{audio: nil, video: nil}
63+
last_timestamps: %{audio: nil, video: nil},
64+
end_stream_executed?: false
4765
}
4866
end
4967

50-
defp ensure_media_playlist_loaded(%{media_playlist: nil} = client) do
68+
defp ensure_media_playlist_loaded(%__MODULE__{media_playlist: nil} = client) do
5169
get_variants(client)
5270
|> Map.to_list()
5371
|> case do
@@ -68,7 +86,7 @@ defmodule ExHLS.Client do
6886

6987
defp ensure_media_playlist_loaded(client), do: client
7088

71-
defp read_media_playlist_without_variant(%{media_playlist: nil} = client) do
89+
defp read_media_playlist_without_variant(%__MODULE__{media_playlist: nil} = client) do
7290
deserialized_media_playlist =
7391
client.root_playlist_string
7492
|> ExM3U8.deserialize_media_playlist!([])
@@ -81,7 +99,7 @@ defmodule ExHLS.Client do
8199
end
82100

83101
@spec get_variants(client()) :: %{optional(integer()) => variant_description()}
84-
def get_variants(client) do
102+
def get_variants(%__MODULE__{} = client) do
85103
client.multivariant_playlist.items
86104
|> Enum.filter(&match?(%ExM3U8.Tags.Stream{}, &1))
87105
|> Enum.with_index(fn variant, index ->
@@ -96,7 +114,7 @@ defmodule ExHLS.Client do
96114
end
97115

98116
@spec choose_variant(client(), String.t()) :: client()
99-
def choose_variant(client, variant_id) do
117+
def choose_variant(%__MODULE__{} = client, variant_id) do
100118
chosen_variant =
101119
get_variants(client)
102120
|> Map.fetch!(variant_id)
@@ -115,11 +133,11 @@ defmodule ExHLS.Client do
115133
}
116134
end
117135

118-
@spec read_video_chunk(client()) :: chunk() | :end_of_stream
119-
def read_video_chunk(client), do: pop_queue_or_do_read_chunk(client, :video)
136+
@spec read_video_chunk(client()) :: {ExHLS.Chunk.t() | :end_of_stream, client()}
137+
def read_video_chunk(%__MODULE__{} = client), do: pop_queue_or_do_read_chunk(client, :video)
120138

121-
@spec read_audio_chunk(client()) :: chunk() | :end_of_stream
122-
def read_audio_chunk(client), do: pop_queue_or_do_read_chunk(client, :audio)
139+
@spec read_audio_chunk(client()) :: {ExHLS.Chunk.t() | :end_of_stream, client()}
140+
def read_audio_chunk(%__MODULE__{} = client), do: pop_queue_or_do_read_chunk(client, :audio)
123141

124142
defp pop_queue_or_do_read_chunk(client, media_type) do
125143
client.queues[media_type]
@@ -135,7 +153,7 @@ defmodule ExHLS.Client do
135153
end
136154

137155
@spec do_read_chunk(client(), :audio | :video) ::
138-
{chunk() | :end_of_stream | {:error, atom()}, client()}
156+
{ExHLS.Chunk.t() | :end_of_stream | {:error, atom()}, client()}
139157
defp do_read_chunk(client, media_type) do
140158
client = ensure_media_playlist_loaded(client)
141159

@@ -144,7 +162,8 @@ defmodule ExHLS.Client do
144162
{:ok, chunk, demuxing_engine} <- client.demuxing_engine |> impl.pop_chunk(track_id) do
145163
client =
146164
with %{timestamp_offsets: %{^media_type => nil}} <- client do
147-
client |> put_in([:timestamp_offsets, media_type], chunk.dts_ms)
165+
client
166+
|> put_in([:timestamp_offsets, media_type], chunk.dts_ms)
148167
end
149168
|> put_in([:last_timestamps, media_type], chunk.dts_ms)
150169
|> put_in([:demuxing_engine], demuxing_engine)
@@ -164,16 +183,26 @@ defmodule ExHLS.Client do
164183
end
165184
|> download_chunk()
166185
|> case do
167-
{:ok, client} -> do_read_chunk(client, media_type)
168-
{:end_of_stream, client} -> {:end_of_stream, client}
186+
{:ok, client} ->
187+
do_read_chunk(client, media_type)
188+
189+
{:error, :no_more_segments, client} when not client.end_stream_executed? ->
190+
# after calling `end_stream/1` there is a chance that `pop_chunk/2` will flush
191+
# some remaining data
192+
%{client | end_stream_executed?: true}
193+
|> Map.update!(:demuxing_engine, &client.demuxing_engine_impl.end_stream/1)
194+
|> do_read_chunk(media_type)
195+
196+
{:error, :no_more_segments, client} when client.end_stream_executed? ->
197+
{:end_of_stream, client}
169198
end
170199
end
171200
end
172201

173202
@spec get_tracks_info(client()) ::
174203
{:ok, %{optional(integer()) => struct()}, client()}
175204
| {:error, reason :: any(), client()}
176-
def get_tracks_info(client) do
205+
def get_tracks_info(%__MODULE__{} = client) do
177206
with impl when impl != nil <- client.demuxing_engine_impl,
178207
{:ok, tracks_info} <- client.demuxing_engine |> impl.get_tracks_info() do
179208
{:ok, tracks_info, client}
@@ -251,7 +280,7 @@ defmodule ExHLS.Client do
251280
client
252281
|> Map.update!(:demuxing_engine, &client.demuxing_engine_impl.end_stream/1)
253282

254-
{:end_of_stream, client}
283+
{:error, :no_more_segments, client}
255284
end
256285
end
257286

lib/ex_hls/demuxing_engine/mpeg_ts.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
8686
@impl true
8787
def end_stream(%__MODULE__{} = demuxing_engine) do
8888
demuxer = Demuxer.end_of_stream(demuxing_engine.demuxer)
89-
9089
%{demuxing_engine | demuxer: demuxer}
9190
end
9291
end

0 commit comments

Comments
 (0)