From 2e097b54df8d1a23f949fc4afcad31165b76f3d0 Mon Sep 17 00:00:00 2001 From: colindickson Date: Tue, 19 Sep 2023 19:29:15 -0400 Subject: [PATCH] info: added new package to parse substreams information, update info command to use this library --- cmd/substreams/info.go | 77 ++++++------- info/info.go | 199 +++++++++++++++++++++++++++++++++ info/info_test.go | 28 +++++ pb/sf/substreams/v1/modules.go | 8 ++ 4 files changed, 268 insertions(+), 44 deletions(-) create mode 100644 info/info.go create mode 100644 info/info_test.go diff --git a/cmd/substreams/info.go b/cmd/substreams/info.go index 92851eeff..e205c70e3 100644 --- a/cmd/substreams/info.go +++ b/cmd/substreams/info.go @@ -1,15 +1,14 @@ package main import ( + "encoding/json" "fmt" + "github.com/streamingfast/substreams/info" "strings" "github.com/streamingfast/cli" "github.com/spf13/cobra" - "github.com/streamingfast/substreams/manifest" - pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" - "github.com/streamingfast/substreams/pipeline/outputmodules" ) // var manifestCmd = &cobra.Command{ @@ -31,6 +30,7 @@ var infoCmd = &cobra.Command{ } func init() { + infoCmd.Flags().Bool("json", false, "Output as JSON") rootCmd.AddCommand(infoCmd) } @@ -45,74 +45,63 @@ func runInfo(cmd *cobra.Command, args []string) error { outputModule = args[1] } - manifestReader, err := manifest.NewReader(manifestPath) + info, err := info.Extended(manifestPath, outputModule) if err != nil { - return fmt.Errorf("manifest reader: %w", err) + return err } - pkg, err := manifestReader.Read() - if err != nil { - return fmt.Errorf("read manifest %q: %w", manifestPath, err) - } - - graph, err := manifest.NewModuleGraph(pkg.Modules.Modules) - if err != nil { - return fmt.Errorf("creating module graph: %w", err) + if mustGetBool(cmd, "json") { + res, err := json.MarshalIndent(info, "", " ") + if err != nil { + return err + } + fmt.Println(string(res)) + return nil } - fmt.Println("Package name:", pkg.PackageMeta[0].Name) - fmt.Println("Version:", pkg.PackageMeta[0].Version) - if doc := pkg.PackageMeta[0].Doc; doc != "" { - fmt.Println("Doc: " + strings.Replace(doc, "\n", "\n ", -1)) + fmt.Println("Package name:", info.Name) + fmt.Println("Version:", info.Version) + if doc := info.Documentation; doc != nil && *doc != "" { + fmt.Println("Doc: " + strings.Replace(*doc, "\n", "\n ", -1)) } - hashes := manifest.NewModuleHashes() - fmt.Println("Modules:") fmt.Println("----") - for modIdx, module := range pkg.Modules.Modules { - fmt.Println("Name:", module.Name) - fmt.Println("Initial block:", module.InitialBlock) - kind := module.GetKind() - switch v := kind.(type) { - case *pbsubstreams.Module_KindMap_: - fmt.Println("Kind: map") - fmt.Println("Output Type:", v.KindMap.OutputType) - case *pbsubstreams.Module_KindStore_: - fmt.Println("Kind: store") - fmt.Println("Value Type:", v.KindStore.ValueType) - fmt.Println("Update Policy:", v.KindStore.UpdatePolicy) + for _, mod := range info.Modules { + fmt.Println("Name:", mod.Name) + fmt.Println("Initial block:", mod.InitialBlock) + fmt.Println("Kind:", mod.Kind) + + switch mod.Kind { + case "map": + fmt.Println("Output Type:", *mod.OutputType) + case "store": + fmt.Println("Value Type:", *mod.ValueType) + fmt.Println("Update Policy:", *mod.UpdatePolicy) default: fmt.Println("Kind: Unknown") } - hashes.HashModule(pkg.Modules, module, graph) - - fmt.Println("Hash:", hashes.Get(module.Name)) - moduleMeta := pkg.ModuleMeta[modIdx] - if moduleMeta != nil && moduleMeta.Doc != "" { - fmt.Println("Doc: " + strings.Replace(moduleMeta.Doc, "\n", "\n ", -1)) + fmt.Println("Hash:", mod.Hash) + if doc := mod.Documentation; doc != nil && *doc != "" { + fmt.Println("Doc: ", mod.Documentation) } fmt.Println("") } if outputModule != "" { - outputGraph, err := outputmodules.NewOutputModuleGraph(outputModule, true, pkg.Modules) - if err != nil { - return err - } - for i, layers := range outputGraph.StagedUsedModules() { + stages := info.ExecutionStages + for i, layers := range stages { var layerDefs []string for _, l := range layers { var mods []string for _, m := range l { - mods = append(mods, m.Name) + mods = append(mods, m) } layerDefs = append(layerDefs, fmt.Sprintf(`["%s"]`, strings.Join(mods, `","`))) } fmt.Printf("Stage %d: [%s]\n", i, strings.Join(layerDefs, `,`)) } - } return nil diff --git a/info/info.go b/info/info.go new file mode 100644 index 000000000..7da742841 --- /dev/null +++ b/info/info.go @@ -0,0 +1,199 @@ +package info + +import ( + "fmt" + "github.com/streamingfast/substreams/pipeline/outputmodules" + "strings" + + "github.com/streamingfast/substreams/manifest" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" + "google.golang.org/protobuf/types/descriptorpb" +) + +type BasicInfo struct { + Name string `json:"name"` + Version string `json:"version"` + Documentation *string `json:"documentation,omitempty"` + Modules []ModulesInfo `json:"modules"` + ProtoPackages []string `json:"proto_packages"` +} + +type ExtendedInfo struct { + *BasicInfo + + ExecutionStages [][][]string `json:"execution_stages,omitempty"` +} + +type ProtoFileInfo struct { + Name *string `json:"name,omitempty"` + Package *string `json:"package,omitempty"` + Dependencies []string `json:"dependencies,omitempty"` + PublicDependencies []int32 `json:"public_dependencies,omitempty"` + MessageType []*descriptorpb.DescriptorProto `json:"message_type,omitempty"` + Services []*descriptorpb.ServiceDescriptorProto `json:"services,omitempty"` +} + +type ModulesInfo struct { + Name string `json:"name"` + Kind string `json:"kind"` + Inputs []ModuleInput `json:"inputs"` + OutputType *string `json:"output_type,omitempty"` //for map inputs + ValueType *string `json:"value_type,omitempty"` //for store inputs + UpdatePolicy *string `json:"update_policy,omitempty"` //for store inputs + InitialBlock uint64 `json:"initial_block"` + Documentation *string `json:"documentation,omitempty"` + Hash string `json:"hash"` +} + +type ModuleInput struct { + Type string `json:"type"` + Name string `json:"name"` + Mode *string `json:"mode,omitempty"` //for store inputs +} + +func Basic(manifestPath string) (*BasicInfo, error) { + reader, err := manifest.NewReader(manifestPath) + if err != nil { + return nil, fmt.Errorf("manifest reader: %w", err) + } + + pkg, err := reader.Read() + if err != nil { + return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err) + } + + manifestInfo := &BasicInfo{ + Name: pkg.PackageMeta[0].Name, + Version: pkg.PackageMeta[0].Version, + } + if pkg.PackageMeta[0].Doc != "" { + manifestInfo.Documentation = strPtr(strings.Replace(pkg.PackageMeta[0].Doc, "\n", "\n ", -1)) + } + + graph, err := manifest.NewModuleGraph(pkg.Modules.Modules) + if err != nil { + return nil, fmt.Errorf("creating module graph: %w", err) + } + + modules := make([]ModulesInfo, 0, len(pkg.Modules.Modules)) + + hashes := manifest.NewModuleHashes() + for ix, mod := range pkg.Modules.Modules { + modInfo := ModulesInfo{} + + _, _ = hashes.HashModule(pkg.Modules, mod, graph) + modInfo.Hash = hashes.Get(mod.Name) + + modInfo.Name = mod.Name + modInfo.InitialBlock = mod.InitialBlock + + kind := mod.GetKind() + switch v := kind.(type) { + case *pbsubstreams.Module_KindMap_: + modInfo.Kind = "map" + modInfo.OutputType = strPtr(v.KindMap.OutputType) + case *pbsubstreams.Module_KindStore_: + modInfo.Kind = "store" + modInfo.ValueType = strPtr(v.KindStore.ValueType) + modInfo.UpdatePolicy = strPtr(v.KindStore.UpdatePolicy.Pretty()) + default: + modInfo.Kind = "unknown" + } + + modMeta := pkg.ModuleMeta[ix] + if modMeta != nil && modMeta.Doc != "" { + modInfo.Documentation = strPtr(strings.Replace(modMeta.Doc, "\n", "\n ", -1)) + } + + inputs := make([]ModuleInput, 0, len(mod.Inputs)) + for _, input := range mod.Inputs { + inputInfo := ModuleInput{} + + switch v := input.Input.(type) { + case *pbsubstreams.Module_Input_Source_: + inputInfo.Type = "source" + inputInfo.Name = v.Source.Type + case *pbsubstreams.Module_Input_Map_: + inputInfo.Type = "map" + inputInfo.Name = v.Map.ModuleName + case *pbsubstreams.Module_Input_Store_: + inputInfo.Type = "store" + inputInfo.Name = v.Store.ModuleName + if v.Store.Mode > 0 { + inputInfo.Mode = strPtr(v.Store.Mode.Pretty()) + } + default: + inputInfo.Type = "unknown" + inputInfo.Name = "unknown" + } + + inputs = append(inputs, inputInfo) + } + modInfo.Inputs = inputs + + modules = append(modules, modInfo) + } + manifestInfo.Modules = modules + + protoPackages := make([]string, 0, len(pkg.ProtoFiles)) + protoPackageMap := make(map[string]struct{}) + for _, protoFile := range pkg.ProtoFiles { + if _, ok := protoPackageMap[protoFile.GetPackage()]; ok { + continue + } else { + protoPackageMap[protoFile.GetPackage()] = struct{}{} + } + + protoPackages = append(protoPackages, protoFile.GetPackage()) + } + + manifestInfo.ProtoPackages = protoPackages + + return manifestInfo, nil +} + +func Extended(manifestPath string, outputModule string) (*ExtendedInfo, error) { + basicInfo, err := Basic(manifestPath) + if err != nil { + return nil, err + } + + reader, err := manifest.NewReader(manifestPath) + if err != nil { + return nil, fmt.Errorf("manifest reader: %w", err) + } + + pkg, err := reader.Read() + if err != nil { + return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err) + } + + var stages [][][]string + if outputModule != "" { + outputGraph, err := outputmodules.NewOutputModuleGraph(outputModule, true, pkg.Modules) + if err != nil { + return nil, fmt.Errorf("creating output module graph: %w", err) + } + stages = make([][][]string, 0, len(outputGraph.StagedUsedModules())) + for _, layers := range outputGraph.StagedUsedModules() { + var layerDefs [][]string + for _, l := range layers { + var mods []string + for _, m := range l { + mods = append(mods, m.Name) + } + layerDefs = append(layerDefs, mods) + } + stages = append(stages, layerDefs) + } + } + + return &ExtendedInfo{ + BasicInfo: basicInfo, + ExecutionStages: stages, + }, nil +} + +func strPtr(s string) *string { + return &s +} diff --git a/info/info_test.go b/info/info_test.go new file mode 100644 index 000000000..846f88072 --- /dev/null +++ b/info/info_test.go @@ -0,0 +1,28 @@ +package info + +import ( + "encoding/json" + "fmt" + "github.com/stretchr/testify/require" + "testing" +) + +func TestBasicInfo(t *testing.T) { + info, err := Basic("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg") + require.NoError(t, err) + + r, err := json.MarshalIndent(info, "", " ") + require.NoError(t, err) + + fmt.Println(string(r)) +} + +func TestExtendedInfo(t *testing.T) { + info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out") + require.NoError(t, err) + + r, err := json.MarshalIndent(info, "", " ") + require.NoError(t, err) + + fmt.Println(string(r)) +} diff --git a/pb/sf/substreams/v1/modules.go b/pb/sf/substreams/v1/modules.go index 282bd18dd..ccf63d787 100644 --- a/pb/sf/substreams/v1/modules.go +++ b/pb/sf/substreams/v1/modules.go @@ -36,3 +36,11 @@ func (x *Module_Input) Pretty() string { return strings.TrimSpace(result) } + +func (x Module_KindStore_UpdatePolicy) Pretty() string { + return strings.TrimPrefix(strings.ToLower(x.String()), "update_policy_") +} + +func (x Module_Input_Store_Mode) Pretty() string { + return strings.ToLower(Module_Input_Store_Mode_name[int32(x)]) +}