diff --git a/wasm/bench/cmd/wasigo/main.go b/wasm/bench/cmd/wasigo/main.go index a349f9688..acd18a3ab 100644 --- a/wasm/bench/cmd/wasigo/main.go +++ b/wasm/bench/cmd/wasigo/main.go @@ -45,14 +45,13 @@ func main() { wasm.NewStoreReaderInput("store.reader.2", createStore(ctx, "store.reader.2")), wasm.NewStoreWriterOutput("out", createStore(ctx, "out"), 1, "string"), ) - execStart := time.Now() + call := wasm.NewCall(nil, "mapBlock", "mapBlock", args) _, err = module.ExecuteNewCall(ctx, call, instance, args) if err != nil { panic(fmt.Errorf("executing call: %w", err)) } - fmt.Println("exec duration", time.Since(execStart)) - //fmt.Println("call output", string(call.Output())) + data := call.Output() output := &pb.MapBlockOutput{} err = output.UnmarshalVT(data) diff --git a/wasm/bench/substreams_wasi_go/lib/substreams.go b/wasm/bench/substreams_wasi_go/lib/substreams.go index 6f6b598a6..0369e97fe 100644 --- a/wasm/bench/substreams_wasi_go/lib/substreams.go +++ b/wasm/bench/substreams_wasi_go/lib/substreams.go @@ -12,7 +12,7 @@ import ( ) func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) { - log.Print(rand.Int()) + log.Printf("totally random number: %d", rand.Int()) rocketAddress := strings.ToLower("ae78736Cd615f374D3085123A210448E74Fc6393") @@ -29,7 +29,7 @@ func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) { log.Print("got value_123") log.Printf("%s", time.Now()) - time.Sleep(5 * time.Second) + time.Sleep(60 * time.Second) log.Printf("%s", time.Now()) trxCount := 0 @@ -66,7 +66,7 @@ func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) { if err != nil { return nil, err } - log.Print("set popo to lol") + log.Print("set key `popo` to value `lol`") output := &pb.MapBlockOutput{ TrxCount: uint32(trxCount), diff --git a/wasm/bench/substreams_wasi_go/main.wasm b/wasm/bench/substreams_wasi_go/main.wasm index 787472b24..1bfb032d4 100755 Binary files a/wasm/bench/substreams_wasi_go/main.wasm and b/wasm/bench/substreams_wasi_go/main.wasm differ diff --git a/wasm/wasi/args.go b/wasm/wasi/args.go index f5b3b1b73..1ef6a3c7a 100644 --- a/wasm/wasi/args.go +++ b/wasm/wasi/args.go @@ -29,8 +29,7 @@ func marshallArgs(args []wasm.Argument) ([]byte, error) { } data, err := protoscope.NewScanner(scopeData).Exec() if err != nil { - fmt.Println("scopeData", scopeData) - return nil, fmt.Errorf("scanning args: %w", err) + return nil, fmt.Errorf("scanning args: %w (scopeData: %s)", err, scopeData) } return data, nil diff --git a/wasm/wasi/fs/virtual.go b/wasm/wasi/fs/virtual.go index c0e560152..24c70152c 100644 --- a/wasm/wasi/fs/virtual.go +++ b/wasm/wasi/fs/virtual.go @@ -8,6 +8,7 @@ import ( "math/big" "strconv" "strings" + "sync" "time" "github.com/streamingfast/substreams/wasm" @@ -15,6 +16,13 @@ import ( type Virtual struct { ctx context.Context + + resultOnce sync.Once + result []byte +} + +func (v *Virtual) Result() []byte { + return v.result } func NewVirtualFs(ctx context.Context) *Virtual { @@ -24,7 +32,11 @@ func NewVirtualFs(ctx context.Context) *Virtual { } func (v *Virtual) Open(name string) (fs.File, error) { - return NewVirtualFile(v.ctx, name) + return newVirtualFile(v.ctx, name, func(b []byte) { + v.resultOnce.Do(func() { + v.result = b + }) + }) } type VirtualFile struct { @@ -33,17 +45,16 @@ type VirtualFile struct { nameParts []string Remaining []byte Loaded bool + + outputSetter func([]byte) } -func NewVirtualFile(ctx context.Context, name string) (*VirtualFile, error) { - //if !strings.HasPrefix(name, "/sys/") { - // fmt.Printf("invalid file name %q should start with /sys/\n", name) - // return NewVirtualFile(ctx, name) - //} +func newVirtualFile(ctx context.Context, name string, outputSetter func([]byte)) (*VirtualFile, error) { return &VirtualFile{ - ctx: ctx, - name: name, - nameParts: strings.Split(name, "/"), + ctx: ctx, + name: name, + nameParts: strings.Split(name, "/"), + outputSetter: outputSetter, }, nil } @@ -53,6 +64,11 @@ func (v *VirtualFile) Stat() (fs.FileInfo, error) { } func (v *VirtualFile) Write(bytes []byte) (sent int, err error) { + if strings.HasSuffix(v.name, "sys/substreams/output") { //special function + v.outputSetter(bytes) + return len(bytes), nil + } + err = dataToFile(v.ctx, v.nameParts[1:], bytes) //skip /sys if err != nil { return 0, fmt.Errorf("writing data for file %q: %w", v.name, err) @@ -147,13 +163,7 @@ func dataToFile(ctx context.Context, parts []string, data []byte) error { } func stateDataToFile(ctx context.Context, parts []string, data []byte) error { - //indexString := parts[0] - //index, err := strconv.Atoi(indexString) - //if err != nil { - // return fmt.Errorf("parsing index %q: %w", indexString, err) - //} verb := parts[1] - call := wasm.FromContext(ctx) switch verb { diff --git a/wasm/wasi/module.go b/wasm/wasi/module.go index 3abe622ae..450ee02d1 100644 --- a/wasm/wasi/module.go +++ b/wasm/wasi/module.go @@ -12,7 +12,6 @@ import ( "math/rand" "os" "sync" - "time" "github.com/dustin/go-humanize" "github.com/streamingfast/substreams/wasm" @@ -85,15 +84,15 @@ func (m *Module) ExecuteNewCall(ctx context.Context, call *wasm.Call, wasmInstan if err != nil { return nil, fmt.Errorf("marshalling args: %w", err) } - fmt.Println("args data length", len(argsData)) ctx = wasm.WithContext(sfwaz.WithInstanceContext(ctx, inst), call) + vfs := fs.NewVirtualFs(ctx) config := m.wazModuleConfig. WithRandSource(rand.New(rand.NewSource(42))). WithStdin(m.send). - WithStdout(m.receive). + WithStdout(os.Stdout). WithStderr(NewStdErrLogWriter(ctx)). - WithFS(fs.NewVirtualFs(ctx)). + WithFS(vfs). WithName(call.Entrypoint). WithArgs(call.Entrypoint, "-inputsize", fmt.Sprintf("%d", len(argsData))) @@ -102,7 +101,6 @@ func (m *Module) ExecuteNewCall(ctx context.Context, call *wasm.Call, wasmInstan return nil, fmt.Errorf("writing args: %w", err) } - start := time.Now() if _, err := m.wazRuntime.InstantiateModule(ctx, m.userModule, config); err != nil { // Note: Most compilers do not exit the module after running "_start", // unless there was an error. This allows you to call exported functions. @@ -112,9 +110,8 @@ func (m *Module) ExecuteNewCall(ctx context.Context, call *wasm.Call, wasmInstan log.Panicln(err) } } - fmt.Println("wazero duration", time.Since(start)) - data, err := io.ReadAll(m.receive) + data := vfs.Result() if err != nil { return nil, fmt.Errorf("reading output: %w", err) } diff --git a/wasm/wasi/substreams/io.go b/wasm/wasi/substreams/io.go index 55361e95e..1bc2d504a 100644 --- a/wasm/wasi/substreams/io.go +++ b/wasm/wasi/substreams/io.go @@ -2,7 +2,6 @@ package substreams import ( "io" - "log" "os" ) @@ -16,7 +15,6 @@ func readAll(r io.Reader, allocSize int) ([]byte, error) { for { count++ n, err := r.Read(b[len(b):cap(b)]) - log.Print("Read count: ", count) b = b[:len(b)+n] if err != nil { if err == io.EOF { @@ -33,7 +31,8 @@ func readAll(r io.Reader, allocSize int) ([]byte, error) { } func WriteOutput(data []byte) (int, error) { - return os.Stdout.Write(data) + err := os.WriteFile("/sys/substreams/output", data, 0644) + return len(data), err } func ReadInput(allocSize int) ([]byte, error) {