Skip to content

Commit

Permalink
info: added new package to parse substreams information, update info …
Browse files Browse the repository at this point in the history
…command to use this library
  • Loading branch information
colindickson committed Sep 22, 2023
1 parent a21236e commit 2e097b5
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 44 deletions.
77 changes: 33 additions & 44 deletions cmd/substreams/info.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package main

import (
"encoding/json"
"fmt"
"github.com/streamingfast/substreams/info"
"strings"

"github.com/streamingfast/cli"

"github.com/spf13/cobra"
"github.com/streamingfast/substreams/manifest"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/pipeline/outputmodules"
)

// var manifestCmd = &cobra.Command{
Expand All @@ -31,6 +30,7 @@ var infoCmd = &cobra.Command{
}

func init() {
infoCmd.Flags().Bool("json", false, "Output as JSON")
rootCmd.AddCommand(infoCmd)
}

Expand All @@ -45,74 +45,63 @@ func runInfo(cmd *cobra.Command, args []string) error {
outputModule = args[1]
}

manifestReader, err := manifest.NewReader(manifestPath)
info, err := info.Extended(manifestPath, outputModule)
if err != nil {
return fmt.Errorf("manifest reader: %w", err)
return err
}

pkg, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return fmt.Errorf("creating module graph: %w", err)
if mustGetBool(cmd, "json") {
res, err := json.MarshalIndent(info, "", " ")
if err != nil {
return err
}
fmt.Println(string(res))
return nil
}

fmt.Println("Package name:", pkg.PackageMeta[0].Name)
fmt.Println("Version:", pkg.PackageMeta[0].Version)
if doc := pkg.PackageMeta[0].Doc; doc != "" {
fmt.Println("Doc: " + strings.Replace(doc, "\n", "\n ", -1))
fmt.Println("Package name:", info.Name)
fmt.Println("Version:", info.Version)
if doc := info.Documentation; doc != nil && *doc != "" {
fmt.Println("Doc: " + strings.Replace(*doc, "\n", "\n ", -1))
}

hashes := manifest.NewModuleHashes()

fmt.Println("Modules:")
fmt.Println("----")
for modIdx, module := range pkg.Modules.Modules {
fmt.Println("Name:", module.Name)
fmt.Println("Initial block:", module.InitialBlock)
kind := module.GetKind()
switch v := kind.(type) {
case *pbsubstreams.Module_KindMap_:
fmt.Println("Kind: map")
fmt.Println("Output Type:", v.KindMap.OutputType)
case *pbsubstreams.Module_KindStore_:
fmt.Println("Kind: store")
fmt.Println("Value Type:", v.KindStore.ValueType)
fmt.Println("Update Policy:", v.KindStore.UpdatePolicy)
for _, mod := range info.Modules {
fmt.Println("Name:", mod.Name)
fmt.Println("Initial block:", mod.InitialBlock)
fmt.Println("Kind:", mod.Kind)

switch mod.Kind {
case "map":
fmt.Println("Output Type:", *mod.OutputType)
case "store":
fmt.Println("Value Type:", *mod.ValueType)
fmt.Println("Update Policy:", *mod.UpdatePolicy)
default:
fmt.Println("Kind: Unknown")
}

hashes.HashModule(pkg.Modules, module, graph)

fmt.Println("Hash:", hashes.Get(module.Name))
moduleMeta := pkg.ModuleMeta[modIdx]
if moduleMeta != nil && moduleMeta.Doc != "" {
fmt.Println("Doc: " + strings.Replace(moduleMeta.Doc, "\n", "\n ", -1))
fmt.Println("Hash:", mod.Hash)
if doc := mod.Documentation; doc != nil && *doc != "" {
fmt.Println("Doc: ", mod.Documentation)
}
fmt.Println("")
}

if outputModule != "" {
outputGraph, err := outputmodules.NewOutputModuleGraph(outputModule, true, pkg.Modules)
if err != nil {
return err
}
for i, layers := range outputGraph.StagedUsedModules() {
stages := info.ExecutionStages
for i, layers := range stages {
var layerDefs []string
for _, l := range layers {
var mods []string
for _, m := range l {
mods = append(mods, m.Name)
mods = append(mods, m)
}
layerDefs = append(layerDefs, fmt.Sprintf(`["%s"]`, strings.Join(mods, `","`)))
}
fmt.Printf("Stage %d: [%s]\n", i, strings.Join(layerDefs, `,`))
}

}

return nil
Expand Down
199 changes: 199 additions & 0 deletions info/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package info

import (
"fmt"
"github.com/streamingfast/substreams/pipeline/outputmodules"
"strings"

"github.com/streamingfast/substreams/manifest"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"google.golang.org/protobuf/types/descriptorpb"
)

type BasicInfo struct {
Name string `json:"name"`
Version string `json:"version"`
Documentation *string `json:"documentation,omitempty"`
Modules []ModulesInfo `json:"modules"`
ProtoPackages []string `json:"proto_packages"`
}

type ExtendedInfo struct {
*BasicInfo

ExecutionStages [][][]string `json:"execution_stages,omitempty"`
}

type ProtoFileInfo struct {
Name *string `json:"name,omitempty"`
Package *string `json:"package,omitempty"`
Dependencies []string `json:"dependencies,omitempty"`
PublicDependencies []int32 `json:"public_dependencies,omitempty"`
MessageType []*descriptorpb.DescriptorProto `json:"message_type,omitempty"`
Services []*descriptorpb.ServiceDescriptorProto `json:"services,omitempty"`
}

type ModulesInfo struct {
Name string `json:"name"`
Kind string `json:"kind"`
Inputs []ModuleInput `json:"inputs"`
OutputType *string `json:"output_type,omitempty"` //for map inputs
ValueType *string `json:"value_type,omitempty"` //for store inputs
UpdatePolicy *string `json:"update_policy,omitempty"` //for store inputs
InitialBlock uint64 `json:"initial_block"`
Documentation *string `json:"documentation,omitempty"`
Hash string `json:"hash"`
}

type ModuleInput struct {
Type string `json:"type"`
Name string `json:"name"`
Mode *string `json:"mode,omitempty"` //for store inputs
}

func Basic(manifestPath string) (*BasicInfo, error) {
reader, err := manifest.NewReader(manifestPath)
if err != nil {
return nil, fmt.Errorf("manifest reader: %w", err)
}

pkg, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

manifestInfo := &BasicInfo{
Name: pkg.PackageMeta[0].Name,
Version: pkg.PackageMeta[0].Version,
}
if pkg.PackageMeta[0].Doc != "" {
manifestInfo.Documentation = strPtr(strings.Replace(pkg.PackageMeta[0].Doc, "\n", "\n ", -1))
}

graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return nil, fmt.Errorf("creating module graph: %w", err)
}

modules := make([]ModulesInfo, 0, len(pkg.Modules.Modules))

hashes := manifest.NewModuleHashes()
for ix, mod := range pkg.Modules.Modules {
modInfo := ModulesInfo{}

_, _ = hashes.HashModule(pkg.Modules, mod, graph)
modInfo.Hash = hashes.Get(mod.Name)

modInfo.Name = mod.Name
modInfo.InitialBlock = mod.InitialBlock

kind := mod.GetKind()
switch v := kind.(type) {
case *pbsubstreams.Module_KindMap_:
modInfo.Kind = "map"
modInfo.OutputType = strPtr(v.KindMap.OutputType)
case *pbsubstreams.Module_KindStore_:
modInfo.Kind = "store"
modInfo.ValueType = strPtr(v.KindStore.ValueType)
modInfo.UpdatePolicy = strPtr(v.KindStore.UpdatePolicy.Pretty())
default:
modInfo.Kind = "unknown"
}

modMeta := pkg.ModuleMeta[ix]
if modMeta != nil && modMeta.Doc != "" {
modInfo.Documentation = strPtr(strings.Replace(modMeta.Doc, "\n", "\n ", -1))
}

inputs := make([]ModuleInput, 0, len(mod.Inputs))
for _, input := range mod.Inputs {
inputInfo := ModuleInput{}

switch v := input.Input.(type) {
case *pbsubstreams.Module_Input_Source_:
inputInfo.Type = "source"
inputInfo.Name = v.Source.Type
case *pbsubstreams.Module_Input_Map_:
inputInfo.Type = "map"
inputInfo.Name = v.Map.ModuleName
case *pbsubstreams.Module_Input_Store_:
inputInfo.Type = "store"
inputInfo.Name = v.Store.ModuleName
if v.Store.Mode > 0 {
inputInfo.Mode = strPtr(v.Store.Mode.Pretty())
}
default:
inputInfo.Type = "unknown"
inputInfo.Name = "unknown"
}

inputs = append(inputs, inputInfo)
}
modInfo.Inputs = inputs

modules = append(modules, modInfo)
}
manifestInfo.Modules = modules

protoPackages := make([]string, 0, len(pkg.ProtoFiles))
protoPackageMap := make(map[string]struct{})
for _, protoFile := range pkg.ProtoFiles {
if _, ok := protoPackageMap[protoFile.GetPackage()]; ok {
continue
} else {
protoPackageMap[protoFile.GetPackage()] = struct{}{}
}

protoPackages = append(protoPackages, protoFile.GetPackage())
}

manifestInfo.ProtoPackages = protoPackages

return manifestInfo, nil
}

func Extended(manifestPath string, outputModule string) (*ExtendedInfo, error) {
basicInfo, err := Basic(manifestPath)
if err != nil {
return nil, err
}

reader, err := manifest.NewReader(manifestPath)
if err != nil {
return nil, fmt.Errorf("manifest reader: %w", err)
}

pkg, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

var stages [][][]string
if outputModule != "" {
outputGraph, err := outputmodules.NewOutputModuleGraph(outputModule, true, pkg.Modules)
if err != nil {
return nil, fmt.Errorf("creating output module graph: %w", err)
}
stages = make([][][]string, 0, len(outputGraph.StagedUsedModules()))
for _, layers := range outputGraph.StagedUsedModules() {
var layerDefs [][]string
for _, l := range layers {
var mods []string
for _, m := range l {
mods = append(mods, m.Name)
}
layerDefs = append(layerDefs, mods)
}
stages = append(stages, layerDefs)
}
}

return &ExtendedInfo{
BasicInfo: basicInfo,
ExecutionStages: stages,
}, nil
}

func strPtr(s string) *string {
return &s
}
28 changes: 28 additions & 0 deletions info/info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package info

import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/require"
"testing"
)

func TestBasicInfo(t *testing.T) {
info, err := Basic("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg")
require.NoError(t, err)

r, err := json.MarshalIndent(info, "", " ")
require.NoError(t, err)

fmt.Println(string(r))
}

func TestExtendedInfo(t *testing.T) {
info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out")
require.NoError(t, err)

r, err := json.MarshalIndent(info, "", " ")
require.NoError(t, err)

fmt.Println(string(r))
}
8 changes: 8 additions & 0 deletions pb/sf/substreams/v1/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ func (x *Module_Input) Pretty() string {

return strings.TrimSpace(result)
}

func (x Module_KindStore_UpdatePolicy) Pretty() string {
return strings.TrimPrefix(strings.ToLower(x.String()), "update_policy_")
}

func (x Module_Input_Store_Mode) Pretty() string {
return strings.ToLower(Module_Input_Store_Mode_name[int32(x)])
}

0 comments on commit 2e097b5

Please sign in to comment.