Skip to content

Commit 5a236ad

Browse files
committed
Apply reviewers suggestions
1 parent a053f6c commit 5a236ad

File tree

16 files changed

+328
-363
lines changed

16 files changed

+328
-363
lines changed

examples.livemd

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ System.put_env("PATH", "/opt/homebrew/bin:#{System.get_env("PATH")}")
1111
# Examples that don't mention them should still work.
1212

1313
# MIX_INSTALL_CONFIG_BEGIN
14-
boombox = {:boombox, github: "membraneframework/boombox"}
14+
boombox = {:boombox, github: "membraneframework/boombox", branch: "refactor-elixir-endpoints"}
1515

1616
# This livebook uses boombox from the master branch. If any examples happen to not work, the latest stable version of this livebook
1717
# can be found on https://hexdocs.pm/boombox/examples.html or in the latest github release.
@@ -24,7 +24,8 @@ Mix.install([
2424
:exla,
2525
:bumblebee,
2626
:websockex,
27-
:membrane_simple_rtsp_server
27+
:membrane_simple_rtsp_server,
28+
{:coerce, ">= 1.0.2"}
2829
])
2930

3031
Nx.global_default_backend(EXLA.Backend)
@@ -573,19 +574,8 @@ reader2 =
573574

574575
writer = Boombox.run(input: {:writer, video: :image, audio: false}, output: output)
575576

576-
Stream.unfold(%{}, fn _state ->
577+
Stream.repeatedly(fn ->
577578
case {Boombox.read(reader1), Boombox.read(reader2)} do
578-
{:finished, :finished} ->
579-
nil
580-
581-
{{:ok, _packet}, :finished} ->
582-
Boombox.close(reader1)
583-
nil
584-
585-
{:finished, {:ok, _packet}} ->
586-
Boombox.close(reader2)
587-
nil
588-
589579
{{:ok, packet1}, {:ok, packet2}} ->
590580
joined_image =
591581
Vix.Vips.Operation.join!(packet1.payload, packet2.payload, :VIPS_DIRECTION_HORIZONTAL)
@@ -598,12 +588,15 @@ Stream.unfold(%{}, fn _state ->
598588

599589
Boombox.write(writer, packet)
600590

601-
{nil, %{}}
591+
_finished ->
592+
:eos
602593
end
603594
end)
604-
|> Stream.run()
595+
|> Enum.find(& &1 == :eos)
605596

606597
Boombox.close(writer)
598+
Boombox.close(reader1)
599+
Boombox.close(reader2)
607600
```
608601

609602
The second cell uses `:message` endpoints, meaning that the server communicates with boomboxes by
@@ -620,39 +613,39 @@ defmodule MyServer do
620613

621614
@impl true
622615
def init(args) do
623-
bb1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false})
624-
bb2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false})
616+
boombox1 = Boombox.run(input: args.input1, output: {:message, video: :image, audio: false})
617+
boombox2 = Boombox.run(input: args.input2, output: {:message, video: :image, audio: false})
625618
output_writer =
626619
Boombox.run(input: {:writer, video: :image, audio: false}, output: args.output)
627620

628621
{:ok,
629622
%{
630-
bb_states: %{
631-
bb1: %{last_packet: nil, eos: false},
632-
bb2: %{last_packet: nil, eos: false}
623+
boombox_states: %{
624+
boombox1: %{last_packet: nil, eos: false},
625+
boombox2: %{last_packet: nil, eos: false}
633626
},
634-
bbs: %{bb1 => :bb1, bb2 => :bb2},
627+
boomboxes: %{boombox1 => :boombox1, boombox2 => :boombox2},
635628
output_writer: output_writer
636629
}}
637630
end
638631

639632
@impl true
640633
def handle_info({:boombox_packet, bb, %Boombox.Packet{} = packet}, state) do
641-
boombox_id = state.bbs[bb]
642-
state = put_in(state.bb_states[boombox_id].last_packet, packet)
634+
boombox_id = state.boomboxes[bb]
635+
state = put_in(state.boombox_states[boombox_id].last_packet, packet)
643636

644-
if Enum.all?(Map.values(state.bb_states), &(&1.last_packet != nil)) do
637+
if Enum.all?(Map.values(state.boombox_states), &(&1.last_packet != nil)) do
645638
joined_image =
646639
Vix.Vips.Operation.join!(
647-
state.bb_states.bb1.last_packet.payload,
648-
state.bb_states.bb2.last_packet.payload,
640+
state.boombox_states.boombox1.last_packet.payload,
641+
state.boombox_states.boombox2.last_packet.payload,
649642
:VIPS_DIRECTION_HORIZONTAL
650643
)
651644

652645
pts =
653646
max(
654-
state.bb_states.bb1.last_packet.pts,
655-
state.bb_states.bb2.last_packet.pts
647+
state.boombox_states.boombox1.last_packet.pts,
648+
state.boombox_states.boombox2.last_packet.pts
656649
)
657650

658651
packet = %Boombox.Packet{packet | payload: joined_image, pts: pts}
@@ -665,10 +658,10 @@ defmodule MyServer do
665658

666659
@impl true
667660
def handle_info({:boombox_finished, bb}, state) do
668-
boombox_id = state.bbs[bb]
669-
state = put_in(state.bb_states[boombox_id].eos, true)
661+
boombox_id = state.boomboxes[bb]
662+
state = put_in(state.boombox_states[boombox_id].eos, true)
670663

671-
if Enum.all?(Map.values(state.bb_states), & &1.eos) do
664+
if Enum.all?(Map.values(state.boombox_states), & &1.eos) do
672665
Boombox.close(state.output_writer)
673666
{:stop, :normal, state}
674667
else
@@ -686,7 +679,6 @@ monitor = Process.monitor(server)
686679

687680
receive do
688681
{:DOWN, ^monitor, :process, ^server, reason} ->
689-
IO.inspect(reason)
690682
:ok
691683
end
692684
```

lib/boombox.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ defmodule Boombox do
99
require Membrane.Time
1010
require Membrane.Transcoder.{Audio, Video}
1111

12+
alias Boombox.Pipeline
1213
alias Membrane.HTTPAdaptiveStream
1314
alias Membrane.RTP
14-
alias Boombox.Pipeline
1515

1616
@elixir_endpoints [:stream, :message, :writer, :reader]
1717

@@ -390,7 +390,7 @@ defmodule Boombox do
390390
any more packets with `write/2` and should terminate accordingly.
391391
392392
"""
393-
@spec close(Writer.t() | Reader.t()) :: :ok | {:error, :incompatible_mode}
393+
@spec close(Writer.t() | Reader.t()) :: :ok | {:error, :incompatible_mode | :already_finished}
394394
def close(%Writer{} = writer) do
395395
Boombox.Server.finish_consuming(writer.server_reference)
396396
end

lib/boombox/bin.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ defmodule Boombox.Bin do
163163
spec =
164164
child(:boombox, %Boombox.InternalBin{
165165
input: opts.input || :membrane_pad,
166-
output: opts.output || :membrane_pad
166+
output: opts.output || :membrane_pad,
167+
parent: self()
167168
})
168169

169170
{[spec: spec], Map.from_struct(opts)}

lib/boombox/internal_bin.ex

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ defmodule Boombox.InternalBin do
135135
]
136136

137137
def_options input: [spec: input()],
138-
output: [spec: output()]
138+
output: [spec: output()],
139+
parent: [spec: pid()]
139140

140141
@impl true
141142
def handle_init(ctx, opts) do
@@ -147,8 +148,8 @@ defmodule Boombox.InternalBin do
147148

148149
state =
149150
%State{
150-
input: parse_endpoint_opt!(:input, opts.input),
151-
output: parse_endpoint_opt!(:output, opts.output),
151+
input: parse_endpoint_opt!(:input, opts.input, opts.parent),
152+
output: parse_endpoint_opt!(:output, opts.output, opts.parent),
152153
status: :init
153154
}
154155

@@ -730,14 +731,14 @@ defmodule Boombox.InternalBin do
730731
Process.sleep(500)
731732
end
732733

733-
@spec parse_endpoint_opt!(:input, input()) :: input()
734-
@spec parse_endpoint_opt!(:output, output()) :: output()
735-
defp parse_endpoint_opt!(direction, value) when is_binary(value) do
736-
parse_endpoint_opt!(direction, {value, []})
734+
@spec parse_endpoint_opt!(:input, input(), pid()) :: input()
735+
@spec parse_endpoint_opt!(:output, output(), pid()) :: output()
736+
defp parse_endpoint_opt!(direction, value, parent) when is_binary(value) do
737+
parse_endpoint_opt!(direction, {value, []}, parent)
737738
end
738739

739740
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
740-
defp parse_endpoint_opt!(direction, {value, opts}) when is_binary(value) do
741+
defp parse_endpoint_opt!(direction, {value, opts}, parent) when is_binary(value) do
741742
uri = URI.parse(value)
742743
scheme = uri.scheme
743744
extension = if uri.path, do: Path.extname(uri.path)
@@ -769,16 +770,16 @@ defmodule Boombox.InternalBin do
769770
_other ->
770771
raise ArgumentError, "Unsupported URI: #{value} for direction: #{direction}"
771772
end
772-
|> then(&parse_endpoint_opt!(direction, &1))
773+
|> then(&parse_endpoint_opt!(direction, &1, parent))
773774
end
774775

775776
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
776-
defp parse_endpoint_opt!(direction, value) when is_tuple(value) or is_atom(value) do
777+
defp parse_endpoint_opt!(direction, value, parent) when is_tuple(value) or is_atom(value) do
777778
case value do
778779
{endpoint_type, location}
779780
when is_binary(location) and direction == :input and
780781
StorageEndpoints.is_storage_endpoint_type(endpoint_type) ->
781-
parse_endpoint_opt!(:input, {endpoint_type, location, []})
782+
parse_endpoint_opt!(:input, {endpoint_type, location, []}, parent)
782783

783784
{endpoint_type, location, opts}
784785
when endpoint_type in [:h264, :h265] and is_binary(location) and direction == :input ->
@@ -819,7 +820,7 @@ defmodule Boombox.InternalBin do
819820
value
820821

821822
{:whip, uri} when is_binary(uri) ->
822-
parse_endpoint_opt!(direction, {:whip, uri, []})
823+
parse_endpoint_opt!(direction, {:whip, uri, []}, parent)
823824

824825
{:whip, uri, opts} when is_binary(uri) and is_list(opts) and direction == :input ->
825826
if Keyword.keyword?(opts), do: {:webrtc, value}
@@ -850,9 +851,9 @@ defmodule Boombox.InternalBin do
850851
{:rtp, opts} ->
851852
if Keyword.keyword?(opts), do: value
852853

853-
{elixir_endpoint, process, opts}
854-
when is_pid(process) and elixir_endpoint in @elixir_endpoint_types ->
855-
if Keyword.keyword?(opts), do: value
854+
{elixir_endpoint, opts}
855+
when elixir_endpoint in @elixir_endpoint_types ->
856+
if Keyword.keyword?(opts), do: {elixir_endpoint, parent, opts}
856857

857858
{:srt, server_awaiting_accept}
858859
when direction == :input and is_pid(server_awaiting_accept) ->

lib/boombox/internal_bin/elixir_endpoints/pull_sink.ex

Lines changed: 0 additions & 35 deletions
This file was deleted.

lib/boombox/internal_bin/elixir_endpoints/pull_source.ex

Lines changed: 0 additions & 27 deletions
This file was deleted.

lib/boombox/internal_bin/elixir_endpoints/push_sink.ex

Lines changed: 0 additions & 31 deletions
This file was deleted.

lib/boombox/internal_bin/elixir_endpoints/push_source.ex

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)