Skip to content

Commit

Permalink
Merge pull request #31 from membraneframework/rewrite-on-core-0.11
Browse files Browse the repository at this point in the history
Rewrite on core 0.11
  • Loading branch information
FelonEkonom authored Nov 18, 2022
2 parents e716f33 + 6f8a3db commit ebf3b9a
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 194 deletions.
20 changes: 9 additions & 11 deletions examples/sink_and_source.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Mix.install([
{:membrane_core, "~> 0.10.0"},
{:membrane_core, "~> 0.11"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

Expand All @@ -8,25 +8,23 @@ defmodule FileExamplePipeline do

@doc false
@impl true
def handle_init(target) do
links =
[
file_src: %Membrane.File.Source{location: __ENV__.file},
file_sink: %Membrane.File.Sink{location: "/tmp/test"}
]
|> ParentSpec.link_linear()
def handle_init(_ctx, target) do
structure = [
child(:file_src, %Membrane.File.Source{location: __ENV__.file})
|> child(:file_sink, %Membrane.File.Sink{location: "/tmp/test"})
]

{{:ok, spec: %ParentSpec{links: links}, playback: :playing}, %{target: target}}
{[spec: structure, playback: :playing], %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
send(state.target, :done)
{:ok, state}
{[], state}
end
end

{:ok, pid} = FileExamplePipeline.start_link(self())
{:ok, _supervisor_pid, pid} = FileExamplePipeline.start_link(self())

receive do
:done -> FileExamplePipeline.terminate(pid)
Expand Down
38 changes: 18 additions & 20 deletions examples/sink_multi.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Mix.install([
{:membrane_core, "~> 0.10.0"},
{:membrane_core, "~> 0.11"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

Expand All @@ -10,13 +10,13 @@ defmodule Splitter do
alias Membrane.Buffer
alias Membrane.File.SplitEvent

def_input_pad :input, demand_unit: :bytes, demand_mode: :auto, caps: Membrane.RemoteStream
def_output_pad :output, demand_mode: :auto, caps: Membrane.RemoteStream
def_input_pad :input, demand_unit: :bytes, demand_mode: :auto, accepted_format: Membrane.RemoteStream
def_output_pad :output, demand_mode: :auto, accepted_format: Membrane.RemoteStream

def_options head_size: [type: :integer]

def handle_init(opts) do
{:ok, opts |> Map.from_struct() |> Map.put(:split?, true)}
def handle_init(_ctx, opts) do
{[], opts |> Map.from_struct() |> Map.put(:split?, true)}
end

@impl true
Expand All @@ -29,11 +29,11 @@ defmodule Splitter do
buffer: {:output, %Buffer{payload: tail}}
]

{{:ok, actions}, %{split?: false}}
{ actions, %{split?: false}}
end

def handle_process(:input, buffer, _ctx, %{split?: false}) do
{{:ok, buffer: {:output, buffer}}, %{split?: false}}
{[buffer: {:output, buffer}], %{split?: false}}
end
end

Expand All @@ -44,30 +44,28 @@ defmodule SinkMultiExamplePipeline do

@doc false
@impl true
def handle_init(target) do
links =
[
file_source: %Membrane.File.Source{location: "input.bin"},
filter: %Splitter{head_size: 10},
file_sink: %Membrane.File.Sink.Multi{location: "/tmp/output", extension: ".bin"}
]
|> ParentSpec.link_linear()

{{:ok, spec: %ParentSpec{links: links}, playback: :playing}, %{target: target}}
def handle_init(_ctx, target) do
structure = [
child(:file_source, %Membrane.File.Source{location: "input.bin"})
|> child(:filter, %Splitter{head_size: 10})
|> child(:file_sink, %Membrane.File.Sink.Multi{location: "/tmp/output", extension: ".bin"})
]

{[spec: structure, playback: :playing], %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
send(state.target, :done)
{:ok, state}
{[], state}
end

def handle_element_end_of_stream(_other, _ctx, state) do
{:ok, state}
{[], state}
end
end

{:ok, pid} = SinkMultiExamplePipeline.start_link(self())
{:ok, _supervisor_pid, pid} = SinkMultiExamplePipeline.start_link(self())

receive do
:done -> SinkMultiExamplePipeline.terminate(pid)
Expand Down
31 changes: 15 additions & 16 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ defmodule Membrane.File.Sink do
description: "Path of the output file"
]

def_input_pad :input, demand_unit: :buffers, caps: :any
def_input_pad :input, demand_unit: :buffers, accepted_format: _any

@impl true
def handle_init(%__MODULE__{location: location}) do
{:ok,
def handle_init(_ctx, %__MODULE__{location: location}) do
{[],
%{
location: Path.expand(location),
temp_location: Path.expand(location <> ".tmp"),
Expand All @@ -33,21 +33,27 @@ defmodule Membrane.File.Sink do
end

@impl true
def handle_stopped_to_prepared(_ctx, %{location: location} = state) do
def handle_setup(ctx, %{location: location} = state) do
fd = @common_file.open!(location, [:read, :write])
:ok = @common_file.truncate!(fd)
{:ok, %{state | fd: fd}}

Membrane.ResourceGuard.register(
ctx.resource_guard,
fn -> @common_file.close!(fd) end
)

{[], %{state | fd: fd}}
end

@impl true
def handle_prepared_to_playing(_ctx, state) do
{{:ok, demand: :input}, state}
def handle_playing(_ctx, state) do
{[demand: :input], state}
end

@impl true
def handle_write(:input, buffer, _ctx, %{fd: fd} = state) do
:ok = @common_file.write!(fd, buffer)
{{:ok, demand: :input}, state}
{[demand: :input], state}
end

@impl true
Expand All @@ -59,18 +65,11 @@ defmodule Membrane.File.Sink do
seek_file(state, position)
end

{:ok, state}
{[], state}
end

def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

@impl true
def handle_prepared_to_stopped(_ctx, %{fd: fd} = state) do
state = maybe_merge_temporary(state)
:ok = @common_file.close!(fd)
{:ok, %{state | fd: nil}}
end

defp seek_file(%{fd: fd} = state, position) do
state = maybe_merge_temporary(state)
_position = @common_file.seek!(fd, position)
Expand Down
35 changes: 19 additions & 16 deletions lib/membrane_file/sink_multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ defmodule Membrane.File.Sink.Multi do
@spec default_naming_fun(Path.t(), non_neg_integer(), String.t()) :: Path.t()
def default_naming_fun(path, i, ext), do: [path, i, ext] |> Enum.join() |> Path.expand()

def_input_pad :input, demand_unit: :buffers, caps: :any
def_input_pad :input, demand_unit: :buffers, accepted_format: _any

@impl true
def handle_init(%__MODULE__{} = options) do
{:ok,
def handle_init(_ctx, %__MODULE__{} = options) do
{[],
%{
naming_fun: &options.naming_fun.(options.location, &1, options.extension),
split_on: options.split_event,
Expand All @@ -57,41 +57,44 @@ defmodule Membrane.File.Sink.Multi do
end

@impl true
def handle_stopped_to_prepared(_ctx, state), do: {:ok, open(state)}
def handle_setup(ctx, state), do: {[], open(state, ctx)}

@impl true
def handle_prepared_to_playing(_ctx, state) do
{{:ok, demand: :input}, state}
def handle_playing(_ctx, state) do
{[demand: :input], state}
end

@impl true
def handle_event(:input, %split_on{}, _ctx, %{split_on: split_on} = state) do
def handle_event(:input, %split_on{}, ctx, %{split_on: split_on} = state) do
state =
state
|> close()
|> open()
|> close(ctx)
|> open(ctx)

{:ok, state}
{[], state}
end

def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

@impl true
def handle_write(:input, buffer, _ctx, %{fd: fd} = state) do
:ok = @common_file.write!(fd, buffer)
{{:ok, demand: :input}, state}
{[demand: :input], state}
end

@impl true
def handle_prepared_to_stopped(_ctx, state), do: {:ok, close(state)}
# @impl true
# def handle_prepared_to_stopped(_ctx, state), do: {:ok, close(state)}

defp open(%{naming_fun: naming_fun, index: index} = state) do
defp open(%{naming_fun: naming_fun, index: index} = state, ctx) do
fd = @common_file.open!(naming_fun.(index), :write)

Membrane.ResourceGuard.register(ctx.resource_guard, fn -> @common_file.close!(fd) end, tag: fd)

%{state | fd: fd}
end

defp close(%{fd: fd, index: index} = state) do
:ok = @common_file.close!(fd)
defp close(%{fd: fd, index: index} = state, ctx) do
Membrane.ResourceGuard.cleanup(ctx.resource_guard, fd)

%{state | fd: nil, index: index + 1}
end
Expand Down
28 changes: 14 additions & 14 deletions lib/membrane_file/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ defmodule Membrane.File.Source do
description: "Size of chunks being read"
]

def_output_pad :output, caps: {RemoteStream, type: :bytestream}
def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}

@impl true
def handle_init(%__MODULE__{location: location, chunk_size: size}) do
{:ok,
def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size}) do
{[],
%{
location: Path.expand(location),
chunk_size: size,
Expand All @@ -32,14 +32,20 @@ defmodule Membrane.File.Source do
end

@impl true
def handle_stopped_to_prepared(_ctx, %{location: location} = state) do
def handle_setup(ctx, %{location: location} = state) do
fd = @common_file.open!(location, :read)
{:ok, %{state | fd: fd}}

Membrane.ResourceGuard.register(
ctx.resource_guard,
fn -> @common_file.close!(fd) end
)

{[], %{state | fd: fd}}
end

@impl true
def handle_prepared_to_playing(_ctx, state) do
{{:ok, caps: {:output, %RemoteStream{type: :bytestream}}}, state}
def handle_playing(_ctx, state) do
{[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
end

@impl true
Expand All @@ -62,12 +68,6 @@ defmodule Membrane.File.Source do
[end_of_stream: :output]
end

{{:ok, actions}, state}
end

@impl true
def handle_prepared_to_stopped(_ctx, %{fd: fd} = state) do
:ok = @common_file.close!(fd)
{:ok, %{state | fd: nil}}
{actions, state}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Membrane.File.Plugin.Mixfile do

defp deps do
[
{:membrane_core, "~> 0.10.0"},
{:membrane_core, "~> 0.11"},
# Testing
{:mox, "~> 1.0", only: :test},
# Development
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%{
"bunch": {:hex, :bunch, "1.3.1", "f8fe80042f9eb474ef2801ae2c9372f9b13d11e7053265dcfc24b9d912e3750b", [:mix], [], "hexpm", "00e21b16ff9bb698b728a01a2fc4b3bf7fc0e87c4bb9c6e4a442324aa8c5e567"},
"bunch": {:hex, :bunch, "1.5.0", "78530e85bc3f53e1711dba654565692e2015cb6d1685e9b53bf7606b14a36c71", [:mix], [], "hexpm", "2c32f8da5d4c9e7a534c8c25aea150da696dd8624ce64f97c21ee193c12258e5"},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.6.4", "ddd474afb6e8c240313f3a7b0d025cc3213f0d171879429bf8535d7021d9ad78", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "c28f910b61e1ff829bffa056ef7293a8db50e87f2c57a9b5c3f57eee124536b7"},
Expand All @@ -12,7 +12,7 @@
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"membrane_core": {:hex, :membrane_core, "0.10.0", "1d010bc632f6abb575c60e37607a0cb56fbf88bbb9d0d78a4b5410611563239a", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "64d63514aabf71c93136ee86093b7ca60cf4317a09b137e0c08e064caa34636f"},
"membrane_core": {:hex, :membrane_core, "0.11.0", "63ae9f56834ec67680d634d8d69f71b2d46b94f4a0ec8fafcf22d8ce216b8f41", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "097584018fe948fa3013bfd6bcf002b3ad7cbd13f2259be4f1903f37a7aad7ab"},
"mox": {:hex, :mox, "1.0.1", "b651bf0113265cda0ba3a827fcb691f848b683c373b77e7d7439910a8d754d6e", [:mix], [], "hexpm", "35bc0dea5499d18db4ef7fe4360067a59b06c74376eb6ab3bd67e6295b133469"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
Expand Down
Loading

0 comments on commit ebf3b9a

Please sign in to comment.