From 32fb45fd5038add6f263b1a2166932e5010ea1ba Mon Sep 17 00:00:00 2001 From: arnaudberger Date: Tue, 12 Mar 2024 11:50:17 -0400 Subject: [PATCH] Adding test for handle use modules --- manifest/package.go | 115 ++++++++++++++------------ manifest/package_test.go | 172 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+), 54 deletions(-) create mode 100644 manifest/package_test.go diff --git a/manifest/package.go b/manifest/package.go index 6bc807af7..539669f77 100644 --- a/manifest/package.go +++ b/manifest/package.go @@ -105,69 +105,76 @@ func handleUseModules(pkg *pbsubstreams.Package, manif *Manifest) error { packageModulesMapping[module.Name] = module } - for _, mod := range manif.Modules { - if mod.Use != "" { - usedModule, found := packageModulesMapping[mod.Use] + for _, manifestModule := range manif.Modules { + if manifestModule.Use != "" { + usedModule, found := packageModulesMapping[manifestModule.Use] if !found { - return fmt.Errorf("module %q: use module %q not found", mod.Name, mod.Use) + return fmt.Errorf("module %q: use module %q not found", manifestModule.Name, manifestModule.Use) } - pkgPbModule := packageModulesMapping[mod.Name] - pkgPbModule.BinaryIndex = usedModule.BinaryIndex - pkgPbModule.BinaryEntrypoint = usedModule.BinaryEntrypoint - - for index, input := range pkgPbModule.Inputs { - usedModuleInput := usedModule.Inputs[index] - if input.GetParams() != nil { - if usedModuleInput.GetParams() == nil { - return fmt.Errorf("module %q: input %q is not a params type", mod.Name, input.String()) - } - if input.GetParams().Value != usedModuleInput.GetParams().Value { - return fmt.Errorf("module %q: input %q has different value than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String()) - } - } - if input.GetSource() != nil { - if usedModuleInput.GetSource() == nil { - return fmt.Errorf("module %q: input %q is not a source type", mod.Name, input.String()) - } - if input.GetSource().Type != usedModuleInput.GetSource().Type { - return fmt.Errorf("module %q: input %q has different source than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String()) - } - } - if input.GetStore() != nil { - if usedModuleInput.GetStore() == nil { - return fmt.Errorf("module %q: input %q is not a store type", mod.Name, input.String()) - } - if input.GetStore().GetMode() != usedModuleInput.GetStore().GetMode() { - return fmt.Errorf("module %q: input %q has different mode than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String()) - } + moduleWithUse := packageModulesMapping[manifestModule.Name] - inputStoreModuleName := input.GetStore().ModuleName - usedModuleStoreMapModuleName := usedModuleInput.GetStore().ModuleName - if packageModulesMapping[inputStoreModuleName].Output.Type != packageModulesMapping[usedModuleStoreMapModuleName].Output.Type { - return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String()) - } + if err := checkEqualInputs(moduleWithUse, usedModule, manifestModule, packageModulesMapping); err != nil { + return fmt.Errorf("checking inputs for module %q: %w", manifestModule.Name, err) + } - } - if input.GetMap() != nil { - if usedModuleInput.GetMap() == nil { - return fmt.Errorf("module %q: input %q is not a map type", mod.Name, input.String()) - } + moduleWithUse.BinaryIndex = usedModule.BinaryIndex + moduleWithUse.BinaryEntrypoint = usedModule.BinaryEntrypoint - inputMapModuleName := input.GetMap().ModuleName - usedModuleInputMapModuleName := usedModuleInput.GetMap().ModuleName - if packageModulesMapping[inputMapModuleName].Output.Type != packageModulesMapping[usedModuleInputMapModuleName].Output.Type { - fmt.Printf("Module Output One %q, two %q", packageModulesMapping[inputMapModuleName].Output, packageModulesMapping[usedModuleInputMapModuleName].Output) - return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", mod.Name, input.String(), mod.Use, usedModuleInput.String()) - } - } + if moduleWithUse.InitialBlock == 0 { + moduleWithUse.InitialBlock = usedModule.InitialBlock + } + + moduleWithUse.Output = usedModule.Output + moduleWithUse.Kind = usedModule.Kind + } + } + return nil +} + +func checkEqualInputs(moduleWithUse, usedModule *pbsubstreams.Module, manifestModuleWithUse *Module, packageModulesMapping map[string]*pbsubstreams.Module) error { + for index, input := range moduleWithUse.Inputs { + usedModuleInput := usedModule.Inputs[index] + if input.GetParams() != nil { + if usedModuleInput.GetParams() == nil { + return fmt.Errorf("module %q: input %q is not a params type", manifestModuleWithUse.Name, input.String()) + } + if input.GetParams().Value != usedModuleInput.GetParams().Value { + return fmt.Errorf("module %q: input %q has different value than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String()) + } + } + if input.GetSource() != nil { + if usedModuleInput.GetSource() == nil { + return fmt.Errorf("module %q: input %q is not a source type", manifestModuleWithUse.Name, input.String()) + } + if input.GetSource().Type != usedModuleInput.GetSource().Type { + return fmt.Errorf("module %q: input %q has different source than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String()) + } + } + if input.GetStore() != nil { + if usedModuleInput.GetStore() == nil { + return fmt.Errorf("module %q: input %q is not a store type", manifestModuleWithUse.Name, input.String()) + } + if input.GetStore().GetMode() != usedModuleInput.GetStore().GetMode() { + return fmt.Errorf("module %q: input %q has different mode than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String()) } - if pkgPbModule.InitialBlock == 0 { - pkgPbModule.InitialBlock = usedModule.InitialBlock + inputStoreModuleName := input.GetStore().ModuleName + usedModuleStoreMapModuleName := usedModuleInput.GetStore().ModuleName + if packageModulesMapping[inputStoreModuleName].Output.Type != packageModulesMapping[usedModuleStoreMapModuleName].Output.Type { + return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String()) } - pkgPbModule.Output = usedModule.Output - pkgPbModule.Kind = usedModule.Kind + } + if input.GetMap() != nil { + if usedModuleInput.GetMap() == nil { + return fmt.Errorf("module %q: input %q is not a map type", manifestModuleWithUse.Name, input.String()) + } + + inputMapModuleName := input.GetMap().ModuleName + usedModuleInputMapModuleName := usedModuleInput.GetMap().ModuleName + if packageModulesMapping[inputMapModuleName].Output.Type != packageModulesMapping[usedModuleInputMapModuleName].Output.Type { + return fmt.Errorf("module %q: input %q has different output than the used module %q: input %q", manifestModuleWithUse.Name, input.String(), manifestModuleWithUse.Use, usedModuleInput.String()) + } } } return nil diff --git a/manifest/package_test.go b/manifest/package_test.go new file mode 100644 index 000000000..2314c065f --- /dev/null +++ b/manifest/package_test.go @@ -0,0 +1,172 @@ +package manifest + +import ( + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHandleUseModules(t *testing.T) { + cases := []struct { + name string + pkg *pbsubstreams.Package + manifest *Manifest + expectedOutputModules []*pbsubstreams.Module + expectedError string + }{ + { + name: "sunny path", + pkg: &pbsubstreams.Package{ + ProtoFiles: nil, + Version: 0, + Modules: &pbsubstreams.Modules{Modules: []*pbsubstreams.Module{ + { + Name: "use_module", + Kind: nil, + BinaryIndex: 0, + BinaryEntrypoint: "use_module", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "B"}}}}, + InitialBlock: 5, + }, + { + Name: "B", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}}, + BinaryIndex: 0, + BinaryEntrypoint: "B", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "clock"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"}, + }, + { + Name: "dbout_to_graphout", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "dbout_to_graphout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "example_dbout"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"}, + }, + { + Name: "example_dbout", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "example_dbout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "block"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"}, + }, + }, + }, + }, + manifest: &Manifest{ + Modules: []*Module{ + {Name: "use_module", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}, Use: "dbout_to_graphout"}, + {Name: "B", Kind: "map", Inputs: []*Input{{Source: "clock"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}}, + {Name: "dbout_to_graphout", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}}, + {Name: "example_dbout", Kind: "map", Inputs: []*Input{{Source: "block"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}}, + }, + }, + expectedOutputModules: []*pbsubstreams.Module{ + { + Name: "use_module", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "dbout_to_graphout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "B"}}}}, + InitialBlock: 5, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"}, + }, + { + Name: "B", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}}, + BinaryIndex: 0, + BinaryEntrypoint: "B", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "clock"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"}, + }, + { + Name: "dbout_to_graphout", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "dbout_to_graphout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "example_dbout"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"}, + }, + { + Name: "example_dbout", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "example_dbout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "block"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"}, + }, + }, + }, + + { + name: "input's output type not matching", + pkg: &pbsubstreams.Package{ + ProtoFiles: nil, + Version: 0, + Modules: &pbsubstreams.Modules{Modules: []*pbsubstreams.Module{ + { + Name: "use_module", + Kind: nil, + BinaryIndex: 0, + BinaryEntrypoint: "use_module", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "B"}}}}, + InitialBlock: 5, + }, + { + Name: "B", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}}, + BinaryIndex: 0, + BinaryEntrypoint: "B", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "clock"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.kv.v1.changes"}, + }, + { + Name: "dbout_to_graphout", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.entity.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "dbout_to_graphout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: "example_dbout"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.entity.v1.changes"}, + }, + { + Name: "example_dbout", + Kind: &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{OutputType: "proto:sf.database.v1.changes"}}, + BinaryIndex: 1, + BinaryEntrypoint: "example_dbout", + Inputs: []*pbsubstreams.Module_Input{{Input: &pbsubstreams.Module_Input_Source_{Source: &pbsubstreams.Module_Input_Source{Type: "block"}}}}, + Output: &pbsubstreams.Module_Output{Type: "proto:sf.database.v1.changes"}, + }, + }, + }, + }, + manifest: &Manifest{ + Modules: []*Module{ + {Name: "use_module", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}, Use: "dbout_to_graphout"}, + {Name: "B", Kind: "map", Inputs: []*Input{{Source: "clock"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}}, + {Name: "dbout_to_graphout", Kind: "map", Inputs: []*Input{{Source: "proto:sf.database.v1.changes"}}}, + {Name: "example_dbout", Kind: "map", Inputs: []*Input{{Source: "block"}}, Output: StreamOutput{Type: "proto:sf.database.v1.changes"}}, + }, + }, + expectedError: "checking inputs for module \"use_module\": module \"use_module\": input \"map:{module_name:\\\"B\\\"}\" has different output than the used module \"dbout_to_graphout\": input \"map:{module_name:\\\"example_dbout\\\"}\"", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + err := handleUseModules(c.pkg, c.manifest) + if c.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedError) + return + } + require.NoError(t, err) + + for index, mod := range c.pkg.Modules.Modules { + require.Equal(t, mod.String(), c.expectedOutputModules[index].String()) + } + }) + } +}