Skip to content

Commit

Permalink
changelog, small tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Sep 21, 2023
1 parent deb3a28 commit cb77740
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 47 deletions.
26 changes: 23 additions & 3 deletions cmd/substreams/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/streamingfast/cli"
Expand All @@ -13,7 +15,7 @@ import (
)

func init() {
infoCmd.Flags().Bool("dump-sinkconfig-bytes-to-disk", false, "write sinkconfig fields that are bytes as files to disk")
infoCmd.Flags().String("output-sinkconfig-files-path", "", "if non-empty, any sinkconfig field of type 'bytes' that was packed from a file will be written to that path")
}

var infoCmd = &cobra.Command{
Expand Down Expand Up @@ -45,7 +47,7 @@ func runInfo(cmd *cobra.Command, args []string) error {
outputModule = args[1]
}

dumpBytesToDisk := mustGetBool(cmd, "dump-sinkconfig-bytes-to-disk")
outputSinkconfigFilesPath := mustGetString(cmd, "output-sinkconfig-files-path")

manifestReader, err := manifest.NewReader(manifestPath)
if err != nil {
Expand Down Expand Up @@ -123,14 +125,32 @@ func runInfo(cmd *cobra.Command, args []string) error {
fmt.Println("----")
fmt.Println("type:", pkg.SinkConfig.TypeUrl)

confs, err := manifest.SinkConfigAsString(pkg, dumpBytesToDisk)
confs, files, err := manifest.DescribeSinkConfigs(pkg)
if err != nil {
return err
}

fmt.Println("configs:")
fmt.Println(confs)

if outputSinkconfigFilesPath != "" && files != nil {
if err := os.MkdirAll(outputSinkconfigFilesPath, 0755); err != nil {
return err
}
fmt.Println("output files:")
for k, v := range files {
filename := filepath.Join(outputSinkconfigFilesPath, k)
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
if _, err := f.Write(v); err != nil {
return err
}
fmt.Printf(" - %q written to %q\n", k, filename)
}
}

}

return nil
Expand Down
10 changes: 5 additions & 5 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Highlights

* This release brings enhancements to "sink-config" features, as a step to enable more potent descriptions of "deployable units" inside your Substreams package. The Substreams sinks will be able to leverage the new FieldOptions in upcoming releases.

### Added

* Sink configs can now use protobuf annotations (aka Field Options) to determine how the field will be interpreted in
Expand All @@ -23,11 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
}
```

* `substreams info` command now properly displays the sink configs, optionally writing the bytes fields to disk with `--dump-sinkconfig-bytes-to-disk` flag

### Removed

* The `@path/to/textfile` and `@@path/to/binaryfile` notations under sink_configs in substreams.yaml are now DEPRECATED (they will still work but with a warning).
* `substreams info` command now properly displays the content of sink configs, optionally writing the fields that were bundled from files to disk with `--output-sinkconfig-files-path=</some/path>`

## v1.1.14

Expand Down
91 changes: 52 additions & 39 deletions manifest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"fmt"
"io"
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -22,10 +21,11 @@ import (
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/anypb"

pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
pbss "github.com/streamingfast/substreams/pb/sf/substreams"
pbssv1 "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)

func (r *Reader) loadSinkConfig(pkg *pbsubstreams.Package, m *Manifest) error {
func (r *Reader) loadSinkConfig(pkg *pbssv1.Package, m *Manifest) error {
if m.Sink == nil {
return nil
}
Expand Down Expand Up @@ -68,79 +68,92 @@ func (r *Reader) loadSinkConfig(pkg *pbsubstreams.Package, m *Manifest) error {
return nil
}

func SinkConfigAsString(pkg *pbsubstreams.Package, dumpBytesToDisk bool) (string, error) {
// DescribeSinkConfigs returns a human-readable description of the sinkconfigs.
// Fields that were imported from files are returned as bytes in a map
func DescribeSinkConfigs(pkg *pbssv1.Package) (desc string, files map[string][]byte, err error) {
if pkg.SinkConfig == nil {
return "", nil
return "", nil, nil
}

msgDesc, err := getMsgDesc(pkg.SinkConfig.TypeUrl, pkg.ProtoFiles)
if err != nil {
return "", err
return "", nil, err
}

dynMsg := dynamic.NewMessageFactoryWithDefaults().NewDynamicMessage(msgDesc)
val := pkg.SinkConfig.Value
if err := dynMsg.Unmarshal(val); err != nil {
return "", err
return "", nil, err
}

var fields []*fieldAndValue
for _, fd := range dynMsg.GetMessageDescriptor().GetFields() {
field := &fieldAndValue{
key: fd.GetJSONName(),
key: strings.TrimPrefix(strings.TrimPrefix(fd.GetFullyQualifiedJSONName(), pkg.SinkConfig.TypeUrl), "."),
}
if opts := fd.GetFieldOptions(); opts != nil {
if val := opts.ProtoReflect().Get(pbsubstreams.E_Options.TypeDescriptor()); val.IsValid() {
field.opts = val.Message().Interface().(*pbsubstreams.FieldOptions)
if val := opts.ProtoReflect().Get(pbss.E_Options.TypeDescriptor()); val.IsValid() {
field.opts = val.Message().Interface().(*pbss.FieldOptions)
}
}
val, err := dynMsg.TryGetField(fd)
if err != nil {
return "", err
return "", nil, err
}
field.value = val
fields = append(fields, field)
}

out := ""
outfiles := make(map[string][]byte)
for _, fv := range fields {
out += fv.String(dumpBytesToDisk) + "\n"
text, fullContent := fv.Describe()
if fullContent != nil {
outfiles[fv.key] = fullContent
}
desc += text + "\n"
}
if len(outfiles) != 0 {
files = outfiles
}

return out, nil
return desc, files, nil
}

type fieldAndValue struct {
key string
value interface{}
opts *pbsubstreams.FieldOptions
opts *pbss.FieldOptions
}

func (f *fieldAndValue) String(dumpBytesToDisk bool) string {
switch val := f.value.(type) {
case []byte:
// Describe returns the field values as a string, except for fields that were extracted from a file. (with options 'read_from_file or zip_from_folder')
// The latter will show a short description and return the full content as bytes.
func (f *fieldAndValue) Describe() (string, []byte) {

if f.opts != nil && (f.opts.LoadFromFile || f.opts.ZipFromFolder) { // special treatment for fields coming from files: show md5sum, return rawdata as bytes
var rawdata []byte
switch val := f.value.(type) {
case string:
rawdata = []byte(val)
case []byte:
rawdata = val
}

hasher := md5.New()
hasher.Write(val)
hasher.Write(rawdata)
sum := hex.EncodeToString(hasher.Sum(nil))

suffix := optsToString(f.opts)
if dumpBytesToDisk {
file, err := ioutil.TempFile("", "substreams-info")
if err != nil {
zlog.Error("cannot write temporary file to disk", zap.Error(err))
}
if _, err := file.Write(val); err != nil {
zlog.Error("writing data to file", zap.Error(err))
}
suffix += " -> written to " + file.Name()
}
return fmt.Sprintf(" - %v: (%d bytes) MD5SUM: %v %v", f.key, len(rawdata), sum, optsToString(f.opts)), rawdata
}

return fmt.Sprintf(" - %v: (%d bytes) MD5SUM: %v %v", f.key, len(val), sum, suffix)
switch val := f.value.(type) {
case []byte:
return fmt.Sprintf(" - %v: %v (hex-encoded) %v", f.key, hex.EncodeToString(val), optsToString(f.opts)), nil
}
return fmt.Sprintf(" - %v: %v %v", f.key, f.value, optsToString(f.opts))

return fmt.Sprintf(" - %v: %v %v", f.key, f.value, optsToString(f.opts)), nil
}

func optsToString(opts *pbsubstreams.FieldOptions) string {
func optsToString(opts *pbss.FieldOptions) string {
if opts == nil {
return ""
}
Expand All @@ -153,16 +166,16 @@ func optsToString(opts *pbsubstreams.FieldOptions) string {
return ""
}

func fieldResolver(msgDesc *desc.MessageDescriptor) func(string) (opts *pbsubstreams.FieldOptions, isBytes bool) {
return func(name string) (opts *pbsubstreams.FieldOptions, isBytes bool) {
options := &pbsubstreams.FieldOptions{}
func fieldResolver(msgDesc *desc.MessageDescriptor) func(string) (opts *pbss.FieldOptions, isBytes bool) {
return func(name string) (opts *pbss.FieldOptions, isBytes bool) {
options := &pbss.FieldOptions{}
fqdn := msgDesc.GetFullyQualifiedName() + "." + name
for _, fd := range msgDesc.GetFields() {
if fd.GetFullyQualifiedJSONName() == fqdn {
isBytes := fd.GetType() == descriptorpb.FieldDescriptorProto_TYPE_BYTES
if opts := fd.GetFieldOptions(); opts != nil {
if val := opts.ProtoReflect().Get(pbsubstreams.E_Options.TypeDescriptor()); val.IsValid() {
options = val.Message().Interface().(*pbsubstreams.FieldOptions)
if val := opts.ProtoReflect().Get(pbss.E_Options.TypeDescriptor()); val.IsValid() {
options = val.Message().Interface().(*pbss.FieldOptions)
}
}
return options, isBytes
Expand Down Expand Up @@ -196,7 +209,7 @@ func appendScope(prev, in string) string {
return prev + "." + in
}

func convertYAMLtoJSONCompat(i any, resolvePath func(in string) string, scope string, resolveField func(string) (opts *pbsubstreams.FieldOptions, isBytes bool)) (out any, err error) {
func convertYAMLtoJSONCompat(i any, resolvePath func(in string) string, scope string, resolveField func(string) (opts *pbss.FieldOptions, isBytes bool)) (out any, err error) {
switch x := i.(type) {
case map[interface{}]interface{}:
m2 := map[string]interface{}{}
Expand Down

0 comments on commit cb77740

Please sign in to comment.