Skip to content

Commit

Permalink
Merge pull request #435 from streamingfast/feature/block_filtering_impl
Browse files Browse the repository at this point in the history
Feature/block filtering impl
  • Loading branch information
ArnaudBger authored Mar 14, 2024
2 parents db4e901 + 58d6fee commit 2a98793
Show file tree
Hide file tree
Showing 32 changed files with 1,421 additions and 620 deletions.
41 changes: 30 additions & 11 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
"regexp"
"strings"

"gopkg.in/yaml.v3"

pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"gopkg.in/yaml.v3"
)

const UNSET = math.MaxUint64
Expand All @@ -24,8 +23,9 @@ func init() {
}

const (
ModuleKindStore = "store"
ModuleKindMap = "map"
ModuleKindStore = "store"
ModuleKindMap = "map"
ModuleKindBlockIndex = "blockIndex"
)

// Manifest is a YAML structure used to create a Package and its list
Expand All @@ -39,9 +39,10 @@ type Manifest struct {
Modules []*Module `yaml:"modules"`
Params map[string]string `yaml:"params"`

Network string `yaml:"network"`
Networks map[string]*NetworkParams `yaml:"networks"`
Sink *Sink `yaml:"sink"`
BlockFilters map[string]string `yaml:"blockFilters"`
Network string `yaml:"network"`
Networks map[string]*NetworkParams `yaml:"networks"`
Sink *Sink `yaml:"sink"`

Graph *ModuleGraph `yaml:"-"`
Workdir string `yaml:"-"`
Expand Down Expand Up @@ -82,10 +83,11 @@ type Protobuf struct {
}

type Module struct {
Name string `yaml:"name"`
Doc string `yaml:"doc"`
Kind string `yaml:"kind"`
InitialBlock *uint64 `yaml:"initialBlock"`
Name string `yaml:"name"`
Doc string `yaml:"doc"`
Kind string `yaml:"kind"`
InitialBlock *uint64 `yaml:"initialBlock"`
BlockFilter *BlockFilter `yaml:"blockFilter"`

UpdatePolicy string `yaml:"updatePolicy"`
ValueType string `yaml:"valueType"`
Expand All @@ -96,6 +98,11 @@ type Module struct {
Use string `yaml:"use"`
}

type BlockFilter struct {
Module string `yaml:"module"`
Query string `yaml:"query"`
}

type Input struct {
Source string `yaml:"source"`
Store string `yaml:"store"`
Expand Down Expand Up @@ -282,6 +289,9 @@ func (m *Module) ToProtoWASM(codeIndex uint32) (*pbsubstreams.Module, error) {

m.setOutputToProto(out)
m.setKindToProto(out)

m.setBlockFilterToProto(out)

err := m.setInputsToProto(out)
if err != nil {
return nil, fmt.Errorf("setting input for module, %s: %w", m.Name, err)
Expand All @@ -290,6 +300,15 @@ func (m *Module) ToProtoWASM(codeIndex uint32) (*pbsubstreams.Module, error) {
return out, nil
}

func (m *Module) setBlockFilterToProto(pbModule *pbsubstreams.Module) {
if m.BlockFilter != nil {
pbModule.BlockFilter = &pbsubstreams.Module_BlockFilter{
Module: m.BlockFilter.Module,
Query: m.BlockFilter.Query,
}
}
}

func (m *Module) setInputsToProto(pbModule *pbsubstreams.Module) error {
for i, input := range m.Inputs {
if input.Source != "" {
Expand Down
36 changes: 36 additions & 0 deletions manifest/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,42 @@ inputs:
Inputs: []*Input{{Source: "proto:sf.ethereum.type.v1.Block"}, {Store: "pairs"}, {Map: "map_clocks"}},
},
},
{
name: "basic index",
rawYamlInput: `---
name: basic_index
kind: blockIndex
output:
type: proto:sf.substreams.index.v1.Keys
`,
expectedOutput: Module{
Kind: ModuleKindBlockIndex,
Name: "basic_index",
Output: StreamOutput{Type: "proto:sf.substreams.index.v1.Keys"},
},
},
{
name: "basic with block filter",
rawYamlInput: `---
name: bf_module
kind: map
blockFilter:
module: basic_index
query: this is my query
output:
type: proto:sf.substreams.database.changes.v1
`,

expectedOutput: Module{
Kind: ModuleKindMap,
Name: "bf_module",
Output: StreamOutput{Type: "proto:sf.substreams.database.changes.v1"},
BlockFilter: &BlockFilter{
Module: "basic_index",
Query: "this is my query",
},
},
},
}

for _, tt := range tests {
Expand Down
34 changes: 29 additions & 5 deletions manifest/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ func (r *manifestConverter) validateManifest(manif *Manifest) error {
if s.Use != "" {
return fmt.Errorf("stream %q: 'use' is not allowed for kind 'store'", s.Name)
}
case ModuleKindBlockIndex:
if s.Inputs == nil {
return fmt.Errorf("stream %q: block index module should have inputs", s.Name)
}

for _, input := range s.Inputs {
if input.IsParams() {
return fmt.Errorf("stream %q: block index module cannot have params input", s.Name)
}
}

if s.InitialBlock != nil {
return fmt.Errorf("stream %q: block index module cannot have initial block", s.Name)
}

if s.BlockFilter != nil {
return fmt.Errorf("stream %q: block index module cannot have block filter", s.Name)
}

if s.Output.Type != "proto:sf.substreams.index.v1.Keys" {
return fmt.Errorf("stream %q: block index module must have output type 'proto:sf.substreams.index.v1.Keys'", s.Name)
}

case "":
if s.Use == "" {
return fmt.Errorf("module kind not specified for %q", s.Name)
Expand Down Expand Up @@ -261,7 +284,6 @@ func (r *manifestConverter) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package
}
}
}

doc = string(readmeContent)
}

Expand All @@ -271,11 +293,13 @@ func (r *manifestConverter) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package
Name: m.Package.Name,
Doc: doc,
}

pkg = &pbsubstreams.Package{
Version: 1,
PackageMeta: []*pbsubstreams.PackageMetadata{pkgMeta},
Modules: &pbsubstreams.Modules{},
Network: m.Network,
Version: 1,
PackageMeta: []*pbsubstreams.PackageMetadata{pkgMeta},
Modules: &pbsubstreams.Modules{},
Network: m.Network,
BlockFilters: m.BlockFilters,
}

if m.Networks != nil {
Expand Down
Loading

0 comments on commit 2a98793

Please sign in to comment.