Skip to content

Commit

Permalink
wasi: write output to fs. not sure about implementation. to review later
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Jan 9, 2024
1 parent d07c2b0 commit 45eeafb
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 33 deletions.
5 changes: 2 additions & 3 deletions wasm/bench/cmd/wasigo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ func main() {
wasm.NewStoreReaderInput("store.reader.2", createStore(ctx, "store.reader.2")),
wasm.NewStoreWriterOutput("out", createStore(ctx, "out"), 1, "string"),
)
execStart := time.Now()

call := wasm.NewCall(nil, "mapBlock", "mapBlock", args)
_, err = module.ExecuteNewCall(ctx, call, instance, args)
if err != nil {
panic(fmt.Errorf("executing call: %w", err))
}
fmt.Println("exec duration", time.Since(execStart))
//fmt.Println("call output", string(call.Output()))

data := call.Output()
output := &pb.MapBlockOutput{}
err = output.UnmarshalVT(data)
Expand Down
6 changes: 3 additions & 3 deletions wasm/bench/substreams_wasi_go/lib/substreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) {
log.Print(rand.Int())
log.Printf("totally random number: %d", rand.Int())

rocketAddress := strings.ToLower("ae78736Cd615f374D3085123A210448E74Fc6393")

Expand All @@ -29,7 +29,7 @@ func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) {
log.Print("got value_123")

log.Printf("%s", time.Now())
time.Sleep(5 * time.Second)
time.Sleep(60 * time.Second)
log.Printf("%s", time.Now())

trxCount := 0
Expand Down Expand Up @@ -66,7 +66,7 @@ func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) {
if err != nil {
return nil, err
}
log.Print("set popo to lol")
log.Print("set key `popo` to value `lol`")

output := &pb.MapBlockOutput{
TrxCount: uint32(trxCount),
Expand Down
Binary file modified wasm/bench/substreams_wasi_go/main.wasm
Binary file not shown.
3 changes: 1 addition & 2 deletions wasm/wasi/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ func marshallArgs(args []wasm.Argument) ([]byte, error) {
}
data, err := protoscope.NewScanner(scopeData).Exec()
if err != nil {
fmt.Println("scopeData", scopeData)
return nil, fmt.Errorf("scanning args: %w", err)
return nil, fmt.Errorf("scanning args: %w (scopeData: %s)", err, scopeData)
}
return data, nil

Expand Down
40 changes: 25 additions & 15 deletions wasm/wasi/fs/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@ import (
"math/big"
"strconv"
"strings"
"sync"
"time"

"github.com/streamingfast/substreams/wasm"
)

type Virtual struct {
ctx context.Context

resultOnce sync.Once
result []byte
}

func (v *Virtual) Result() []byte {
return v.result
}

func NewVirtualFs(ctx context.Context) *Virtual {
Expand All @@ -24,7 +32,11 @@ func NewVirtualFs(ctx context.Context) *Virtual {
}

func (v *Virtual) Open(name string) (fs.File, error) {
return NewVirtualFile(v.ctx, name)
return newVirtualFile(v.ctx, name, func(b []byte) {
v.resultOnce.Do(func() {
v.result = b
})
})
}

type VirtualFile struct {
Expand All @@ -33,17 +45,16 @@ type VirtualFile struct {
nameParts []string
Remaining []byte
Loaded bool

outputSetter func([]byte)
}

func NewVirtualFile(ctx context.Context, name string) (*VirtualFile, error) {
//if !strings.HasPrefix(name, "/sys/") {
// fmt.Printf("invalid file name %q should start with /sys/\n", name)
// return NewVirtualFile(ctx, name)
//}
func newVirtualFile(ctx context.Context, name string, outputSetter func([]byte)) (*VirtualFile, error) {
return &VirtualFile{
ctx: ctx,
name: name,
nameParts: strings.Split(name, "/"),
ctx: ctx,
name: name,
nameParts: strings.Split(name, "/"),
outputSetter: outputSetter,
}, nil
}

Expand All @@ -53,6 +64,11 @@ func (v *VirtualFile) Stat() (fs.FileInfo, error) {
}

func (v *VirtualFile) Write(bytes []byte) (sent int, err error) {
if strings.HasSuffix(v.name, "sys/substreams/output") { //special function
v.outputSetter(bytes)
return len(bytes), nil
}

err = dataToFile(v.ctx, v.nameParts[1:], bytes) //skip /sys
if err != nil {
return 0, fmt.Errorf("writing data for file %q: %w", v.name, err)
Expand Down Expand Up @@ -147,13 +163,7 @@ func dataToFile(ctx context.Context, parts []string, data []byte) error {
}

func stateDataToFile(ctx context.Context, parts []string, data []byte) error {
//indexString := parts[0]
//index, err := strconv.Atoi(indexString)
//if err != nil {
// return fmt.Errorf("parsing index %q: %w", indexString, err)
//}
verb := parts[1]

call := wasm.FromContext(ctx)

switch verb {
Expand Down
11 changes: 4 additions & 7 deletions wasm/wasi/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"math/rand"
"os"
"sync"
"time"

"github.com/dustin/go-humanize"
"github.com/streamingfast/substreams/wasm"
Expand Down Expand Up @@ -85,15 +84,15 @@ func (m *Module) ExecuteNewCall(ctx context.Context, call *wasm.Call, wasmInstan
if err != nil {
return nil, fmt.Errorf("marshalling args: %w", err)
}
fmt.Println("args data length", len(argsData))

ctx = wasm.WithContext(sfwaz.WithInstanceContext(ctx, inst), call)
vfs := fs.NewVirtualFs(ctx)
config := m.wazModuleConfig.
WithRandSource(rand.New(rand.NewSource(42))).
WithStdin(m.send).
WithStdout(m.receive).
WithStdout(os.Stdout).
WithStderr(NewStdErrLogWriter(ctx)).
WithFS(fs.NewVirtualFs(ctx)).
WithFS(vfs).
WithName(call.Entrypoint).
WithArgs(call.Entrypoint, "-inputsize", fmt.Sprintf("%d", len(argsData)))

Expand All @@ -102,7 +101,6 @@ func (m *Module) ExecuteNewCall(ctx context.Context, call *wasm.Call, wasmInstan
return nil, fmt.Errorf("writing args: %w", err)
}

start := time.Now()
if _, err := m.wazRuntime.InstantiateModule(ctx, m.userModule, config); err != nil {
// Note: Most compilers do not exit the module after running "_start",
// unless there was an error. This allows you to call exported functions.
Expand All @@ -112,9 +110,8 @@ func (m *Module) ExecuteNewCall(ctx context.Context, call *wasm.Call, wasmInstan
log.Panicln(err)
}
}
fmt.Println("wazero duration", time.Since(start))

data, err := io.ReadAll(m.receive)
data := vfs.Result()
if err != nil {
return nil, fmt.Errorf("reading output: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions wasm/wasi/substreams/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package substreams

import (
"io"
"log"
"os"
)

Expand All @@ -16,7 +15,6 @@ func readAll(r io.Reader, allocSize int) ([]byte, error) {
for {
count++
n, err := r.Read(b[len(b):cap(b)])
log.Print("Read count: ", count)
b = b[:len(b)+n]
if err != nil {
if err == io.EOF {
Expand All @@ -33,7 +31,8 @@ func readAll(r io.Reader, allocSize int) ([]byte, error) {
}

func WriteOutput(data []byte) (int, error) {
return os.Stdout.Write(data)
err := os.WriteFile("/sys/substreams/output", data, 0644)
return len(data), err
}

func ReadInput(allocSize int) ([]byte, error) {
Expand Down

0 comments on commit 45eeafb

Please sign in to comment.