-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for SRT ingest (instead of RTMP) #808
Comments
Hi, we currently use WebRTC for low-latency ingest. Regarding SRT, there are no plans for now, but my friend created Elixir bindings: https://github.com/Qizot/ex_libsrt. They should be easy to wrap in Membrane elements. |
@mat-hek Thanks for sharing that library. Would you be able to give a small example of how to integrate that with Membrane? |
Ok, so I played with it for a while and got a POC of a server. It only handles the happy path and a single stream, but it seems to work. It uses FFmpeg as a client, receives the stream via SRT and saves it to an MP4 file:
Mix.install([
{:ex_libsrt, github: "Qizot/ex_libsrt"},
# small fix for kim-company/membrane_mpeg_ts_plugin
{:membrane_mpeg_ts_plugin, github: "mat-hek/membrane_mpeg_ts_plugin", branch: "patch-1"},
:membrane_aac_plugin,
:membrane_h26x_plugin,
:membrane_mp4_plugin,
:membrane_file_plugin
])
defmodule Membrane.SRT.Source do
  @moduledoc """
  Proof-of-concept Membrane source that receives data over SRT.

  When the element starts playing it starts an `ExLibSRT.Server` on the
  configured `ip` and `port` and announces a `Membrane.RemoteStream` on its
  `:output` pad. Every pending connect request is accepted, each received
  payload is forwarded downstream as a `Membrane.Buffer`, and a closed
  connection ends the stream.
  """
  use Membrane.Source

  def_output_pad(:output, flow_control: :push, accepted_format: Membrane.RemoteStream)

  def_options(
    port: [spec: :inet.port_number()],
    ip: [spec: String.t(), default: "0.0.0.0"]
  )

  # Starts the SRT server and replaces the element state with a map holding it.
  # NOTE(review): the second argument is the element state at this point; this
  # module defines no `handle_init`, so it is presumably the options passed to
  # the element - confirm against Membrane's default `handle_init`.
  @impl true
  def handle_playing(_ctx, options) do
    {:ok, srt_server} = ExLibSRT.Server.start(options.ip, options.port)
    IO.inspect(:started)

    actions = [stream_format: {:output, %Membrane.RemoteStream{}}]
    {actions, %{server: srt_server}}
  end

  # A client asked to connect: accept the awaiting request unconditionally.
  @impl true
  def handle_info({:srt_server_connect_request, _peer_address, _stream_id}, _ctx, state) do
    IO.inspect(:accepting)
    :ok = ExLibSRT.Server.accept_awaiting_connect_request(state.server)
    IO.inspect(:accepted)

    {[], state}
  end

  # Data received on a connection: wrap it in a buffer and push it downstream.
  def handle_info({:srt_data, _connection_id, data}, _ctx, state) do
    buffer = %Membrane.Buffer{payload: data}
    {[buffer: {:output, buffer}], state}
  end

  # The connection was closed: signal end of stream on the output pad.
  def handle_info({:srt_server_conn_closed, _connection_id}, _ctx, state) do
    {[end_of_stream: :output], state}
  end

  # Anything else is only printed for debugging purposes.
  def handle_info(other, _ctx, state) do
    IO.inspect(other, label: :message)
    {[], state}
  end
end
defmodule Pipeline do
  @moduledoc """
  Pipeline that demuxes an MPEG-TS stream received over SRT and writes it to
  `out.mp4`.

  It starts with the SRT source linked to the MPEG-TS demuxer. Once the
  demuxer reports a PMT, an MP4 muxer and a file sink are spawned and every
  stream listed in the PMT is linked to the muxer through a parser matching
  its type. The pipeline terminates when the sink reaches end of stream.
  """
  use Membrane.Pipeline

  # `opts` is expected to provide the `:port` the SRT source listens on.
  @impl true
  def handle_init(_ctx, opts) do
    source = %Membrane.SRT.Source{port: opts[:port]}

    ingest =
      child(source)
      |> child(:demuxer, Membrane.MPEG.TS.Demuxer)

    {[spec: ingest], %{}}
  end

  # The demuxer announced the program map: build the muxing part of the
  # pipeline and connect each announced stream to it.
  @impl true
  def handle_child_notification({:mpeg_ts_pmt, pmt}, :demuxer, _ctx, state) do
    track_links =
      Enum.map(pmt.streams, fn {stream_id, %{stream_type: stream_type}} ->
        get_child(:demuxer)
        |> via_out(Pad.ref(:output, {:stream_id, stream_id}))
        |> child(parser_for(stream_type))
        |> get_child(:mp4)
      end)

    muxer_to_file =
      child(:mp4, Membrane.MP4.Muxer.ISOM)
      |> child(:sink, %Membrane.File.Sink{location: "out.mp4"})

    {[spec: [muxer_to_file | track_links]], state}
  end

  # Only the sink finishing means that the whole file has been written.
  @impl true
  def handle_element_end_of_stream(:sink, _pad, _ctx, state) do
    {[terminate: :normal], state}
  end

  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end

  # Returns the parser definition for a stream type reported in the PMT.
  # Only H264 and AAC are handled; any other type raises a `CaseClauseError`.
  defp parser_for(stream_type) do
    case stream_type do
      :H264 ->
        %Membrane.H264.Parser{output_stream_structure: :avc1}

      :AAC ->
        %Membrane.AAC.Parser{out_encapsulation: :none, output_config: :esds}
    end
  end
end
# Start the receiving pipeline; its SRT source listens on port 1234.
{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(Pipeline, port: 1234)

# Keep the monitor reference so that only this monitor's :DOWN message is
# awaited at the end of the script.
monitor_ref = Process.monitor(supervisor)

# NOTE(review): presumably gives the SRT server time to start listening before
# FFmpeg tries to connect - confirm whether a fixed delay is sufficient.
Process.sleep(1000)

url =
  "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s.mp4"

# Use FFmpeg as the SRT client: read the sample at its native rate (-re),
# repackage it as MPEG-TS without re-encoding and send it to the pipeline.
# The match asserts that FFmpeg exited with status 0.
{_output, 0} = System.shell("ffmpeg -re -i #{url} -c copy -f mpegts srt://127.0.0.1:1234")

# Block until the pipeline's supervisor goes down.
receive do
  {:DOWN, ^monitor_ref, :process, ^supervisor, _reason} -> :ok
end
Hi there,
Would it be possible to add support for SRT ingest instead of RTMP? SRT is way better than RTMP in terms of latency.
The text was updated successfully, but these errors were encountered: