Skip to content

Commit

Permalink
ensure reading manifest always sets initialBlock, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Dec 19, 2023
1 parent d2c3dcf commit fb68aaf
Show file tree
Hide file tree
Showing 29 changed files with 92 additions and 189 deletions.
2 changes: 1 addition & 1 deletion cmd/substreams/codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func runCodeGen(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("loading manifest: %w", err)
}
pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("reading manifest %q: %w", manifestPath, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func runManifestGraph(cmd *cobra.Command, args []string) error {
return fmt.Errorf("manifest reader: %w", err)
}

pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}
Expand Down
6 changes: 1 addition & 5 deletions cmd/substreams/gui.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func runGui(cmd *cobra.Command, args []string) error {
return fmt.Errorf("manifest reader: %w", err)
}

pkg, err := manifestReader.Read()
pkg, graph, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}
Expand Down Expand Up @@ -141,10 +141,6 @@ func runGui(cmd *cobra.Command, args []string) error {
}

if readFromModule { // need to tweak the stop block here
graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return err
}
sb, err := graph.ModuleInitialBlock(outputModule)
if err != nil {
return fmt.Errorf("getting module start block: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func runInspect(cmd *cobra.Command, args []string) error {
return fmt.Errorf("manifest reader: %w", err)
}

pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("reading manifest %q: %w", manifestPath, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func runPack(cmd *cobra.Command, args []string) error {
return fmt.Errorf(`"pack" can only be used to pack local manifest file`)
}

pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("reading manifest %q: %w", manifestPath, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/protogen.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func runProtogen(cmd *cobra.Command, args []string) error {
outputPath = newOutputPath
}

pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("reading manifest %q: %w", manifestPath, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (cs *ConnectServer) Blocks(
return fmt.Errorf("manifest reader: %w", err)
}

pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", cs.Manifest, err)
}
Expand Down
6 changes: 1 addition & 5 deletions cmd/substreams/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func runRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("manifest reader: %w", err)
}

pkg, err := manifestReader.Read()
pkg, graph, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}
Expand Down Expand Up @@ -128,10 +128,6 @@ func runRun(cmd *cobra.Command, args []string) error {
}

if readFromModule {
graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return err
}
sb, err := graph.ModuleInitialBlock(outputModule)
if err != nil {
return fmt.Errorf("getting module start block: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/service-deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func deployE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
pkg, err := reader.Read()
pkg, _, err := reader.Read()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams/service-update.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func updateE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
pkg, err := reader.Read()
pkg, _, err := reader.Read()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion codegen/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func InitTestGenerator(t *testing.T) *Generator {
protoDefinitions = pd
}))

pkg, err := manifestReader.Read()
pkg, _, err := manifestReader.Read()
if err != nil {
panic(fmt.Errorf("reading manifest file %s :%w", manifestPath, err))
}
Expand Down
15 changes: 5 additions & 10 deletions info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ModuleInput struct {
Mode *string `json:"mode,omitempty"` //for store inputs
}

func Basic(pkg *pbsubstreams.Package) (*BasicInfo, error) {
func Basic(pkg *pbsubstreams.Package, graph *manifest.ModuleGraph) (*BasicInfo, error) {
name := "Unnamed"
var doc, version string
if len(pkg.PackageMeta) != 0 {
Expand Down Expand Up @@ -118,11 +118,6 @@ func Basic(pkg *pbsubstreams.Package) (*BasicInfo, error) {
manifestInfo.Documentation = strPtr(strings.Replace(doc, "\n", "\n ", -1))
}

graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return nil, fmt.Errorf("creating module graph: %w", err)
}

modules := make([]ModulesInfo, 0, len(pkg.Modules.Modules))

hashes := manifest.NewModuleHashes()
Expand Down Expand Up @@ -223,16 +218,16 @@ func Extended(manifestPath string, outputModule string, skipValidation bool) (*E
return nil, fmt.Errorf("manifest reader: %w", err)
}

pkg, err := reader.Read()
pkg, graph, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

return ExtendedWithPackage(pkg, outputModule)
return ExtendedWithPackage(pkg, graph, outputModule)
}

func ExtendedWithPackage(pkg *pbsubstreams.Package, outputModule string) (*ExtendedInfo, error) {
basicInfo, err := Basic(pkg)
func ExtendedWithPackage(pkg *pbsubstreams.Package, graph *manifest.ModuleGraph, outputModule string) (*ExtendedInfo, error) {
basicInfo, err := Basic(pkg, graph)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ func TestBasicInfo(t *testing.T) {
reader, err := manifest.NewReader("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg")
require.NoError(t, err)

pkg, err := reader.Read()
pkg, graph, err := reader.Read()
require.NoError(t, err)

info, err := Basic(pkg)
info, err := Basic(pkg, graph)
require.NoError(t, err)

r, err := json.MarshalIndent(info, "", " ")
Expand Down
12 changes: 0 additions & 12 deletions manifest/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,9 @@ func NewModuleGraph(modules []*pbsubstreams.Module) (*ModuleGraph, error) {
return nil, fmt.Errorf("modules graph has a cycle")
}

if err := computeInitialBlock(modules, g); err != nil {
return nil, err
}

return g, nil
}

func MustNewModuleGraph(modules []*pbsubstreams.Module) *ModuleGraph {
g, err := NewModuleGraph(modules)
if err != nil {
panic(err)
}
return g
}

// ResetGraphHashes is to be called when you want to force a recomputation of the module hashes.
func (graph *ModuleGraph) ResetGraphHashes() {
graph.currentHashesCache = make(map[string][]byte)
Expand Down
20 changes: 15 additions & 5 deletions manifest/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ func TestModuleGraph_computeInitialBlocks(t *testing.T) {
},
}

_, err := NewModuleGraph(startBlockTestModule)
graph, err := NewModuleGraph(startBlockTestModule)
require.NoError(t, err)
err = computeInitialBlock(startBlockTestModule, graph)
require.NoError(t, err)

assert.Equal(t, uint64(20), startBlockTestModule[0].InitialBlock)
Expand Down Expand Up @@ -333,7 +335,9 @@ func TestModuleGraph_ComputeInitialBlocks_WithOneParentContainingNoInitialBlock(
},
}

_, err := NewModuleGraph(testModules)
graph, err := NewModuleGraph(testModules)
require.NoError(t, err)
err = computeInitialBlock(testModules, graph)
require.NoError(t, err)

assert.Equal(t, bstream.GetProtocolFirstStreamableBlock, testModules[0].InitialBlock)
Expand Down Expand Up @@ -361,7 +365,9 @@ func TestModuleGraph_ComputeInitialBlocks_WithOneParentContainingAInitialBlock(t
},
}

_, err := NewModuleGraph(testModules)
graph, err := NewModuleGraph(testModules)
require.NoError(t, err)
err = computeInitialBlock(testModules, graph)
require.NoError(t, err)

assert.Equal(t, uint64(10), testModules[0].GetInitialBlock())
Expand Down Expand Up @@ -422,7 +428,9 @@ func TestModuleGraph_ComputeInitialBlocks_WithTwoParentsAndAGrandParentContainin
},
}

_, err := NewModuleGraph(testModules)
graph, err := NewModuleGraph(testModules)
require.NoError(t, err)
err = computeInitialBlock(testModules, graph)
require.NoError(t, err)

assert.Equal(t, uint64(10), testModules[0].GetInitialBlock())
Expand Down Expand Up @@ -474,6 +482,8 @@ func TestModuleGraph_ComputeInitialBlocks_WithThreeParentsEachContainingAInitial
},
}

_, err := NewModuleGraph(testModules)
graph, err := NewModuleGraph(testModules)
require.NoError(t, err)
err = computeInitialBlock(testModules, graph)
assert.Equal(t, `cannot deterministically determine the initialBlock for module "D"; multiple inputs have conflicting initial blocks defined or inherited`, err.Error())
}
2 changes: 1 addition & 1 deletion manifest/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ inputs:

func TestManifest_ToProto(t *testing.T) {
reader := MustNewReader("./test/test_manifest.yaml")
pkg, err := reader.Read()
pkg, _, err := reader.Read()
require.NoError(t, err)

pbManifest := pkg.Modules
Expand Down
43 changes: 2 additions & 41 deletions manifest/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/streamingfast/substreams/pb/system"
sfproto "github.com/streamingfast/substreams/proto"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
)

Expand Down Expand Up @@ -48,8 +47,8 @@ func loadProtobufs(pkg *pbsubstreams.Package, manif *Manifest) ([]*desc.FileDesc
ImportPaths: importPaths,
IncludeSourceCodeInfo: true,
Accessor: func(filename string) (io.ReadCloser, error) {
// This is a workaround for protoparse's parser that does not honor extensions (google.protobuf.FieldOptions) without access to the full source:
// the source 'sf/substreams/options.proto' file is provided through go_embed, simulating that the file exists on disk.
// This is a workaround for protoparse's parser that does not honor extensions (google.protobuf.FieldOptions) without access to the full source:
// the source 'sf/substreams/options.proto' file is provided through go_embed, simulating that the file exists on disk.
if strings.HasSuffix(filename, sfproto.OptionsPath) {
return io.NopCloser(bytes.NewReader(sfproto.OptionsSource)), nil
}
Expand All @@ -74,44 +73,6 @@ func loadProtobufs(pkg *pbsubstreams.Package, manif *Manifest) ([]*desc.FileDesc
return customFiles, nil
}

type ext struct {
}

func (e ext) New() protoreflect.Value {
//TODO implement me
panic("implement me")
}

func (e ext) Zero() protoreflect.Value {
//TODO implement me
panic("implement me")
}

func (e ext) TypeDescriptor() protoreflect.ExtensionTypeDescriptor {
//TODO implement me
panic("implement me")
}

func (e ext) ValueOf(i interface{}) protoreflect.Value {
//TODO implement me
panic("implement me")
}

func (e ext) InterfaceOf(value protoreflect.Value) interface{} {
//TODO implement me
panic("implement me")
}

func (e ext) IsValidValue(value protoreflect.Value) bool {
//TODO implement me
panic("implement me")
}

func (e ext) IsValidInterface(i interface{}) bool {
//TODO implement me
panic("implement me")
}

func readSystemProtobufs() (*descriptorpb.FileDescriptorSet, error) {
fds := &descriptorpb.FileDescriptorSet{}
err := proto.Unmarshal(system.ProtobufDescriptors, fds)
Expand Down
Loading

0 comments on commit fb68aaf

Please sign in to comment.