Skip to content

Commit

Permalink
wip with generated code wip
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Dec 22, 2023
1 parent d4d8ff4 commit f4a661f
Show file tree
Hide file tree
Showing 12 changed files with 1,048 additions and 683 deletions.
26 changes: 21 additions & 5 deletions wasm/bench/cmd/wasigo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/streamingfast/substreams/wasm/bench/substreams_wasi_go/pb"

"go.uber.org/zap"

"github.com/streamingfast/dstore"
Expand All @@ -18,11 +21,12 @@ import (
func main() {
ctx := context.Background()
wasmRuntime := wasm.NewRegistryWithRuntime("wasi", nil, 0)
code, err := os.ReadFile("/Users/cbillett/devel/sf/substreams/wasm/bench/substreams_wasi_go/main.wasm")
blockReader, err := os.Open("/Users/cbillett/devel/sf/substreams/wasm/bench/cmd/barebones/testdata/block.binpb")
code, err := os.ReadFile("/Users/colindickson/code/dfuse/substreams/wasm/bench/substreams_wasi_go/main.wasm")
blockReader, err := os.Open("/Users/colindickson/code/dfuse/substreams/wasm/bench/cmd/barebones/testdata/block.binpb")
if err != nil {
panic(err)
}

defer blockReader.Close()

module, err := wasmRuntime.NewModule(ctx, code, "go/wasi")
Expand All @@ -36,18 +40,30 @@ func main() {
for i := 0; i < 1; i++ {
args := args(
wasm.NewParamsInput("{key.1: 'value.1'}"),
blockInputFile("/Users/cbillett/devel/sf/substreams/wasm/bench/cmd/barebones/testdata/block.binpb"),
blockInputFile("/Users/colindickson/code/dfuse/substreams/wasm/bench/cmd/barebones/testdata/block.binpb"),
wasm.NewStoreReaderInput("store.reader.1", createStore(ctx, "store.reader.1")),
wasm.NewStoreReaderInput("store.reader.2", createStore(ctx, "store.reader.2")),
)
execStart := time.Now()
call := wasm.NewCall(nil, "", "", args)
call := wasm.NewCall(nil, "mapBlock", "mapBlock", args)
_, err = module.ExecuteNewCall(ctx, call, instance, args)
if err != nil {
panic(fmt.Errorf("executing call: %w", err))
}
fmt.Println("exec duration", time.Since(execStart))
fmt.Println("call output", string(call.Output()))
//fmt.Println("call output", string(call.Output()))
data := call.Output()
output := &pb.MapBlockOutput{}
err = output.UnmarshalVT(data)
if err != nil {
panic(fmt.Errorf("unmarshalling output: %w", err))
}
jdata, err := json.Marshal(output)
if err != nil {
panic(err)
}
fmt.Println("output", string(jdata))

fmt.Println("-------------------------------- call logs --------------------------------")
for _, log := range call.Logs {
fmt.Print(log)
Expand Down
47 changes: 47 additions & 0 deletions wasm/bench/substreams_wasi_go/lib/generated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package lib

import (
"fmt"

"github.com/streamingfast/substreams/wasm/bench/substreams_wasi_go/pb"
"github.com/streamingfast/substreams/wasm/wasi/substream"
)

func init() {
substream.Register("mapBlock", ExecuteMapBlock)
}

type MapBlockInput struct {
block *pb.Block
readStore1 substream.StoreGet[string]
readStore2 substream.StoreGet[string]
}

func ExecuteMapBlock(input []byte) error {
res := &pb.MapBlockInput{}
err := res.UnmarshalVT(input)
if err != nil {
return fmt.Errorf("unmarshalling args: %w", err)
}
mapBlockInputs := &MapBlockInput{
block: res.Block,
readStore1: substream.NewStringStore(res.ReadStore),
readStore2: substream.NewStringStore(res.ReadStore2),
}

out, err := mapBlock(mapBlockInputs)
if err != nil {
return fmt.Errorf("mapping block: %w", err)
}
data, err := out.MarshalVT()
if err != nil {
return fmt.Errorf("marshalling output: %w", err)
}

_, err = substream.WriteOutput(data)
if err != nil {
return fmt.Errorf("writing output: %w", err)
}

return nil
}
53 changes: 53 additions & 0 deletions wasm/bench/substreams_wasi_go/lib/substreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package lib

import (
"encoding/hex"
"strings"

"github.com/streamingfast/substreams/wasm/bench/substreams_wasi_go/pb"
)

func mapBlock(inputs *MapBlockInput) (*pb.MapBlockOutput, error) {
rocketAddress := strings.ToLower("ae78736Cd615f374D3085123A210448E74Fc6393")

approvalTopic := strings.ToLower("8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925")
transferTopic := strings.ToLower("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")

trxCount := 0
transferCount := 0
approvalCount := 0
for _, trace := range inputs.block.TransactionTraces {
trxCount++
if trace.Status != 1 {
continue
}
for _, call := range trace.Calls {
if call.StateReverted {
continue
}
for _, log := range call.Logs {
l := hex.EncodeToString(log.Address)
l = strings.ToLower(l)
if l != rocketAddress || len(log.Topics) == 0 {
continue
}
t := hex.EncodeToString(log.Topics[0])
t = strings.ToLower(t)
if t == approvalTopic {
approvalCount++
}
if t == transferTopic {
transferCount++
}
}
}
}

output := &pb.MapBlockOutput{
TrxCount: uint32(trxCount),
TransferCount: uint32(transferCount),
ApprovalCount: uint32(approvalCount),
}

return output, nil
}
119 changes: 2 additions & 117 deletions wasm/bench/substreams_wasi_go/main.go
Original file line number Diff line number Diff line change
@@ -1,125 +1,10 @@
package main

import (
"encoding/hex"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/streamingfast/substreams/wasm/bench/substreams_wasi_go/pb"
_ "github.com/streamingfast/substreams/wasm/bench/substreams_wasi_go/lib"
"github.com/streamingfast/substreams/wasm/wasi/substream"
)

func main() {
start := time.Now()
log.Print("let's do it")
log.Print("start: ", start)

input, err := substream.ReadInput()
if err != nil {
panic(fmt.Errorf("reading input: %w", err))
}
log.Print("input length: ", len(input))
log.Print("read input duration: ", time.Since(start))

//data, err := readFile("/sys/stores/0/read/first/key_123")
//if err != nil {
// panic(fmt.Errorf("reading store: %w", err))
// //return fmt.Errorf("reading store: %w", err)
//}
//fmt.Println("read store:", string(data))

var entrypoint string
switch len(os.Args) {
case 1:
entrypoint = os.Args[0]
default:
panic(fmt.Errorf("invalid number of arguments: %d", len(os.Args)))
}
fmt.Println("entrypoint", entrypoint)

switch entrypoint {
case "mapBlock":

protoStart := time.Now()
log.Print("proto start: ", protoStart)
mapBlockInput := &pb.MapBlockInput{}
err = mapBlockInput.UnmarshalVT(input)
//err = proto.Unmarshal(input, mapBlockInput)
if err != nil {
panic(fmt.Errorf("unmarshalling args: %w", err))
}
log.Print("proto duration: ", time.Since(protoStart))

log.Print("parameters: ", mapBlockInput.Params)
log.Print("read store: ", mapBlockInput.ReadStore)
log.Print("read store2: ", mapBlockInput.ReadStore2)
log.Print("block: ", mapBlockInput.Block.Number)
err = mapBlock(mapBlockInput.Block)
if err != nil {
panic(fmt.Errorf("mapping block: %w", err))
}
}

log.Print("total duration: ", time.Since(start))
}

type blockStat struct {
TrxCount int
TransferCount int
ApprovalCount int
}

func mapBlock(block *pb.Block) error {
rocketAddress := strings.ToLower("ae78736Cd615f374D3085123A210448E74Fc6393")

approvalTopic := strings.ToLower("8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925")
transferTopic := strings.ToLower("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")

trxCount := 0
transferCount := 0
approvalCount := 0
for _, trace := range block.TransactionTraces {
trxCount++
if trace.Status != 1 {
continue
}
for _, call := range trace.Calls {
if call.StateReverted {
continue
}
for _, log := range call.Logs {
l := hex.EncodeToString(log.Address)
l = strings.ToLower(l)
if l != rocketAddress || len(log.Topics) == 0 {
continue
}
t := hex.EncodeToString(log.Topics[0])
t = strings.ToLower(t)
if t == approvalTopic {
approvalCount++
}
if t == transferTopic {
transferCount++
}
}
}
}
stats := blockStat{
TrxCount: trxCount,
TransferCount: transferCount,
ApprovalCount: approvalCount,
}
data, err := json.Marshal(stats)
if err != nil {
return fmt.Errorf("marshalling stats: %w", err)
}
_, err = substream.WriteOutput(data)
if err != nil {
return fmt.Errorf("writing output: %w", err)
}
return nil
substream.Main()
}
Binary file modified wasm/bench/substreams_wasi_go/main.wasm
Binary file not shown.
1 change: 1 addition & 0 deletions wasm/bench/substreams_wasi_go/pb/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package pb
Loading

0 comments on commit f4a661f

Please sign in to comment.