Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Mar 20, 2024
1 parent 9049f11 commit c6bfebd
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 48 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
require (
connectrpc.com/connect v1.15.0
connectrpc.com/grpcreflect v1.2.0
github.com/RoaringBitmap/roaring v0.9.4
github.com/alecthomas/chroma v0.10.0
github.com/bytecodealliance/wasmtime-go/v4 v4.0.0
github.com/charmbracelet/bubbles v0.15.0
Expand Down Expand Up @@ -73,11 +74,13 @@ require (
require (
connectrpc.com/grpchealth v1.3.0 // indirect
connectrpc.com/otelconnect v0.7.0 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/bobg/go-generics/v2 v2.1.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.0.0-20221018185641-36f91511cfd7 h1:4cXY9jZO7UoRYKyD+CssnBlwn2HTeUzCQ1b44PJijzc=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.0.0-20221018185641-36f91511cfd7/go.mod h1:FwtSi1M0P8cuMlHxVso1vcivukprUr1bBwf15CRypOI=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/abourget/llerrgroup v0.2.0 h1:2nPXy6Owo/KOKDQYvjMmS8rsjtitvuP2OEGrqgpj428=
Expand All @@ -132,6 +134,8 @@ github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d h1:fSlGu5ePbkjBidXuj2O5j9EcYrVB5Cr6/wdkYyDgxZk=
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d/go.mod h1:yCBkgASmKHgUOFjK9h1sOytUVgA+JkQjqj3xYP4AdWY=
Expand Down Expand Up @@ -459,6 +463,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/muesli/ansi v0.0.0-20211018074035-2e021307bc4b/go.mod h1:fQuZ0gauxyBcmsdE3ZT4NasjaRdxmbCS0jRHsrWu3Ho=
github.com/muesli/ansi v0.0.0-20211031195517-c9f0611b6c70 h1:kMlmsLSbjkikxQJ1IPwaM+7LJ9ltFu/fi8CRzvSnQmA=
github.com/muesli/ansi v0.0.0-20211031195517-c9f0611b6c70/go.mod h1:fQuZ0gauxyBcmsdE3ZT4NasjaRdxmbCS0jRHsrWu3Ho=
Expand Down
6 changes: 6 additions & 0 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ const (

func (m *Module) setKindToProto(pbModule *pbsubstreams.Module) {
switch m.Kind {
case ModuleKindBlockIndex:
pbModule.Kind = &pbsubstreams.Module_KindBlockIndex_{
KindBlockIndex: &pbsubstreams.Module_KindBlockIndex{
OutputType: m.Output.Type,
},
}
case ModuleKindMap:
pbModule.Kind = &pbsubstreams.Module_KindMap_{
KindMap: &pbsubstreams.Module_KindMap{
Expand Down
8 changes: 4 additions & 4 deletions manifest/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package manifest

import (
"fmt"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"os"
"path"
"path/filepath"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)

type manifestConverter struct {
Expand Down Expand Up @@ -64,7 +65,6 @@ func (r *manifestConverter) validateManifest(manif *Manifest) error {

for _, s := range manif.Modules {
// TODO: let's make sure this is also checked when received in Protobuf in a remote request.

switch s.Kind {
case ModuleKindMap:
if s.Output.Type == "" {
Expand Down
2 changes: 2 additions & 0 deletions manifest/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (m *ModuleHashes) HashModule(modules *pbsubstreams.Modules, module *pbsubst
buf.WriteString("map")
case *pbsubstreams.Module_KindStore_:
buf.WriteString("store")
case *pbsubstreams.Module_KindBlockIndex_:
buf.WriteString("block_index")
default:
return nil, fmt.Errorf("invalid module file %T", module.Kind)
}
Expand Down
82 changes: 53 additions & 29 deletions pb/sf/substreams/intern/v2/deltas.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pb/sf/substreams/v1/modules.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pbsubstreams

import "strings"
import (
"strings"
)

type ModuleKind int

Expand Down
81 changes: 81 additions & 0 deletions pipeline/exec/indexexec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package exec

import (
"context"
"fmt"

"github.com/RoaringBitmap/roaring/roaring64"
pbindex "github.com/streamingfast/substreams/pb/sf/substreams/index/v1"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"github.com/streamingfast/substreams/storage/execout"
"github.com/streamingfast/substreams/wasm"
"google.golang.org/protobuf/proto"
)

type IndexModuleExecutor struct {
BaseExecutor
indexMapping map[string]*roaring64.Bitmap
}

func NewIndexModuleExecutor(baseExecutor *BaseExecutor) *IndexModuleExecutor {
return &IndexModuleExecutor{BaseExecutor: *baseExecutor}
}

func (i *IndexModuleExecutor) Name() string { return i.moduleName }
func (i *IndexModuleExecutor) String() string { return i.Name() }

func (i *IndexModuleExecutor) applyCachedOutput([]byte) error {
return nil
}

func (i *IndexModuleExecutor) run(ctx context.Context, reader execout.ExecutionOutputGetter) (out []byte, moduleOutputData *pbssinternal.ModuleOutput, err error) {
//TODO: HANDLE exec_index
//ctx, span := reqctx.WithModuleExecutionSpan(ctx, "exec_index")
//defer span.EndWithErr(&err)

var call *wasm.Call
if call, err = i.wasmCall(reader); err != nil {
return nil, nil, fmt.Errorf("maps wasm call: %w", err)
}

if call != nil {
out = call.Output()
}

modOut, err := i.toModuleOutput(out)
if err != nil {
return nil, nil, fmt.Errorf("converting back to module output: %w", err)
}

blockNumber := reader.Clock().Number

for _, key := range modOut.Data.(*pbssinternal.ModuleOutput_IndexKeys).IndexKeys.Keys {
if i.indexMapping == nil {
i.indexMapping = make(map[string]*roaring64.Bitmap)
}
if _, ok := i.indexMapping[key]; !ok {
i.indexMapping[key] = roaring64.New()
}
i.indexMapping[key].Add(blockNumber)
}

return out, modOut, nil
}

func (i *IndexModuleExecutor) toModuleOutput(data []byte) (*pbssinternal.ModuleOutput, error) {
var indexKeys pbindex.Keys
err := proto.Unmarshal(data, &indexKeys)
if err != nil {
return nil, fmt.Errorf("unmarshalling index keys: %w", err)
}

return &pbssinternal.ModuleOutput{
Data: &pbssinternal.ModuleOutput_IndexKeys{
IndexKeys: &indexKeys,
},
}, nil
}

func (i *IndexModuleExecutor) HasValidOutput() bool {
return true
}
6 changes: 2 additions & 4 deletions pipeline/exec/mapexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/anypb"

pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"github.com/streamingfast/substreams/storage/execout"

"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/execout"
"github.com/streamingfast/substreams/wasm"
"google.golang.org/protobuf/types/known/anypb"
)

type MapperModuleExecutor struct {
Expand Down
6 changes: 3 additions & 3 deletions pipeline/outputmodules/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func validateRequest(binaries []*pbsubstreams.Binary, modules *pbsubstreams.Modu

func checkNotImplemented(mods []*pbsubstreams.Module) error {
for _, mod := range mods {
if mod.ModuleKind() == pbsubstreams.ModuleKindBlockIndex {
return fmt.Errorf("block index module is not implemented")
}
//if mod.ModuleKind() == pbsubstreams.ModuleKindBlockIndex {
// return fmt.Errorf("block index module is not implemented")
//}
if mod.GetBlockFilter() != nil {
return fmt.Errorf("block filter module is not implemented")
}
Expand Down
Loading

0 comments on commit c6bfebd

Please sign in to comment.