Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/block filtering impl #435

Merged
merged 7 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
"regexp"
"strings"

"gopkg.in/yaml.v3"

pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"gopkg.in/yaml.v3"
)

const UNSET = math.MaxUint64
Expand All @@ -24,8 +23,9 @@ func init() {
}

const (
ModuleKindStore = "store"
ModuleKindMap = "map"
ModuleKindStore = "store"
ModuleKindMap = "map"
ModuleKindBlockIndex = "blockIndex"
)

// Manifest is a YAML structure used to create a Package and its list
Expand All @@ -39,9 +39,10 @@ type Manifest struct {
Modules []*Module `yaml:"modules"`
Params map[string]string `yaml:"params"`

Network string `yaml:"network"`
Networks map[string]*NetworkParams `yaml:"networks"`
Sink *Sink `yaml:"sink"`
BlockFilters map[string]string `yaml:"blockFilters"`
Network string `yaml:"network"`
Networks map[string]*NetworkParams `yaml:"networks"`
Sink *Sink `yaml:"sink"`

Graph *ModuleGraph `yaml:"-"`
Workdir string `yaml:"-"`
Expand Down Expand Up @@ -82,10 +83,11 @@ type Protobuf struct {
}

type Module struct {
Name string `yaml:"name"`
Doc string `yaml:"doc"`
Kind string `yaml:"kind"`
InitialBlock *uint64 `yaml:"initialBlock"`
Name string `yaml:"name"`
Doc string `yaml:"doc"`
Kind string `yaml:"kind"`
InitialBlock *uint64 `yaml:"initialBlock"`
BlockFilter *BlockFilter `yaml:"blockFilter"`

UpdatePolicy string `yaml:"updatePolicy"`
ValueType string `yaml:"valueType"`
Expand All @@ -96,6 +98,11 @@ type Module struct {
Use string `yaml:"use"`
}

type BlockFilter struct {
Module string `yaml:"module"`
Query string `yaml:"query"`
}

type Input struct {
Source string `yaml:"source"`
Store string `yaml:"store"`
Expand Down Expand Up @@ -282,6 +289,9 @@ func (m *Module) ToProtoWASM(codeIndex uint32) (*pbsubstreams.Module, error) {

m.setOutputToProto(out)
m.setKindToProto(out)

m.setBlockFilterToProto(out)

err := m.setInputsToProto(out)
if err != nil {
return nil, fmt.Errorf("setting input for module, %s: %w", m.Name, err)
Expand All @@ -290,6 +300,15 @@ func (m *Module) ToProtoWASM(codeIndex uint32) (*pbsubstreams.Module, error) {
return out, nil
}

func (m *Module) setBlockFilterToProto(pbModule *pbsubstreams.Module) {
if m.BlockFilter != nil {
pbModule.BlockFilter = &pbsubstreams.Module_BlockFilter{
Module: m.BlockFilter.Module,
Query: m.BlockFilter.Query,
}
}
}

func (m *Module) setInputsToProto(pbModule *pbsubstreams.Module) error {
for i, input := range m.Inputs {
if input.Source != "" {
Expand Down
36 changes: 36 additions & 0 deletions manifest/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,42 @@ inputs:
Inputs: []*Input{{Source: "proto:sf.ethereum.type.v1.Block"}, {Store: "pairs"}, {Map: "map_clocks"}},
},
},
{
name: "basic index",
rawYamlInput: `---
name: basic_index
kind: blockIndex
output:
type: proto:sf.substreams.index.v1.Keys
`,
expectedOutput: Module{
Kind: ModuleKindBlockIndex,
Name: "basic_index",
Output: StreamOutput{Type: "proto:sf.substreams.index.v1.Keys"},
},
},
{
name: "basic with block filter",
rawYamlInput: `---
name: bf_module
kind: map
blockFilter:
module: basic_index
query: this is my query
output:
type: proto:sf.substreams.database.changes.v1
`,

expectedOutput: Module{
Kind: ModuleKindMap,
Name: "bf_module",
Output: StreamOutput{Type: "proto:sf.substreams.database.changes.v1"},
BlockFilter: &BlockFilter{
Module: "basic_index",
Query: "this is my query",
},
},
},
}

for _, tt := range tests {
Expand Down
34 changes: 29 additions & 5 deletions manifest/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ func (r *manifestConverter) validateManifest(manif *Manifest) error {
if s.Use != "" {
return fmt.Errorf("stream %q: 'use' is not allowed for kind 'store'", s.Name)
}
case ModuleKindBlockIndex:
if s.Inputs == nil {
return fmt.Errorf("stream %q: block index module should have inputs", s.Name)
}

for _, input := range s.Inputs {
if input.IsParams() {
return fmt.Errorf("stream %q: block index module cannot have params input", s.Name)
}
}

if s.InitialBlock != nil {
return fmt.Errorf("stream %q: block index module cannot have initial block", s.Name)
}

if s.BlockFilter != nil {
return fmt.Errorf("stream %q: block index module cannot have block filter", s.Name)
}

if s.Output.Type != "proto:sf.substreams.index.v1.Keys" {
return fmt.Errorf("stream %q: block index module must have output type 'proto:sf.substreams.index.v1.Keys'", s.Name)
}

case "":
if s.Use == "" {
return fmt.Errorf("module kind not specified for %q", s.Name)
Expand Down Expand Up @@ -261,7 +284,6 @@ func (r *manifestConverter) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package
}
}
}

doc = string(readmeContent)
}

Expand All @@ -271,11 +293,13 @@ func (r *manifestConverter) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package
Name: m.Package.Name,
Doc: doc,
}

pkg = &pbsubstreams.Package{
Version: 1,
PackageMeta: []*pbsubstreams.PackageMetadata{pkgMeta},
Modules: &pbsubstreams.Modules{},
Network: m.Network,
Version: 1,
PackageMeta: []*pbsubstreams.PackageMetadata{pkgMeta},
Modules: &pbsubstreams.Modules{},
Network: m.Network,
BlockFilters: m.BlockFilters,
}

if m.Networks != nil {
Expand Down
Loading
Loading