@@ -212,12 +212,12 @@ defmodule Boombox do
212212 case opts do
213213 % { input: { :stream , _stream_opts } } ->
214214 procs = Pipeline . start_pipeline ( opts )
215- source = Pipeline . await_source_ready ( )
215+ source = await_source_ready ( )
216216 consume_stream ( stream , source , procs )
217217
218218 % { output: { :stream , _stream_opts } } ->
219219 procs = Pipeline . start_pipeline ( opts )
220- sink = Pipeline . await_sink_ready ( )
220+ sink = await_sink_ready ( )
221221 produce_stream ( sink , procs )
222222
223223 % { input: { :writer , _writer_opts } } ->
@@ -237,7 +237,7 @@ defmodule Boombox do
237237 opts ->
238238 opts
239239 |> Pipeline . start_pipeline ( )
240- |> Pipeline . await_pipeline ( )
240+ |> await_pipeline ( )
241241 end
242242 end
243243
@@ -277,7 +277,7 @@ defmodule Boombox do
277277 case opts do
278278 % { input: { :stream , _stream_opts } } ->
279279 procs = Pipeline . start_pipeline ( opts )
280- source = Pipeline . await_source_ready ( )
280+ source = await_source_ready ( )
281281
282282 Task . async ( fn ->
283283 Process . monitor ( procs . supervisor )
@@ -286,7 +286,7 @@ defmodule Boombox do
286286
287287 % { output: { :stream , _stream_opts } } ->
288288 procs = Pipeline . start_pipeline ( opts )
289- sink = Pipeline . await_sink_ready ( )
289+ sink = await_sink_ready ( )
290290 produce_stream ( sink , procs )
291291
292292 % { input: { :writer , _writer_opts } } ->
@@ -311,7 +311,7 @@ defmodule Boombox do
311311 task =
312312 Task . async ( fn ->
313313 Process . monitor ( procs . supervisor )
314- Pipeline . await_pipeline ( procs )
314+ await_pipeline ( procs )
315315 end )
316316
317317 await_external_resource_ready ( )
@@ -322,7 +322,7 @@ defmodule Boombox do
322322
323323 Task . async ( fn ->
324324 Process . monitor ( procs . supervisor )
325- Pipeline . await_pipeline ( procs )
325+ await_pipeline ( procs )
326326 end )
327327 end
328328 end
@@ -471,7 +471,7 @@ defmodule Boombox do
471471
472472 _state ->
473473 send ( source , { :boombox_eos , self ( ) } )
474- Pipeline . await_pipeline ( procs )
474+ await_pipeline ( procs )
475475 end
476476 end
477477
@@ -495,12 +495,39 @@ defmodule Boombox do
495495 end
496496 end ,
497497 fn
498- % { procs: procs } -> Pipeline . terminate_pipeline ( procs )
498+ % { procs: procs } -> terminate_pipeline ( procs )
499499 :eos -> :ok
500500 end
501501 )
502502 end
503503
504+ @ spec terminate_pipeline ( Pipeline . procs ( ) ) :: :ok
505+ defp terminate_pipeline ( procs ) do
506+ Membrane.Pipeline . terminate ( procs . pipeline )
507+ await_pipeline ( procs )
508+ end
509+
510+ @ spec await_pipeline ( Pipeline . procs ( ) ) :: :ok
511+ defp await_pipeline ( % { supervisor: supervisor } ) do
512+ receive do
513+ { :DOWN , _monitor , :process , ^ supervisor , _reason } -> :ok
514+ end
515+ end
516+
517+ @ spec await_source_ready ( ) :: pid ( )
518+ defp await_source_ready ( ) do
519+ receive do
520+ { :boombox_elixir_source , source } -> source
521+ end
522+ end
523+
524+ @ spec await_sink_ready ( ) :: pid ( )
525+ defp await_sink_ready ( ) do
526+ receive do
527+ { :boombox_elixir_sink , sink } -> sink
528+ end
529+ end
530+
504531 # Waits for the external resource to be ready.
505532 # This is used to wait for the tcp/udp server to be ready before returning from async/2.
506533 # It is used for rtmp, rtmps, rtp, rtsp.
0 commit comments