Skip to content

Commit

Permalink
Adding test for handle use modules
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Mar 12, 2024
1 parent 03cbddb commit 32fb45f
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 54 deletions.
115 changes: 61 additions & 54 deletions manifest/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,69 +105,76 @@ func handleUseModules(pkg *pbsubstreams.Package, manif *Manifest) error {
packageModulesMapping[module.Name] = module
}

for _, mod := range manif.Modules {
if mod.Use != "" {
usedModule, found := packageModulesMapping[mod.Use]
for _, manifestModule := range manif.Modules {
if manifestModule.Use != "" {
usedModule, found := packageModulesMapping[manifestModule.Use]
if !found {
return fmt.Errorf("module %q: use module %q not found", mod.Name, mod.Use)
return fmt.Errorf("module %q: use module %q not found", manifestModule.Name, manifestModule.Use)
}
pkgPbModule := packageModulesMapping[mod.Name]
pkgPbModule.BinaryIndex = usedModule.BinaryIndex
pkgPbModule.BinaryEntrypoint = usedModule.BinaryEntrypoint

for index, input := range pkgPbModule.Inputs {
usedModuleInput := usedModule.Inputs[index]
if input.GetParams() != nil {
if usedModuleInput.GetParams() == nil {
return fmt.Errorf("module %q: input %q is not a params type", mod.Name, input.String())
}
if input.GetParams().Value != usedModuleInput.GetParams().Value {
return fmt.Errorf("module %q: input %q has different value than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String())
}
}
if input.GetSource() != nil {
if usedModuleInput.GetSource() == nil {
return fmt.Errorf("module %q: input %q is not a source type", mod.Name, input.String())
}
if input.GetSource().Type != usedModuleInput.GetSource().Type {
return fmt.Errorf("module %q: input %q has different source than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String())
}
}
if input.GetStore() != nil {
if usedModuleInput.GetStore() == nil {
return fmt.Errorf("module %q: input %q is not a store type", mod.Name, input.String())
}
if input.GetStore().GetMode() != usedModuleInput.GetStore().GetMode() {
return fmt.Errorf("module %q: input %q has different mode than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String())
}
moduleWithUse := packageModulesMapping[manifestModule.Name]

inputStoreModuleName := input.GetStore().ModuleName
usedModuleStoreMapModuleName := usedModuleInput.GetStore().ModuleName
if packageModulesMapping[inputStoreModuleName].Output.Type != packageModulesMapping[usedModuleStoreMapModuleName].Output.Type {
return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String())
}
if err := checkEqualInputs(moduleWithUse, usedModule, manifestModule, packageModulesMapping); err != nil {
return fmt.Errorf("checking inputs for module %q: %w", manifestModule.Name, err)
}

}
if input.GetMap() != nil {
if usedModuleInput.GetMap() == nil {
return fmt.Errorf("module %q: input %q is not a map type", mod.Name, input.String())
}
moduleWithUse.BinaryIndex = usedModule.BinaryIndex
moduleWithUse.BinaryEntrypoint = usedModule.BinaryEntrypoint

inputMapModuleName := input.GetMap().ModuleName
usedModuleInputMapModuleName := usedModuleInput.GetMap().ModuleName
if packageModulesMapping[inputMapModuleName].Output.Type != packageModulesMapping[usedModuleInputMapModuleName].Output.Type {
fmt.Printf("Module Output One %q, two %q", packageModulesMapping[inputMapModuleName].Output, packageModulesMapping[usedModuleInputMapModuleName].Output)
return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String())
}
}
if moduleWithUse.InitialBlock == 0 {
moduleWithUse.InitialBlock = usedModule.InitialBlock
}

moduleWithUse.Output = usedModule.Output
moduleWithUse.Kind = usedModule.Kind
}
}
return nil
}

func checkEqualInputs(moduleWithUse, usedModule *pbsubstreams.Module, manifestModuleWithUse *Module, packageModulesMapping map[string]*pbsubstreams.Module) error {
for index, input := range moduleWithUse.Inputs {
usedModuleInput := usedModule.Inputs[index]
if input.GetParams() != nil {
if usedModuleInput.GetParams() == nil {
return fmt.Errorf("module %q: input %q is not a params type", manifestModuleWithUse.Name, input.String())
}
if input.GetParams().Value != usedModuleInput.GetParams().Value {
return fmt.Errorf("module %q: input %q has different value than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String())
}
}
if input.GetSource() != nil {
if usedModuleInput.GetSource() == nil {
return fmt.Errorf("module %q: input %q is not a source type", manifestModuleWithUse.Name, input.String())
}
if input.GetSource().Type != usedModuleInput.GetSource().Type {
return fmt.Errorf("module %q: input %q has different source than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String())
}
}
if input.GetStore() != nil {
if usedModuleInput.GetStore() == nil {
return fmt.Errorf("module %q: input %q is not a store type", manifestModuleWithUse.Name, input.String())
}
if input.GetStore().GetMode() != usedModuleInput.GetStore().GetMode() {
return fmt.Errorf("module %q: input %q has different mode than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String())
}

if pkgPbModule.InitialBlock == 0 {
pkgPbModule.InitialBlock = usedModule.InitialBlock
inputStoreModuleName := input.GetStore().ModuleName
usedModuleStoreMapModuleName := usedModuleInput.GetStore().ModuleName
if packageModulesMapping[inputStoreModuleName].Output.Type != packageModulesMapping[usedModuleStoreMapModuleName].Output.Type {
return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String())
}

pkgPbModule.Output = usedModule.Output
pkgPbModule.Kind = usedModule.Kind
}
if input.GetMap() != nil {
if usedModuleInput.GetMap() == nil {
return fmt.Errorf("module %q: input %q is not a map type", manifestModuleWithUse.Name, input.String())
}

inputMapModuleName := input.GetMap().ModuleName
usedModuleInputMapModuleName := usedModuleInput.GetMap().ModuleName
if packageModulesMapping[inputMapModuleName].Output.Type != packageModulesMapping[usedModuleInputMapModuleName].Output.Type {
return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String())
}
}
}
return nil
Expand Down
172 changes: 172 additions & 0 deletions manifest/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package manifest

import (
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"testing"

"github.com/stretchr/testify/require"
)

func TestHandleUseModules(t *testing.T) {
cases := []struct {
name string
pkg *pbsubstreams.Package
manifest *Manifest
expectedOutputModules []*pbsubstreams.Module
expectedError string
}{
{
name: "sunny path",
pkg: &pbsubstreams.Package{
ProtoFiles: nil,
Version: 0,
Modules: &pbsubstreams.Modules{Modules: []*pbsubstreams.Module{
{
Name: "use_module",
Kind: nil,
BinaryIndex: 0,
BinaryEntrypoint: "use_module",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "B"}}}},
InitialBlock: 5,
},
{
Name: "B",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}},
BinaryIndex: 0,
BinaryEntrypoint: "B",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "clock"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"},
},
{
Name: "dbout_to_graphout",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "dbout_to_graphout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "example_dbout"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"},
},
{
Name: "example_dbout",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "example_dbout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "block"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"},
},
},
},
},
manifest: &Manifest{
Modules: []*Module{
{Name: "use_module", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}, Use: "dbout_to_graphout"},
{Name: "B", Kind: "map", Inputs: []*Input{{Source: "clock"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}},
{Name: "dbout_to_graphout", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}},
{Name: "example_dbout", Kind: "map", Inputs: []*Input{{Source: "block"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}},
},
},
expectedOutputModules: []*pbsubstreams.Module{
{
Name: "use_module",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "dbout_to_graphout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "B"}}}},
InitialBlock: 5,
Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"},
},
{
Name: "B",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}},
BinaryIndex: 0,
BinaryEntrypoint: "B",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "clock"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"},
},
{
Name: "dbout_to_graphout",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "dbout_to_graphout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "example_dbout"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"},
},
{
Name: "example_dbout",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "example_dbout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "block"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"},
},
},
},

{
name: "input's output type not matching",
pkg: &pbsubstreams.Package{
ProtoFiles: nil,
Version: 0,
Modules: &pbsubstreams.Modules{Modules: []*pbsubstreams.Module{
{
Name: "use_module",
Kind: nil,
BinaryIndex: 0,
BinaryEntrypoint: "use_module",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "B"}}}},
InitialBlock: 5,
},
{
Name: "B",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}},
BinaryIndex: 0,
BinaryEntrypoint: "B",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "clock"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.kv.v1.changes"},
},
{
Name: "dbout_to_graphout",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "dbout_to_graphout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "example_dbout"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"},
},
{
Name: "example_dbout",
Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}},
BinaryIndex: 1,
BinaryEntrypoint: "example_dbout",
Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "block"}}}},
Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"},
},
},
},
},
manifest: &Manifest{
Modules: []*Module{
{Name: "use_module", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}, Use: "dbout_to_graphout"},
{Name: "B", Kind: "map", Inputs: []*Input{{Source: "clock"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}},
{Name: "dbout_to_graphout", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}},
{Name: "example_dbout", Kind: "map", Inputs: []*Input{{Source: "block"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}},
},
},
expectedError: "checking inputs for module \"use_module\": module \"use_module\": input \"map:{module_name:\\\"B\\\"}\" has different output than the used module \"dbout_to_graphout\": input \"map:{module_name:\\\"example_dbout\\\"}\"",
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := handleUseModules(c.pkg, c.manifest)
if c.expectedError != "" {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedError)
return
}
require.NoError(t, err)

for index, mod := range c.pkg.Modules.Modules {
require.Equal(t, mod.String(), c.expectedOutputModules[index].String())
}
})
}
}

0 comments on commit 32fb45f

Please sign in to comment.