Skip to content

Commit

Permalink
Merge pull request #28 from membraneframework/MS-101-file-plugin-exam…
Browse files Browse the repository at this point in the history
…ple-usage

Add usage examples and integration tests
  • Loading branch information
bblaszkow06 authored May 17, 2022
2 parents 9895d4c + 52669c9 commit e716f33
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
examples/input.bin

compile_commands.json
.gdb_history
bundlex.sh
bundlex.bat
/priv/plts/*
!/priv/plts/.gitkeep

# Dir generated by tmp_dir ExUnit tag
/tmp/
Expand Down
30 changes: 10 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,23 @@ The package can be installed by adding `membrane_file_plugin` to your list of de
```elixir
def deps do
[
{:membrane_file_plugin, "~> 0.12.0"}
{membrane_file_plugin, "~> 0.12.0"}
]
end
```

## Sample usage
## Usage examples

Playing below pipeline should copy `/etc/passwd` to `./test`:
### File.Sink and File.Source

```elixir
defmodule FileExamplePipeline do
use Membrane.Pipeline

@doc false
@impl true
def handle_init(_) do
children = [
file_src: %Membrane.File.Source{location: "/etc/passwd"},
file_sink: %Membrane.File.Sink{location: "./test"},
]
links = [link(:file_src) |> to(:file_sink)]

{{:ok, spec: %ParentSpec{children: children, links: links}}, %{}}
end
end
`Source` and `Sink` elements allow reading from and writing to a file, respectively.
The pipeline in `./examples/sink_and_source.exs` will copy the contents of that script to `/tmp/example.exs`

```
### File.MultiSink

`MultiSink` allows writing to multiple files, with the input being split into parts.
The example in `./examples/sink_multi.exs` will generate 0-filled input file of 1024 bytes (`input.bin`)
and copy first 10-bytes to `/tmp/output0.bin` and the rest to `/tmp/output1.bin`.

## Copyright and License

Expand Down
33 changes: 33 additions & 0 deletions examples/sink_and_source.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
Mix.install([
{:membrane_core, "~> 0.10.0"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

defmodule FileExamplePipeline do
use Membrane.Pipeline

@doc false
@impl true
def handle_init(target) do
links =
[
file_src: %Membrane.File.Source{location: __ENV__.file},
file_sink: %Membrane.File.Sink{location: "/tmp/test"}
]
|> ParentSpec.link_linear()

{{:ok, spec: %ParentSpec{links: links}, playback: :playing}, %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
send(state.target, :done)
{:ok, state}
end
end

{:ok, pid} = FileExamplePipeline.start_link(self())

receive do
:done -> FileExamplePipeline.terminate(pid)
end
74 changes: 74 additions & 0 deletions examples/sink_multi.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
Mix.install([
{:membrane_core, "~> 0.10.0"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

# Filter responsible for generating split events
defmodule Splitter do
use Membrane.Filter

alias Membrane.Buffer
alias Membrane.File.SplitEvent

def_input_pad :input, demand_unit: :bytes, demand_mode: :auto, caps: Membrane.RemoteStream
def_output_pad :output, demand_mode: :auto, caps: Membrane.RemoteStream

def_options head_size: [type: :integer]

def handle_init(opts) do
{:ok, opts |> Map.from_struct() |> Map.put(:split?, true)}
end

@impl true
def handle_process(:input, buffer, _ctx, %{head_size: head_size, split?: true}) do
<<head::binary-size(head_size), tail::binary>> = buffer.payload

actions = [
buffer: {:output, %Buffer{payload: head}},
event: {:output, %SplitEvent{}},
buffer: {:output, %Buffer{payload: tail}}
]

{{:ok, actions}, %{split?: false}}
end

def handle_process(:input, buffer, _ctx, %{split?: false}) do
{{:ok, buffer: {:output, buffer}}, %{split?: false}}
end
end

:ok = File.write!("input.bin", <<0::integer-unit(8)-size(1024)>>)

defmodule SinkMultiExamplePipeline do
use Membrane.Pipeline

@doc false
@impl true
def handle_init(target) do
links =
[
file_source: %Membrane.File.Source{location: "input.bin"},
filter: %Splitter{head_size: 10},
file_sink: %Membrane.File.Sink.Multi{location: "/tmp/output", extension: ".bin"}
]
|> ParentSpec.link_linear()

{{:ok, spec: %ParentSpec{links: links}, playback: :playing}, %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
send(state.target, :done)
{:ok, state}
end

def handle_element_end_of_stream(_other, _ctx, state) do
{:ok, state}
end
end

{:ok, pid} = SinkMultiExamplePipeline.start_link(self())

receive do
:done -> SinkMultiExamplePipeline.terminate(pid)
end
119 changes: 119 additions & 0 deletions test/membrane_file/sink_source_integration_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
defmodule Membrane.File.SinkSourceIntegrationTest do
use ExUnit.Case, async: false

import Membrane.Testing.Assertions
import Mox, only: [set_mox_global: 1]

alias Membrane.File, as: MbrFile
alias Membrane.ParentSpec
alias Membrane.Testing.Pipeline

@moduletag :tmp_dir

setup :set_mox_global

setup %{tmp_dir: tmp_dir} do
input_path = Path.join(tmp_dir, "input.bin")
output_path = Path.join(tmp_dir, "output.bin")
input_size = 10_240
content = <<0::integer-unit(8)-size(input_size)>>
:ok = File.write!(input_path, content)
Mox.stub_with(Membrane.File.CommonMock, Membrane.File.CommonFile)
[input_path: input_path, output_path: output_path, input_size: input_size, content: content]
end

test "File copy", ctx do
children = [
file_source: %MbrFile.Source{location: ctx.input_path},
file_sink: %MbrFile.Sink{location: ctx.output_path}
]

assert {:ok, pid} = Pipeline.start_link(links: ParentSpec.link_linear(children))
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)

assert File.read!(ctx.output_path) == ctx.content
end

defmodule EmptyFilter do
use Membrane.Filter

def_input_pad :input, demand_unit: :bytes, demand_mode: :auto, caps: Membrane.RemoteStream
def_output_pad :output, demand_mode: :auto, caps: Membrane.RemoteStream

@impl true
def handle_process(:input, buffer, _ctx, state) do
{{:ok, buffer: {:output, buffer}}, state}
end
end

test "File copy with filter", ctx do
children = [
file_source: %MbrFile.Source{location: ctx.input_path},
filter: EmptyFilter,
file_sink: %MbrFile.Sink{location: ctx.output_path}
]

assert {:ok, pid} = Pipeline.start_link(links: ParentSpec.link_linear(children))
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)

assert File.read!(ctx.output_path) == ctx.content
end

defmodule Splitter do
use Membrane.Filter

alias Membrane.Buffer
alias Membrane.File.SplitEvent

def_input_pad :input, demand_unit: :bytes, demand_mode: :auto, caps: Membrane.RemoteStream
def_output_pad :output, demand_mode: :auto, caps: Membrane.RemoteStream

def_options head_size: [type: :integer]

@impl true
def handle_init(opts) do
{:ok, opts |> Map.from_struct() |> Map.put(:split?, true)}
end

@impl true
def handle_process(:input, buffer, _ctx, %{head_size: head_size, split?: true}) do
<<head::binary-size(head_size), tail::binary>> = buffer.payload

actions = [
buffer: {:output, %Buffer{payload: head}},
event: {:output, %SplitEvent{}},
buffer: {:output, %Buffer{payload: tail}}
]

{{:ok, actions}, %{split?: false}}
end

def handle_process(:input, buffer, _ctx, %{split?: false}) do
{{:ok, buffer: {:output, buffer}}, %{split?: false}}
end
end

test "MultiSink with splitter", ctx do
head_size = 10

children = [
file_source: %MbrFile.Source{location: ctx.input_path},
filter: %Splitter{head_size: 10},
file_sink: %MbrFile.Sink.Multi{location: ctx.output_path, extension: ".bin"}
]

assert {:ok, pid} = Pipeline.start_link(links: ParentSpec.link_linear(children))
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)

assert File.read!(ctx.output_path <> "0.bin") == binary_part(ctx.content, 0, head_size)

assert File.read!(ctx.output_path <> "1.bin") ==
binary_part(ctx.content, head_size, ctx.input_size - head_size)
end
end

0 comments on commit e716f33

Please sign in to comment.