Skip to content

Commit

Permalink
Use interpreter runtime in wasm tests (#1746)
Browse files Browse the repository at this point in the history
* use interpreter runtime in wasm tests

* small refactor

* remove redundant cleanup
  • Loading branch information
lovromazgon authored Aug 6, 2024
1 parent d6e83ce commit d370cd5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 28 deletions.
7 changes: 6 additions & 1 deletion pkg/plugin/processor/standalone/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import (
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
)

// newRuntime is a function that creates a new Wazero runtime. This is used to
// allow tests to replace the runtime with an interpreter runtime, as it's much
// faster than the compiler runtime.
var newRuntime = wazero.NewRuntime

// Registry is a directory registry of processor plugins, organized by plugin
// type, name and version.
// Every file in the specified directory is considered a plugin
Expand Down Expand Up @@ -81,7 +86,7 @@ func NewRegistry(logger log.CtxLogger, pluginDir string, schemaService pprocutil
}

// we are using the wasm compiler, context is not used
runtime := wazero.NewRuntime(ctx)
runtime := newRuntime(ctx)
// TODO close runtime on shutdown

_, err := wasi_snapshot_preview1.Instantiate(ctx, runtime)
Expand Down
23 changes: 4 additions & 19 deletions pkg/plugin/processor/standalone/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,6 @@ import (
"github.com/matryer/is"
)

func TestRegistry_List(t *testing.T) {
is := is.New(t)

underTest, err := NewRegistry(log.Test(t), testPluginChaosDir, schema.NewInMemoryService())
is.NoErr(err)
list := underTest.List()
is.Equal(1, len(list))
got, ok := list["standalone:[email protected]"]
is.True(ok) // expected spec for standalone:[email protected]

want := ChaosProcessorSpecifications()

is.Equal("", cmp.Diff(got, want))
}

func TestRegistry_MalformedProcessor(t *testing.T) {
is := is.New(t)

Expand Down Expand Up @@ -105,9 +90,9 @@ func TestRegistry_ChaosProcessor(t *testing.T) {

t.Run("ConcurrentProcessors", func(t *testing.T) {
const (
// spawn 50 processors, each processing 50 records simultaneously
processorCount = 50
recordCount = 50
// spawn 25 processors, each processing 25 records simultaneously
processorCount = 25
recordCount = 25
)

var wg csync.WaitGroup
Expand Down Expand Up @@ -138,7 +123,7 @@ func TestRegistry_ChaosProcessor(t *testing.T) {
is.NoErr(p.Teardown(ctx))
}(i + 1)
}
err = wg.WaitTimeout(ctx, time.Minute)
err = wg.WaitTimeout(ctx, 2*time.Minute)
is.NoErr(err)
})

Expand Down
31 changes: 23 additions & 8 deletions pkg/plugin/processor/standalone/standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/csync"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/stealthrocket/wazergo"
Expand Down Expand Up @@ -72,21 +71,34 @@ func TestMain(m *testing.M) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err := cmd.Run()
exitOnError(err, "error executing bash script")
{
fmt.Printf("Building test processors (%s)...\n", cmd.String())
start := time.Now()

err := cmd.Run()
exitOnError(err, "error executing bash script")

fmt.Printf("Built test processors in %v\n", time.Since(start))
}

// instantiate shared test runtime
ctx := context.Background()
TestRuntime = wazero.NewRuntime(ctx)

_, err = wasi_snapshot_preview1.Instantiate(ctx, TestRuntime)
// use interpreter runtime as it's faster for tests
newRuntime = func(ctx context.Context) wazero.Runtime {
cfg := wazero.NewRuntimeConfigInterpreter()
return wazero.NewRuntimeWithConfig(ctx, cfg)
}

TestRuntime = newRuntime(ctx)

_, err := wasi_snapshot_preview1.Instantiate(ctx, TestRuntime)
exitOnError(err, "error instantiating WASI")

CompiledHostModule, err = wazergo.Compile(ctx, TestRuntime, hostModule)
exitOnError(err, "error compiling host module")

// load test processors
var wg csync.WaitGroup
for path, t := range testProcessorPaths {
*t.V1, err = os.ReadFile(path)
exitOnError(err, "error reading file "+path)
Expand All @@ -95,13 +107,16 @@ func TestMain(m *testing.M) {
continue
}

fmt.Printf("Compiling module %s...\n", path)
start := time.Now()

// note that modules can't be compiled in parallel, because the runtime
// is not thread-safe
*t.V2, err = TestRuntime.CompileModule(ctx, *t.V1)
exitOnError(err, "error compiling module "+path)

fmt.Printf("Compiled module %s in %v\n", path, time.Since(start))
}
err = wg.WaitTimeout(ctx, time.Minute)
exitOnError(err, "timed out waiting on modules to compile")

// run tests
code := m.Run()
Expand Down

0 comments on commit d370cd5

Please sign in to comment.