From cb7774096522cb7425044a47b2feabed8d754ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Thu, 21 Sep 2023 15:33:08 -0400 Subject: [PATCH] changelog, small tweaks --- cmd/substreams/info.go | 26 +++++++-- docs/release-notes/change-log.md | 10 ++-- manifest/sink.go | 91 ++++++++++++++++++-------------- 3 files changed, 80 insertions(+), 47 deletions(-) diff --git a/cmd/substreams/info.go b/cmd/substreams/info.go index 24a24ee63..23e261953 100644 --- a/cmd/substreams/info.go +++ b/cmd/substreams/info.go @@ -2,6 +2,8 @@ package main import ( "fmt" + "os" + "path/filepath" "strings" "github.com/streamingfast/cli" @@ -13,7 +15,7 @@ import ( ) func init() { - infoCmd.Flags().Bool("dump-sinkconfig-bytes-to-disk", false, "write sinkconfig fields that are bytes as files to disk") + infoCmd.Flags().String("output-sinkconfig-files-path", "", "if non-empty, any sinkconfig field of type 'bytes' that was packed from a file will be written to that path") } var infoCmd = &cobra.Command{ @@ -45,7 +47,7 @@ func runInfo(cmd *cobra.Command, args []string) error { outputModule = args[1] } - dumpBytesToDisk := mustGetBool(cmd, "dump-sinkconfig-bytes-to-disk") + outputSinkconfigFilesPath := mustGetString(cmd, "output-sinkconfig-files-path") manifestReader, err := manifest.NewReader(manifestPath) if err != nil { @@ -123,7 +125,7 @@ func runInfo(cmd *cobra.Command, args []string) error { fmt.Println("----") fmt.Println("type:", pkg.SinkConfig.TypeUrl) - confs, err := manifest.SinkConfigAsString(pkg, dumpBytesToDisk) + confs, files, err := manifest.DescribeSinkConfigs(pkg) if err != nil { return err } @@ -131,6 +133,24 @@ func runInfo(cmd *cobra.Command, args []string) error { fmt.Println("configs:") fmt.Println(confs) + if outputSinkconfigFilesPath != "" && files != nil { + if err := os.MkdirAll(outputSinkconfigFilesPath, 0755); err != nil { + return err + } + fmt.Println("output files:") + for k, v := range files { + filename := 
filepath.Join(outputSinkconfigFilesPath, k) + f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + if _, err := f.Write(v); err != nil { + return err + } + fmt.Printf(" - %q written to %q\n", k, filename) + } + } + } return nil diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 3ebc9fa9a..760c3f35d 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +### Highlights + +* This release brings enhancements to "sink-config" features, as a step to enable more potent descriptions of "deployable units" inside your Substreams package. The Substreams sinks will be able to leverage the new FieldOptions in upcoming releases. + ### Added * Sink configs can now use protobuf annotations (aka Field Options) to determine how the field will be interpreted in @@ -23,11 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), } ``` -* `substreams info` command now properly displays the sink configs, optionally writing the bytes fields to disk with `--dump-sinkconfig-bytes-to-disk` flag - -### Removed - -* The `@path/to/textfile` and `@@path/to/binaryfile` notations under sink_configs in substreams.yaml are now DEPRECATED (they will still work but with a warning). 
+* `substreams info` command now properly displays the content of sink configs, optionally writing the fields that were bundled from files to disk under the directory given by the `--output-sinkconfig-files-path` flag ## v1.1.14 diff --git a/manifest/sink.go b/manifest/sink.go index 28dc20520..53fc6ce98 100644 --- a/manifest/sink.go +++ b/manifest/sink.go @@ -11,7 +11,6 @@ import ( "fmt" "io" "io/fs" - "io/ioutil" "os" "path/filepath" "strings" @@ -22,10 +21,11 @@ import ( "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/anypb" - pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" + pbss "github.com/streamingfast/substreams/pb/sf/substreams" + pbssv1 "github.com/streamingfast/substreams/pb/sf/substreams/v1" ) -func (r *Reader) loadSinkConfig(pkg *pbsubstreams.Package, m *Manifest) error { +func (r *Reader) loadSinkConfig(pkg *pbssv1.Package, m *Manifest) error { if m.Sink == nil { return nil } @@ -68,79 +68,92 @@ func (r *Reader) loadSinkConfig(pkg *pbsubstreams.Package, m *Manifest) error { return nil } -func SinkConfigAsString(pkg *pbsubstreams.Package, dumpBytesToDisk bool) (string, error) { +// DescribeSinkConfigs returns a human-readable description of the sinkconfigs. 
+// Fields that were imported from files are returned as bytes in a map +func DescribeSinkConfigs(pkg *pbssv1.Package) (desc string, files map[string][]byte, err error) { if pkg.SinkConfig == nil { - return "", nil + return "", nil, nil } msgDesc, err := getMsgDesc(pkg.SinkConfig.TypeUrl, pkg.ProtoFiles) if err != nil { - return "", err + return "", nil, err } dynMsg := dynamic.NewMessageFactoryWithDefaults().NewDynamicMessage(msgDesc) val := pkg.SinkConfig.Value if err := dynMsg.Unmarshal(val); err != nil { - return "", err + return "", nil, err } var fields []*fieldAndValue for _, fd := range dynMsg.GetMessageDescriptor().GetFields() { field := &fieldAndValue{ - key: fd.GetJSONName(), + key: strings.TrimPrefix(strings.TrimPrefix(fd.GetFullyQualifiedJSONName(), pkg.SinkConfig.TypeUrl), "."), } if opts := fd.GetFieldOptions(); opts != nil { - if val := opts.ProtoReflect().Get(pbsubstreams.E_Options.TypeDescriptor()); val.IsValid() { - field.opts = val.Message().Interface().(*pbsubstreams.FieldOptions) + if val := opts.ProtoReflect().Get(pbss.E_Options.TypeDescriptor()); val.IsValid() { + field.opts = val.Message().Interface().(*pbss.FieldOptions) } } val, err := dynMsg.TryGetField(fd) if err != nil { - return "", err + return "", nil, err } field.value = val fields = append(fields, field) } - out := "" + outfiles := make(map[string][]byte) for _, fv := range fields { - out += fv.String(dumpBytesToDisk) + "\n" + text, fullContent := fv.Describe() + if fullContent != nil { + outfiles[fv.key] = fullContent + } + desc += text + "\n" + } + if len(outfiles) != 0 { + files = outfiles } - return out, nil + return desc, files, nil } type fieldAndValue struct { key string value interface{} - opts *pbsubstreams.FieldOptions + opts *pbss.FieldOptions } -func (f *fieldAndValue) String(dumpBytesToDisk bool) string { - switch val := f.value.(type) { - case []byte: +// Describe returns the field values as a string, except for fields that were extracted from a file. 
(with options 'load_from_file' or 'zip_from_folder') +// The latter will show a short description and return the full content as bytes. +func (f *fieldAndValue) Describe() (string, []byte) { + + if f.opts != nil && (f.opts.LoadFromFile || f.opts.ZipFromFolder) { // special treatment for fields coming from files: show md5sum, return rawdata as bytes + var rawdata []byte + switch val := f.value.(type) { + case string: + rawdata = []byte(val) + case []byte: + rawdata = val + } + hasher := md5.New() - hasher.Write(val) + hasher.Write(rawdata) sum := hex.EncodeToString(hasher.Sum(nil)) - suffix := optsToString(f.opts) - if dumpBytesToDisk { - file, err := ioutil.TempFile("", "substreams-info") - if err != nil { - zlog.Error("cannot write temporary file to disk", zap.Error(err)) - } - if _, err := file.Write(val); err != nil { - zlog.Error("writing data to file", zap.Error(err)) - } - suffix += " -> written to " + file.Name() - } + return fmt.Sprintf(" - %v: (%d bytes) MD5SUM: %v %v", f.key, len(rawdata), sum, optsToString(f.opts)), rawdata + } - return fmt.Sprintf(" - %v: (%d bytes) MD5SUM: %v %v", f.key, len(val), sum, suffix) + switch val := f.value.(type) { + case []byte: + return fmt.Sprintf(" - %v: %v (hex-encoded) %v", f.key, hex.EncodeToString(val), optsToString(f.opts)), nil } - return fmt.Sprintf(" - %v: %v %v", f.key, f.value, optsToString(f.opts)) + + return fmt.Sprintf(" - %v: %v %v", f.key, f.value, optsToString(f.opts)), nil } -func optsToString(opts *pbsubstreams.FieldOptions) string { +func optsToString(opts *pbss.FieldOptions) string { if opts == nil { return "" } @@ -153,16 +166,16 @@ func optsToString(opts *pbsubstreams.FieldOptions) string { return "" } -func fieldResolver(msgDesc *desc.MessageDescriptor) func(string) (opts *pbsubstreams.FieldOptions, isBytes bool) { - return func(name string) (opts *pbsubstreams.FieldOptions, isBytes bool) { - options := &pbsubstreams.FieldOptions{} +func fieldResolver(msgDesc *desc.MessageDescriptor) func(string) 
(opts *pbss.FieldOptions, isBytes bool) { + return func(name string) (opts *pbss.FieldOptions, isBytes bool) { + options := &pbss.FieldOptions{} fqdn := msgDesc.GetFullyQualifiedName() + "." + name for _, fd := range msgDesc.GetFields() { if fd.GetFullyQualifiedJSONName() == fqdn { isBytes := fd.GetType() == descriptorpb.FieldDescriptorProto_TYPE_BYTES if opts := fd.GetFieldOptions(); opts != nil { - if val := opts.ProtoReflect().Get(pbsubstreams.E_Options.TypeDescriptor()); val.IsValid() { - options = val.Message().Interface().(*pbsubstreams.FieldOptions) + if val := opts.ProtoReflect().Get(pbss.E_Options.TypeDescriptor()); val.IsValid() { + options = val.Message().Interface().(*pbss.FieldOptions) } } return options, isBytes @@ -196,7 +209,7 @@ func appendScope(prev, in string) string { return prev + "." + in } -func convertYAMLtoJSONCompat(i any, resolvePath func(in string) string, scope string, resolveField func(string) (opts *pbsubstreams.FieldOptions, isBytes bool)) (out any, err error) { +func convertYAMLtoJSONCompat(i any, resolvePath func(in string) string, scope string, resolveField func(string) (opts *pbss.FieldOptions, isBytes bool)) (out any, err error) { switch x := i.(type) { case map[interface{}]interface{}: m2 := map[string]interface{}{}