diff --git a/.gitignore b/.gitignore index 85d77f0..c6d0140 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.vscode /build /dist .envrc diff --git a/CHANGELOG.md b/CHANGELOG.md index de54e0b..360cbff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,14 +8,65 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. -## Unreleased +## v1.6.4 -* [Operator] Node Manager HTTP `/v1/resume` call now accepts `extra-env==&extra-env==` enabling to override environment variables for the next restart **only**. Use `curl -XPOST "http://localhost:10011/v1/resume?sync=true&extra-env=NODE_DEBUG=true"` (change `localhost:10011` accordingly to your setup). +### Substreams fixes + +* Fixed an(other) issue where multiple stores running on the same stage with different initialBlocks will fail to proress (and hang) + +## v1.6.3 + +### Substreams fixes + +* Fix "cannot resolve 'old cursor' from files in passthrough mode" error on some requests with an old cursor +* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking) + -> empty files in module cache with hash `d3b1920483180cbcd2fd10abcabbee431146f4c8` should be deleted for consistency +* Fix bug where some invalid cursors may be sent (with 'LIB' being above the block being sent) and add safeguard/loggin if the bug appears again +* Fix panic in the whole tier2 process when stores go above the size limit while being read from "kvops" cached changes + +### Core fixes + +* fix: reader-node-stdin not shutting down after receiving an EOF + +## v1.6.2 + +### Core + +* [Operator] The flag `--advertise-block-id-encoding` now accepts shorter form: `hex`, `base64`, etc. The older longer form `BLOCK_ID_ENCODING_HEX` is still supported but we suggested using the shorter form from now on. + +### Substreams v1.10.2 + +> **Note** Since a bug that affected substreams with "skipping blocks" was corrected in this release, any previously produced substreams cache should be considered as possibly corrupted and be eventually replaced + +* Substreams: fix bad handling of modules with multiple inputs when only one of them is filtered, resulting in bad outputs in production-mode. +* Substreams: fix stalling on some substreams with stores and mappers with different start block numbers on the same stage +* Substreams: fix 'development mode' and LIVE mode executing some modules that should be skipped + +## v1.6.1 + +* Bump substreams to v1.10.0: Version 1.10.0 adds a new `EndpointInfo/Info` endpoint, introduces a 3-minute default execution timeout per block, updates metering metrics with a deprecation warning, enhances `substreams init` commands, and improves wasm module caching and Prometheus tool flexibility. Full changelog: https://github.com/streamingfast/substreams/releases/tag/v1.10.0 +* Metering update: more detailed metering with addition of new metrics. *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead + +## v1.6.0 + +* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and `sf.substreams.rpc.v2.EndpointInfo/Info` to Substreams endpoints. This involves the following new flags: + - `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list) + - `advertise-chain-aliases` Alternate names for that chain (optional) + - `advertise-block-features` List of features describing the blocks (optional) + - `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list) + - `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail. - > This is **not** persistent upon restart +* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list. +* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. + It validates the following on a well-known chain: + - if the first-streamable-block Num/ID match the genesis block of a known chain, e.g. `matic`, it will refuse another value for `advertise-chain-name` than `matic` or one of its aliases (`polygon`) + - If the first-streamable-block does not match any known chain, it will require the `advertise-chain-name` to be non-empty + - If the first-streamable-block type is unknown (i.e. not ethereum, solana, near, cosmos, bitcoin...), it will require the user to provide `advertise-chain-name` as well as `advertise-block-id-encoding` -* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero first streamable block via `--common-first-streamable-block`. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. +* Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature +* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. * Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling +* Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead. ## v1.5.7 diff --git a/README.md b/README.md index 6e7be44..deef046 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Or download the latest Release from https://github.com/streamingfast/firehose-co ## Running directly -* firehose-core can run one or many of the following components: +* `firehose-core` can run one or many of the following components: - reader-node - merger - relayer @@ -33,6 +33,17 @@ start: * Run it with `firecore start --config-file=./firehose.yaml` or set an empty value for config-file (`--config-file=`) to use the default values. +### Development + +For development purposes, the easiest set up is to install the [dummy-blockchain](https://github.com/streamingfast/dummy-blockchain) and then use the `./devel/standard/start.sh` script we provide in the repository that launches a full fledged `firehose-core` instance backed by this dummy blockchain: + +``` +# Needed only once, if you don't have the binary locally already +go install github.com/streamingfast/dummy-blockchain@latest + +# The -c cleans any previous data, remove to keep data upon restarts +./devel/standard/start.sh -c +``` ## Using as a library @@ -41,7 +52,6 @@ For chains that implement "firehose block filters" and extensions like "eth_call * [firehose-ethereum](https://github.com/streamingfast/firehose-ethereum) * [firehose-antelope](https://github.com/pinax-network/firehose-antelope) - ### Philosophy Firehose maintenance cost comes from two sides. First, there is the chain integration that needs to be maintained. This is done within the chain's code directly by the chain's core developers. The second side of things is the maintenance of the Golang part of the Firehose stack. diff --git a/blockpoller/poller.go b/blockpoller/poller.go index dbbdec1..a62244b 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -349,7 +349,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream } if skipped { - return nil, fmt.Errorf("block %d was skipped and sould not have been requested", blkNum) + return nil, fmt.Errorf("block %d was skipped and should not have been requested", blkNum) } if p.forceFinalityAfterBlocks != nil { diff --git a/chain.go b/chain.go index 789c6e5..ae83714 100644 --- a/chain.go +++ b/chain.go @@ -6,6 +6,7 @@ import ( "runtime/debug" "strings" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "github.com/streamingfast/substreams/wasm" "github.com/spf13/cobra" @@ -154,6 +155,10 @@ type Chain[B Block] struct { DefaultBlockType string RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error) + + // InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain. + // It can validate that we are on the right chain by checking against a known hash, or populate missing fields. + InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error } type ToolsConfig[B Block] struct { @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() { err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element")) } + if c.InfoResponseFiller == nil { + err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set")) + } + for key, indexerFactory := range c.BlockIndexerFactories { if indexerFactory == nil { err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key)) diff --git a/cmd/apps/firehose.go b/cmd/apps/firehose.go index 4585299..de38dee 100644 --- a/cmd/apps/firehose.go +++ b/cmd/apps/firehose.go @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za HeadBlockNumberMetric: headBlockNumMetric, TransformRegistry: registry, CheckPendingShutdown: runtime.IsPendingShutdown, + InfoServer: runtime.InfoServer, }), nil }, }) diff --git a/cmd/apps/start.go b/cmd/apps/start.go index b191290..5f354a5 100644 --- a/cmd/apps/start.go +++ b/cmd/apps/start.go @@ -26,7 +26,9 @@ import ( "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dmetering" firecore "github.com/streamingfast/firehose-core" + info "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/launcher" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" tracing "github.com/streamingfast/sf-tracing" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st configFile := sflags.MustGetString(cmd, "config-file") rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile)) - err = start(cmd, dataDir, args, rootLog) + err = start(cmd, dataDir, args, chain, rootLog) if err != nil { return fmt.Errorf("unable to launch: %w", err) } @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st } } -func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) { +func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) { dataDirAbs, err := filepath.Abs(dataDir) if err != nil { return fmt.Errorf("unable to setup directory structure: %w", err) @@ -82,7 +84,32 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge }() dmetering.SetDefaultEmitter(eventEmitter) - launch := launcher.NewLauncher(rootLog, dataDirAbs) + blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET + if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" { + v, found := pbfirehose.InfoResponse_BlockIdEncoding_value[enc] + if !found { + longCandidate := "BLOCK_ID_ENCODING_" + strings.ToUpper(enc) + v, found = pbfirehose.InfoResponse_BlockIdEncoding_value[longCandidate] + if !found { + return fmt.Errorf("invalid block id encoding: %s", enc) + } + } + + blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(v) + } + + infoServer := info.NewInfoServer( + sflags.MustGetString(cmd, "advertise-chain-name"), + sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"), + blockIDEncoding, + sflags.MustGetStringSlice(cmd, "advertise-block-features"), + bstream.GetProtocolFirstStreamableBlock, + !sflags.MustGetBool(cmd, "ignore-advertise-validation"), + chain.InfoResponseFiller, + rootLog, + ) + + launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer) rootLog.Debug("launcher created") runByDefault := func(app string) bool { diff --git a/cmd/apps/substreams_tier1.go b/cmd/apps/substreams_tier1.go index 0044fb3..848a61c 100644 --- a/cmd/apps/substreams_tier1.go +++ b/cmd/apps/substreams_tier1.go @@ -120,6 +120,11 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root wasmExtensions = exts } + tmpDir, err := firecore.GetTmpDir(sfDataDir) + if err != nil { + return nil, fmt.Errorf("getting temporary directory: %w", err) + } + return app.NewTier1(appLogger, &app.Tier1Config{ MeteringConfig: GetCommonMeteringPluginValue(), @@ -128,6 +133,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root OneBlocksStoreURL: oneBlocksStoreURL, ForkedBlocksStoreURL: forkedBlocksStoreURL, BlockStreamAddr: blockstreamAddr, + TmpDir: tmpDir, StateStoreURL: stateStoreURL, StateStoreDefaultTag: stateStoreDefaultTag, @@ -150,6 +156,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root HeadTimeDriftMetric: ss1HeadTimeDriftmetric, HeadBlockNumberMetric: ss1HeadBlockNumMetric, CheckPendingShutDown: runtime.IsPendingShutdown, + InfoServer: runtime.InfoServer, }), nil }, }) diff --git a/cmd/apps/substreams_tier2.go b/cmd/apps/substreams_tier2.go index e9e3876..7d4003a 100644 --- a/cmd/apps/substreams_tier2.go +++ b/cmd/apps/substreams_tier2.go @@ -82,6 +82,11 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root wasmExtensions = exts } + tmpDir, err := firecore.GetTmpDir(runtime.AbsDataDir) + if err != nil { + return nil, fmt.Errorf("getting temporary directory: %w", err) + } + return app.NewTier2(appLogger, &app.Tier2Config{ Tracing: tracing, @@ -90,6 +95,7 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root ServiceDiscoveryURL: serviceDiscoveryURL, WASMExtensions: wasmExtensions, BlockExecutionTimeout: executionTimeout, + TmpDir: tmpDir, MaximumConcurrentRequests: maximumConcurrentRequests, }, &app.Tier2Modules{ diff --git a/cmd/firecore/main.go b/cmd/firecore/main.go index 3eb14cf..be062c7 100644 --- a/cmd/firecore/main.go +++ b/cmd/firecore/main.go @@ -4,6 +4,7 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" firecore "github.com/streamingfast/firehose-core" fhCMD "github.com/streamingfast/firehose-core/cmd" + info "github.com/streamingfast/firehose-core/firehose/info" ) func main() { @@ -17,6 +18,7 @@ func main() { Version: version, BlockFactory: func() firecore.Block { return new(pbbstream.Block) }, ConsoleReaderFactory: firecore.NewConsoleReader, + InfoResponseFiller: info.DefaultInfoResponseFiller, Tools: &firecore.ToolsConfig[*pbbstream.Block]{}, }) } diff --git a/cmd/main.go b/cmd/main.go index 7c0793d..833b45a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,6 +23,7 @@ import ( "github.com/streamingfast/firehose-core/cmd/tools" "github.com/streamingfast/firehose-core/launcher" paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "github.com/streamingfast/logging" "go.uber.org/zap" @@ -72,7 +73,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) { flags.StringP("config-file", "c", "./firehose.yaml", "Configuration file to use. No config file loaded if set to an empty string.") flags.String("log-format", "text", "Format for logging to stdout. Either 'text' or 'stackdriver'") - flags.Bool("log-to-file", true, "Also write logs to {data-dir}/firehose.log.json ") + flags.Bool("log-to-file", true, fmt.Sprintf("Also write logs to {data-dir}/%s ", launcher.DefaultLogFile)) flags.String("log-level-switcher-listen-addr", "localhost:1065", cli.FlagDescription(` If non-empty, a JSON based HTTP server will listen on this address to let you switch the default logging level of all registered loggers to a different one on the fly. This enables switching to debug level on @@ -170,9 +171,31 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) { cmd.Flags().String("common-merged-blocks-store-url", firecore.MergedBlocksStoreURL, "[COMMON] Store URL where to read/write merged blocks.") cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.") cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.") + cmd.Flags().String("common-tmp-dir", firecore.TmpDir, "[COMMON] Local directory to store temporary files") + + cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.") + cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.") + cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.") + cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.") + + acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1) + i := 0 + for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value { + if encoding != "BLOCK_ID_ENCODING_UNSET" { + acceptedEncodings[i] = strings.ToLower(strings.Replace(encoding, "BLOCK_ID_ENCODING_", "", 1)) + i++ + } + } + cmd.Flags().String("advertise-block-id-encoding", "", FlagMultilineDescription(` + [firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint. If unset, it may be inferred + from the genesis block. Accepted encodings are: %s. + + Previous versions accepted 'BLOCK_ID_ENCODING_' as well, this still works but we + recommend using the short form lowercase version. + `, strings.Join(acceptedEncodings, ", "))) cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).") - cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes") + cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "[COMMON] Index bundle sizes that that are considered valid when looking for block indexes") cmd.Flags().Bool("common-blocks-cache-enabled", false, cli.FlagDescription(` [COMMON] Use a disk cache to store the blocks data to disk and instead of keeping it in RAM. By enabling this, block's Protobuf content, in bytes, @@ -235,3 +258,7 @@ Additional help topics:{{range .Commands}}{{if .IsAdditionalHelpTopicCommand}} Use "{{.CommandPath}} [command] --help" for more information about a command.{{end}} ` + +func FlagMultilineDescription(input string, args ...any) string { + return cli.Dedent(fmt.Sprintf(input, args...)) +} diff --git a/cmd/setup.go b/cmd/setup.go index a609703..bfbbf39 100644 --- a/cmd/setup.go +++ b/cmd/setup.go @@ -51,6 +51,10 @@ func setupCmd(cmd *cobra.Command, binaryName string) error { return fmt.Errorf("invalid flag %s in config file under command %s", k, subCommand) } + // Keep compatibility with config files, allow empty value to unset the flag + if v == nil { + v = "" + } viper.SetDefault(flag.viperKey, v) // For root command, we want to keep compatibility for `viper.GetXXX("global-")` to work with config loaded value diff --git a/cmd/tools/mergeblock/tools_merge_blocks.go b/cmd/tools/mergeblock/tools_merge_blocks.go index 29a2965..95d1aa4 100644 --- a/cmd/tools/mergeblock/tools_merge_blocks.go +++ b/cmd/tools/mergeblock/tools_merge_blocks.go @@ -24,8 +24,6 @@ func NewToolsMergeBlocksCmd[B firecore.Block](chain *firecore.Chain[B], zlog *za RunE: runMergeBlocksE(zlog), } - cmd.Flags().String("force-block-type", "", "When set, will force the block type to the given value.") - return cmd } diff --git a/cmd/tools/mergeblock/tools_upgrade_merged_blocks.go b/cmd/tools/mergeblock/tools_upgrade_merged_blocks.go index ad476a6..ca9e629 100644 --- a/cmd/tools/mergeblock/tools_upgrade_merged_blocks.go +++ b/cmd/tools/mergeblock/tools_upgrade_merged_blocks.go @@ -54,6 +54,7 @@ func getMergedBlockUpgrader(tweakFunc func(block *pbbstream.Block) (*pbbstream.B LowBlockNum: firecore.LowBoundary(start), StopBlockNum: stop, TweakBlock: tweakFunc, + Logger: rootLog, } stream := stream.New(nil, sourceStore, nil, int64(start), writer, stream.WithFinalBlocksOnly()) diff --git a/cmd/tools/print/tools_print.go b/cmd/tools/print/tools_print.go index e3b9578..f7c403f 100644 --- a/cmd/tools/print/tools_print.go +++ b/cmd/tools/print/tools_print.go @@ -54,7 +54,7 @@ func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command toolsPrintCmd.AddCommand(toolsPrintMergedBlocksCmd) toolsPrintCmd.PersistentFlags().StringP("output", "o", "text", "Output mode for block printing, either 'text', 'json' or 'jsonl'") - toolsPrintCmd.PersistentFlags().String("bytes-encoding", "hex", "Encoding for bytes fields, either 'hex' or 'base58'") + toolsPrintCmd.PersistentFlags().String("bytes-encoding", "hex", "Encoding for bytes fields, either 'hex', 'base58' or 'base64'") toolsPrintCmd.PersistentFlags().StringSlice("proto-paths", []string{""}, "Paths to proto files to use for dynamic decoding of blocks") toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary") @@ -281,9 +281,14 @@ func SetupJsonMarshaller(cmd *cobra.Command, chainFileDescriptor protoreflect.Fi var options []fcjson.MarshallerOption bytesEncoding := sflags.MustGetString(cmd, "bytes-encoding") + if bytesEncoding == "base58" { options = append(options, fcjson.WithBytesEncoderFunc(fcjson.ToBase58)) } + if bytesEncoding == "base64" { + options = append(options, fcjson.WithBytesEncoderFunc(fcjson.ToBase64)) + } + return fcjson.NewMarshaller(registry, options...), nil } diff --git a/constants.go b/constants.go index 026e4e1..259a5e0 100644 --- a/constants.go +++ b/constants.go @@ -23,4 +23,5 @@ var ( OneBlockStoreURL string = "file://{data-dir}/storage/one-blocks" ForkedBlocksStoreURL string = "file://{data-dir}/storage/forked-blocks" IndexStoreURL string = "file://{data-dir}/storage/index" + TmpDir string = "{data-dir}/tmp" ) diff --git a/devel/standard/standard.yaml b/devel/standard/standard.yaml index caf133a..e83984d 100644 --- a/devel/standard/standard.yaml +++ b/devel/standard/standard.yaml @@ -7,6 +7,8 @@ start: - substreams-tier1 - substreams-tier2 flags: + advertise-block-id-encoding: "hex" + advertise-chain-name: "acme-dummy-blockchain" # Specifies the path to the binary, we assume you did # `go install github.com/streamingfast/dummy-blockchain@latest` (and that you have value # of `go env GOPATH` in your environment). diff --git a/firehose/app/firehose/app.go b/firehose/app/firehose/app.go index aa02b52..7474a78 100644 --- a/firehose/app/firehose/app.go +++ b/firehose/app/firehose/app.go @@ -31,6 +31,7 @@ import ( "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/metrics" "github.com/streamingfast/firehose-core/firehose/server" "github.com/streamingfast/logging" @@ -64,6 +65,7 @@ type Modules struct { TransformRegistry *transform.Registry RegisterServiceExtension RegisterServiceExtensionFunc CheckPendingShutdown func() bool + InfoServer *info.InfoServer } type App struct { @@ -158,6 +160,7 @@ func (a *App) Run() error { a.IsReady, a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, + a.modules.InfoServer, a.config.ServerOptions..., ) @@ -186,6 +189,12 @@ func (a *App) Run() error { } } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil { + a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err)) + } + a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive)) a.isReady.CAS(false, true) firehoseServer.Launch() diff --git a/firehose/block_getter.go b/firehose/block_getter.go index 6ac564b..11c67e9 100644 --- a/firehose/block_getter.go +++ b/firehose/block_getter.go @@ -11,6 +11,7 @@ import ( "github.com/streamingfast/derr" "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" + "github.com/streamingfast/firehose-core/metering" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -59,10 +60,12 @@ func (g *BlockGetter) Get( mergedBlocksStore := g.mergedBlocksStore if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok { var err error - mergedBlocksStore, err = clonable.Clone(ctx) + mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } @@ -91,10 +94,12 @@ func (g *BlockGetter) Get( forkedBlocksStore := g.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx) + forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go new file mode 100644 index 0000000..e57ab9b --- /dev/null +++ b/firehose/info/endpoint_info.go @@ -0,0 +1,234 @@ +package info + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/streamingfast/bstream" + "github.com/streamingfast/bstream/hub" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dstore" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type InfoServer struct { + sync.Mutex + + validate bool + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error + response *pbfirehose.InfoResponse + ready chan struct{} + initDone bool + initError error + logger *zap.Logger +} + +func (s *InfoServer) Info(ctx context.Context, request *pbfirehose.InfoRequest) (*pbfirehose.InfoResponse, error) { + select { + case <-s.ready: + return s.response, nil + default: + return nil, fmt.Errorf("info server not ready") + } +} + +func NewInfoServer( + chainName string, + chainNameAliases []string, + blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding, + blockFeatures []string, + firstStreamableBlock uint64, + validate bool, + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error, + logger *zap.Logger, +) *InfoServer { + + resp := &pbfirehose.InfoResponse{ + ChainName: chainName, + ChainNameAliases: chainNameAliases, + BlockIdEncoding: blockIDEncoding, + BlockFeatures: blockFeatures, + FirstStreamableBlockNum: firstStreamableBlock, + } + + return &InfoServer{ + responseFiller: responseFiller, + response: resp, + validate: validate, + ready: make(chan struct{}), + logger: logger, + } +} + +func validateInfoResponse(resp *pbfirehose.InfoResponse) error { + switch { + case resp.ChainName == "": + return fmt.Errorf("chain name is not set") + case resp.BlockIdEncoding == pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET: + return fmt.Errorf("block id encoding is not set") + case resp.FirstStreamableBlockId == "": + return fmt.Errorf("first streamable block id is not set") + } + + return nil +} + +// multiple apps (firehose, substreams...) can initialize the same server, we only need one +func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { + s.Lock() + defer func() { + s.initDone = true + s.Unlock() + }() + + if s.initDone { + return s.initError + } + + if err := s.init(ctx, fhub, mergedBlocksStore, oneBlockStore, logger); err != nil { + s.initError = err + return err + } + + return nil +} + +func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum uint64, mergedBlocksStore dstore.Store) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block, err := bstream.FetchBlockFromMergedBlocksStore(ctx, blockNum, mergedBlocksStore) + if err != nil { + time.Sleep(time.Millisecond * 500) + continue + } + return block + } +} + +func (s *InfoServer) getBlockFromForkableHub(ctx context.Context, blockNum uint64, forkableHub *hub.ForkableHub) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block := forkableHub.GetBlock(s.response.FirstStreamableBlockNum, "") + if block == nil { + time.Sleep(time.Millisecond * 500) + continue + } + return block + } + +} + +func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uint64, oneBlockStore dstore.Store) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block, err := bstream.FetchBlockFromOneBlockStore(ctx, blockNum, "", oneBlockStore) + if err != nil { + time.Sleep(time.Millisecond * 500) + continue + } + return block + } +} + +// init tries to fetch the first streamable block from the different sources and fills the response with it +// returns an error if it is incomplete +// it can be called only once +func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { + ctx, cancel := context.WithCancel(ctx) + // cancel is later and depends on s.validate + + ch := make(chan *pbbstream.Block) + + if fhub != nil { + go func() { + select { + case ch <- s.getBlockFromForkableHub(ctx, s.response.FirstStreamableBlockNum, fhub): + case <-ctx.Done(): + } + }() + } + + go func() { + select { + case ch <- s.getBlockFromMergedBlocksStore(ctx, s.response.FirstStreamableBlockNum, mergedBlocksStore): + case <-ctx.Done(): + } + }() + + go func() { + select { + case ch <- s.getBlockFromOneBlockStore(ctx, s.response.FirstStreamableBlockNum, oneBlockStore): + case <-ctx.Done(): + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + loglevel := zapcore.WarnLevel + if !s.validate { + loglevel = zapcore.DebugLevel + } + logger.Log(loglevel, "waiting to read the first_streamable_block before starting firehose/substreams endpoints", + zap.Uint64("first_streamable_block", s.response.FirstStreamableBlockNum), + zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), + zap.Stringer("one_block_store", oneBlockStore.BaseURL()), + ) + } + } + }() + + if !s.validate { + // in this case we don't wait for an answer, but we still try to fill the response + go func() { + defer cancel() + select { + case blk := <-ch: + if err := s.responseFiller(blk, s.response, s.validate); err != nil { + logger.Warn("unable to fill and validate info response", zap.Error(err)) + } + case <-ctx.Done(): + } + if err := validateInfoResponse(s.response); err != nil { + logger.Warn("info response", zap.Error(err)) + } + close(s.ready) + cancel() + }() + + return nil + } + defer cancel() + + select { + case blk := <-ch: + if err := s.responseFiller(blk, s.response, s.validate); err != nil { + return fmt.Errorf("%w -- use --ignore-advertise-validation to skip these checks", err) + } + case <-ctx.Done(): + return fmt.Errorf("%w: no block found for first streamable block %d in the stores or on live source -- use --ignore-advertise-validation to skip these checks", ctx.Err(), s.response.FirstStreamableBlockNum) + } + + if err := validateInfoResponse(s.response); err != nil { + return err + } + + close(s.ready) + return nil +} diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go new file mode 100644 index 0000000..3ac8c54 --- /dev/null +++ b/firehose/info/info_filler.go @@ -0,0 +1,70 @@ +package info + +import ( + "fmt" + "strings" + + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + wellknown "github.com/streamingfast/firehose-core/well-known" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" +) + +var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error { + resp.FirstStreamableBlockId = firstStreamableBlock.Id + + shortTypeURL := strings.TrimPrefix(firstStreamableBlock.Payload.TypeUrl, "type.googleapis.com/") + for _, protocol := range wellknown.WellKnownProtocols { + if protocol.BlockType == shortTypeURL { + resp.BlockIdEncoding = protocol.BytesEncoding + break + } + } + + if !validate { + if resp.ChainName == "" { + // still try to fill the chain name if it is not given + if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + resp.ChainName = chain.Name + resp.ChainNameAliases = chain.Aliases + } + } + return nil + } + + if resp.ChainName != "" { + if chain := wellknown.WellKnownProtocols.ChainByName(resp.ChainName); chain != nil { + if firstStreamableBlock.Number == chain.GenesisBlockNumber && chain.GenesisBlockID != firstStreamableBlock.Id { // we don't check if the firstStreamableBlock is something other than our well-known genesis block + return fmt.Errorf("chain name defined in flag: %q inconsistent with the genesis block ID %q (expected: %q)", resp.ChainName, ox(firstStreamableBlock.Id), ox(chain.GenesisBlockID)) + } + resp.ChainName = chain.Name // ensure we use the canonical name if the user provided one of the aliases + resp.ChainNameAliases = chain.Aliases + } else if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + return fmt.Errorf("chain name defined in flag: %q inconsistent with the one discovered from genesis block %q", resp.ChainName, chain.Name) + } + } else { + if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + resp.ChainName = chain.Name + resp.ChainNameAliases = chain.Aliases + } + } + + // Extra validation for ethereum blocks + if firstStreamableBlock.Payload.TypeUrl == "type.googleapis.com/sf.ethereum.type.v2.Block" { + var seenDetailLevel bool + for _, feature := range resp.BlockFeatures { + if feature == "base" || feature == "extended" || feature == "hybrid" { + seenDetailLevel = true + break + } + } + if !seenDetailLevel { + return fmt.Errorf("ethereum blocks are used without setting detail level in 'advertise-block-features': expected one of 'base', 'extended' or 'hybrid' (or use 'firehose-ethereum' binary instead to serve this chain and get automatic detection/validation)") + } + } + + return nil +} + +func ox(s string) string { + return "0x" + s +} diff --git a/firehose/server/blocks.go b/firehose/server/blocks.go index b73da6c..7e57fe9 100644 --- a/firehose/server/blocks.go +++ b/firehose/server/blocks.go @@ -14,6 +14,7 @@ import ( "github.com/streamingfast/dauth" "github.com/streamingfast/dmetering" "github.com/streamingfast/firehose-core/firehose/metrics" + "github.com/streamingfast/firehose-core/metering" "github.com/streamingfast/logging" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" @@ -66,29 +67,9 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque }, } - ////////////////////////////////////////////////////////////////////// meter := dmetering.GetBytesMeter(ctx) - bytesRead := meter.BytesReadDelta() - bytesWritten := meter.BytesWrittenDelta() - size := proto.Size(resp) - auth := dauth.FromContext(ctx) - event := dmetering.Event{ - UserID: auth.UserID(), - ApiKeyID: auth.APIKeyID(), - IpAddress: auth.RealIP(), - Meta: auth.Meta(), - Endpoint: "sf.firehose.v2.Firehose/Block", - Metrics: map[string]float64{ - "egress_bytes": float64(size), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "block_count": 1, - }, - Timestamp: time.Now(), - } - dmetering.Emit(ctx, event) - ////////////////////////////////////////////////////////////////////// + metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", resp) return resp, nil } @@ -127,10 +108,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream } } - isLiveBlock := func(step pbfirehose.ForkStep) bool { - return step == pbfirehose.ForkStep_STEP_NEW - } - var blockCount uint64 handlerFunc := bstream.HandlerFunc(func(block *pbbstream.Block, obj interface{}) error { blockCount++ @@ -188,10 +165,6 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return NewErrSendBlock(err) } - if isLiveBlock(protoStep) { - dmetering.GetBytesMeter(ctx).AddBytesRead(len(block.Payload.Value)) - } - level := zap.DebugLevel if block.Number%200 == 0 { level = zap.InfoLevel @@ -206,8 +179,45 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance") } + liveSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler { + return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + if stepable, ok := obj.(bstream.Stepable); ok { + if stepable.Step().Matches(bstream.StepNew) { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue())) + + // legacy metering + // todo(colin): remove this once we are sure the new metering is working + dmetering.GetBytesMeter(ctx).AddBytesRead(len(blk.GetPayload().GetValue())) + } else { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) + } + } + return next.ProcessBlock(blk, obj) + }) + } + + fileSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler { + return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + if stepable, ok := obj.(bstream.Stepable); ok { + if stepable.Step().Matches(bstream.StepNew) { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue())) + } else { + dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue())) + } + } + return next.ProcessBlock(blk, obj) + }) + } + ctx = s.initFunc(ctx, request) - str, err := s.streamFactory.New(ctx, handlerFunc, request, logger) + str, err := s.streamFactory.New( + ctx, + handlerFunc, + request, + logger, + stream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler), + stream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler), + ) if err != nil { return err } diff --git a/firehose/server/server.go b/firehose/server/server.go index 0a9a208..1b494a4 100644 --- a/firehose/server/server.go +++ b/firehose/server/server.go @@ -16,7 +16,9 @@ import ( "github.com/streamingfast/dmetrics" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/rate" + "github.com/streamingfast/firehose-core/metering" pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1" pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -61,44 +63,23 @@ func New( isReady func(context.Context) bool, listenAddr string, serviceDiscoveryURL *url.URL, + infoServer *info.InfoServer, opts ...Option, ) *Server { initFunc := func(ctx context.Context, _ *pbfirehoseV2.Request) context.Context { - ////////////////////////////////////////////////////////////////////// ctx = dmetering.WithBytesMeter(ctx) ctx = withRequestMeter(ctx) return ctx - ////////////////////////////////////////////////////////////////////// } postHookFunc := func(ctx context.Context, response *pbfirehoseV2.Response) { - ////////////////////////////////////////////////////////////////////// - meter := dmetering.GetBytesMeter(ctx) - bytesRead := meter.BytesReadDelta() - bytesWritten := meter.BytesWrittenDelta() - size := proto.Size(response) - - auth := dauth.FromContext(ctx) - event := dmetering.Event{ - UserID: auth.UserID(), - ApiKeyID: auth.APIKeyID(), - IpAddress: auth.RealIP(), - Meta: auth.Meta(), - Endpoint: "sf.firehose.v2.Firehose/Blocks", - Metrics: map[string]float64{ - "egress_bytes": float64(size), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "block_count": 1, - }, - Timestamp: time.Now(), - } - requestMeter := getRequestMeter(ctx) requestMeter.blocks++ - requestMeter.egressBytes += size - dmetering.Emit(ctx, event) - ////////////////////////////////////////////////////////////////////// + requestMeter.egressBytes += proto.Size(response) + + meter := dmetering.GetBytesMeter(ctx) + auth := dauth.FromContext(ctx) + metering.Send(ctx, meter, auth.UserID(), auth.APIKeyID(), auth.RealIP(), auth.Meta(), "sf.firehose.v2.Firehose/Block", response) } tracerProvider := otel.GetTracerProvider() @@ -140,6 +121,7 @@ func New( if blockGetter != nil { pbfirehoseV2.RegisterFetchServer(gs, s) } + pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer) pbfirehoseV2.RegisterStreamServer(gs, s) pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose }) diff --git a/go.mod b/go.mod index 63ef618..dea2a03 100644 --- a/go.mod +++ b/go.mod @@ -17,27 +17,27 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 - github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf + github.com/streamingfast/bstream v0.0.2-0.20240916154503-c9c5c8bbeca0 github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 - github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 + github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 - github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb + github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 - github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d + github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.9.4-0.20240801142150-9da72c6fa000 + github.com/streamingfast/substreams v1.10.5 github.com/stretchr/testify v1.8.4 github.com/test-go/testify v1.1.4 go.uber.org/multierr v1.10.0 go.uber.org/zap v1.26.0 - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 + golang.org/x/exp v0.0.0-20231006140011-7918f672742d google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.33.0 ) @@ -160,7 +160,7 @@ require ( github.com/streamingfast/shutter v1.5.0 github.com/subosito/gotenv v1.4.2 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect - github.com/tetratelabs/wazero v1.7.1 // indirect + github.com/tetratelabs/wazero v1.8.0 // indirect github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 // indirect go.opencensus.io v0.24.0 go.opentelemetry.io/contrib/detectors/gcp v1.9.0 // indirect @@ -174,13 +174,13 @@ require ( go.uber.org/atomic v1.10.0 go.uber.org/automaxprocs v1.5.1 golang.org/x/crypto v0.23.0 // indirect - golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.22.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.18.0 - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.20.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect golang.org/x/term v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/api v0.172.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 308a87d..3e635cf 100644 --- a/go.sum +++ b/go.sum @@ -441,8 +441,8 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI= github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= -github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= +github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= @@ -537,8 +537,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= -github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf h1:LXFIz2pyTlIMNzyifvKsZpFLcLbJkTcRyu7OlABV1S0= -github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= +github.com/streamingfast/bstream v0.0.2-0.20240916154503-c9c5c8bbeca0 h1:7qWlxoUY8r/RUOYEH48ZJC1lwIRLiXyOIp2Xwp2TKoE= +github.com/streamingfast/bstream v0.0.2-0.20240916154503-c9c5c8bbeca0/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= @@ -551,12 +551,12 @@ github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a h1:JwAGZ7f5vkB github.com/streamingfast/dgrpc v0.0.0-20240423143010-f36784700c9a/go.mod h1:EPtUX/vhRphE37Zo6sDcgD/S3sm5YqXHhxAgzS6Ebwo= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4= github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4= -github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1 h1:zPqUBv2dBJ/N898pZ9W+1qDamQjbtdD7cwtwQB8PWTQ= -github.com/streamingfast/dmetering v0.0.0-20240422183130-658027cbb7a1/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= +github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951 h1:6o6MS3JHrp9A7V6EBHbR7W7mzVCFmXc8U0AjTfvz7PI= +github.com/streamingfast/dmetering v0.0.0-20240816165719-51768d3da951/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y= github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= -github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb h1:tmu8wGiSTzdqk2CnPnI7GywKwepGieqNOQDRKKSiVJg= -github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= +github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2 h1:BB3VSDl8/OHBSvjqfgufwqr4tD5l7XPjXybDm6uudj4= +github.com/streamingfast/dstore v0.1.1-0.20240826190906-91345d4a31f2/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ= github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw= @@ -572,8 +572,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d h1:rgXXfBFlQ9C8casyay7UL53VSGR6JoUnhqGv4h6lhxM= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= +github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb h1:Xqt4ned9ELmQMKcg7cFbm56MKG2gBjnE1M+2HObOs6w= +github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE= github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 h1:94HllkX4ttYVilo8ZJv05b5z8JiMmqBvv4+Jdgk/+2A= @@ -582,8 +582,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.9.4-0.20240801142150-9da72c6fa000 h1:MG9jByDKr0fSljdABFUjEsUYAsOu4uUKifuN1NhXB/0= -github.com/streamingfast/substreams v1.9.4-0.20240801142150-9da72c6fa000/go.mod h1:yPSIRwtAl9/1dB45j82ZsGe8lndRdN8oKy4k/WvHaao= +github.com/streamingfast/substreams v1.10.5 h1:+boRdnIZAgYT/RjYKPwjTsQAtQpZSZQZ604egUA+FZg= +github.com/streamingfast/substreams v1.10.5/go.mod h1:1lUc2GMpbLQ55IAbkMx8aQ0thyg0uJY7MV6TJmaZ2dA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -605,8 +605,8 @@ github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ7 github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= -github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= -github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= +github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmcF4g= +github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/tsenart/deadcode v0.0.0-20160724212837-210d2dc333e9/go.mod h1:q+QjxYvZ+fpjMXqs+XEriussHjSYqeXVnAdSV1tkMYk= github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 h1:7v7L5lsfw4w8iqBBXETukHo4IPltmD+mWoLRYUmeGN8= github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869/go.mod h1:Rfzr+sqaDreiCaoQbFCu3sTXxeFq/9kXRuyOoSlGQHE= @@ -690,8 +690,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -718,8 +718,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -763,8 +763,8 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -795,8 +795,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -861,8 +861,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -879,8 +879,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -945,8 +945,8 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= -golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/json/marshallers.go b/json/marshallers.go index daaf399..673d226 100644 --- a/json/marshallers.go +++ b/json/marshallers.go @@ -2,6 +2,7 @@ package json import ( "bytes" + "encoding/base64" "encoding/hex" "fmt" "os" @@ -189,6 +190,10 @@ func ToBase58(encoder *jsontext.Encoder, t []byte, options json.Options) error { return encoder.WriteToken(jsontext.String(base58.Encode(t))) } +func ToBase64(encoder *jsontext.Encoder, t []byte, options json.Options) error { + return encoder.WriteToken(jsontext.String(base64.StdEncoding.EncodeToString(t))) +} + func ToHex(encoder *jsontext.Encoder, t []byte, options json.Options) error { return encoder.WriteToken(jsontext.String(hex.EncodeToString(t))) } diff --git a/launcher/app_enum.go b/launcher/app_enum.go index 2a626f1..7715609 100644 --- a/launcher/app_enum.go +++ b/launcher/app_enum.go @@ -24,6 +24,8 @@ const ( AppStatusStopped ) +var ErrInvalidAppStatus = fmt.Errorf("not a valid AppStatus, try [%s]", strings.Join(_AppStatusNames, ", ")) + const _AppStatusName = "NotFoundCreatedRunningWarningStopped" var _AppStatusNames = []string{ @@ -57,6 +59,13 @@ func (x AppStatus) String() string { return fmt.Sprintf("AppStatus(%d)", x) } +// IsValid provides a quick way to determine if the typed value is +// part of the allowed enumerated values +func (x AppStatus) IsValid() bool { + _, ok := _AppStatusMap[x] + return ok +} + var _AppStatusValue = map[string]AppStatus{ _AppStatusName[0:8]: AppStatusNotFound, _AppStatusName[8:15]: AppStatusCreated, @@ -65,20 +74,20 @@ var _AppStatusValue = map[string]AppStatus{ _AppStatusName[29:36]: AppStatusStopped, } -// ParseAppStatus attempts to convert a string to a AppStatus +// ParseAppStatus attempts to convert a string to a AppStatus. func ParseAppStatus(name string) (AppStatus, error) { if x, ok := _AppStatusValue[name]; ok { return x, nil } - return AppStatus(0), fmt.Errorf("%s is not a valid AppStatus, try [%s]", name, strings.Join(_AppStatusNames, ", ")) + return AppStatus(0), fmt.Errorf("%s is %w", name, ErrInvalidAppStatus) } -// MarshalText implements the text marshaller method +// MarshalText implements the text marshaller method. func (x AppStatus) MarshalText() ([]byte, error) { return []byte(x.String()), nil } -// UnmarshalText implements the text unmarshaller method +// UnmarshalText implements the text unmarshaller method. func (x *AppStatus) UnmarshalText(text []byte) error { name := string(text) tmp, err := ParseAppStatus(name) diff --git a/launcher/config.go b/launcher/config.go index 6f7871e..b74406c 100644 --- a/launcher/config.go +++ b/launcher/config.go @@ -10,8 +10,8 @@ import ( var Config map[string]*CommandConfig type CommandConfig struct { - Args []string `json:"args"` - Flags map[string]string `json:"flags"` + Args []string `json:"args"` + Flags map[string]any `json:"flags"` } // Load reads a YAML config, and sets the global DfuseConfig variable diff --git a/launcher/launcher.go b/launcher/launcher.go index 4c8f086..bffca60 100644 --- a/launcher/launcher.go +++ b/launcher/launcher.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/shutter" "go.uber.org/atomic" "go.uber.org/zap" @@ -42,7 +43,7 @@ type Launcher struct { logger *zap.Logger } -func NewLauncher(logger *zap.Logger, absDataDir string) *Launcher { +func NewLauncher(logger *zap.Logger, absDataDir string, infoServer *info.InfoServer) *Launcher { l := &Launcher{ shutter: shutter.New(), apps: make(map[string]App), @@ -53,6 +54,7 @@ func NewLauncher(logger *zap.Logger, absDataDir string) *Launcher { l.runtime = &Runtime{ AbsDataDir: absDataDir, + InfoServer: infoServer, IsPendingShutdown: func() bool { return l.hasBeenSignaled.Load() }, diff --git a/launcher/logging.go b/launcher/logging.go index 90bca97..6b9653f 100644 --- a/launcher/logging.go +++ b/launcher/logging.go @@ -21,6 +21,8 @@ import ( "go.uber.org/zap" ) +const DefaultLogFile = "app.log.json" + type LoggingOptions struct { WorkingDir string // the folder where the data will be stored, in our case will be used to store the logger Verbosity int // verbosity level @@ -50,7 +52,7 @@ func SetupLogger(rootLogger *zap.Logger, opts *LoggingOptions) { } if opts.LogToFile { - options = append(options, logging.WithOutputToFile(filepath.Join(opts.WorkingDir, "app.log.json"))) + options = append(options, logging.WithOutputToFile(filepath.Join(opts.WorkingDir, DefaultLogFile))) } logging.InstantiateLoggers(options...) diff --git a/launcher/runtime.go b/launcher/runtime.go index a5df1b3..271189e 100644 --- a/launcher/runtime.go +++ b/launcher/runtime.go @@ -1,7 +1,10 @@ package launcher +import "github.com/streamingfast/firehose-core/firehose/info" + type Runtime struct { AbsDataDir string + InfoServer *info.InfoServer // IsPendingShutdown is a function that is going to return true as soon as the initial SIGINT signal is // received which can be used to turn a healthz monitor as unhealthy so that a load balancer can diff --git a/metering/metering.go b/metering/metering.go new file mode 100644 index 0000000..6155c85 --- /dev/null +++ b/metering/metering.go @@ -0,0 +1,90 @@ +package metering + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/streamingfast/dstore" + + "github.com/streamingfast/dmetering" + "github.com/streamingfast/substreams/reqctx" + "google.golang.org/protobuf/proto" +) + +const ( + MeterLiveUncompressedReadBytes = "live_uncompressed_read_bytes" + MeterLiveUncompressedReadForkedBytes = "live_uncompressed_read_forked_bytes" + + MeterFileUncompressedReadBytes = "file_uncompressed_read_bytes" + MeterFileUncompressedReadForkedBytes = "file_uncompressed_read_forked_bytes" + MeterFileCompressedReadForkedBytes = "file_compressed_read_forked_bytes" + MeterFileCompressedReadBytes = "file_compressed_read_bytes" + + TotalReadBytes = "total_read_bytes" +) + +func WithBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { + return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { + meter.CountInc(MeterFileCompressedReadBytes, n) + })} +} + +func WithForkedBlockBytesReadMeteringOptions(meter dmetering.Meter, logger *zap.Logger) []dstore.Option { + return []dstore.Option{dstore.WithCompressedReadCallback(func(ctx context.Context, n int) { + meter.CountInc(MeterFileCompressedReadForkedBytes, n) + })} +} + +func GetTotalBytesRead(meter dmetering.Meter) uint64 { + total := uint64(meter.GetCount(TotalReadBytes)) + return total +} + +func Send(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) { + bytesRead := meter.BytesReadDelta() + bytesWritten := meter.BytesWrittenDelta() + egressBytes := proto.Size(resp) + + liveUncompressedReadBytes := meter.GetCountAndReset(MeterLiveUncompressedReadBytes) + liveUncompressedReadForkedBytes := meter.GetCountAndReset(MeterLiveUncompressedReadForkedBytes) + + fileUncompressedReadBytes := meter.GetCountAndReset(MeterFileUncompressedReadBytes) + fileUncompressedReadForkedBytes := meter.GetCountAndReset(MeterFileUncompressedReadForkedBytes) + fileCompressedReadForkedBytes := meter.GetCountAndReset(MeterFileCompressedReadForkedBytes) + fileCompressedReadBytes := meter.GetCountAndReset(MeterFileCompressedReadBytes) + + totalReadBytes := fileCompressedReadBytes + fileCompressedReadForkedBytes + liveUncompressedReadBytes + liveUncompressedReadForkedBytes + + meter.CountInc(TotalReadBytes, int(totalReadBytes)) + + event := dmetering.Event{ + UserID: userID, + ApiKeyID: apiKeyID, + IpAddress: ip, + Meta: userMeta, + + Endpoint: endpoint, + Metrics: map[string]float64{ + "egress_bytes": float64(egressBytes), + "written_bytes": float64(bytesWritten), + "read_bytes": float64(bytesRead), + MeterLiveUncompressedReadBytes: float64(liveUncompressedReadBytes), + MeterLiveUncompressedReadForkedBytes: float64(liveUncompressedReadForkedBytes), + MeterFileUncompressedReadBytes: float64(fileUncompressedReadBytes), + MeterFileUncompressedReadForkedBytes: float64(fileUncompressedReadForkedBytes), + MeterFileCompressedReadForkedBytes: float64(fileCompressedReadForkedBytes), + MeterFileCompressedReadBytes: float64(fileCompressedReadBytes), + "block_count": 1, + }, + Timestamp: time.Now(), + } + + emitter := reqctx.Emitter(ctx) + if emitter == nil { + dmetering.Emit(context.WithoutCancel(ctx), event) + } else { + emitter.Emit(context.WithoutCancel(ctx), event) + } +} diff --git a/node-manager/app/node_reader_stdin/app.go b/node-manager/app/node_reader_stdin/app.go index 29e4792..375677e 100644 --- a/node-manager/app/node_reader_stdin/app.go +++ b/node-manager/app/node_reader_stdin/app.go @@ -158,6 +158,7 @@ func (a *App) Run() error { } a.zlogger.Info("done reading from stdin") + mindreaderLogPlugin.Shutdown(nil) }() return nil diff --git a/proto/generator/generator.go b/proto/generator/generator.go index 9de2a2c..375d1d8 100644 --- a/proto/generator/generator.go +++ b/proto/generator/generator.go @@ -20,24 +20,13 @@ import ( connect "connectrpc.com/connect" "github.com/iancoleman/strcase" "github.com/streamingfast/cli" + wellknown "github.com/streamingfast/firehose-core/well-known" "google.golang.org/protobuf/proto" ) //go:embed *.gotmpl var templates embed.FS -var wellKnownProtoRepos = []string{ - "buf.build/streamingfast/firehose-ethereum", - "buf.build/streamingfast/firehose-near", - "buf.build/streamingfast/firehose-solana", - "buf.build/streamingfast/firehose-bitcoin", - "buf.build/pinax/firehose-antelope", - "buf.build/pinax/firehose-arweave", - "buf.build/pinax/firehose-beacon", - "buf.build/streamingfast/firehose-starknet", - "buf.build/streamingfast/firehose-cosmos", -} - func main() { cli.Ensure(len(os.Args) == 3, "go run ./generator ") @@ -57,7 +46,8 @@ func main() { var protofiles []ProtoFile - for _, wellKnownProtoRepo := range wellKnownProtoRepos { + for _, protocol := range wellknown.WellKnownProtocols { + wellKnownProtoRepo := protocol.BufBuildURL request := connect.NewRequest(&reflectv1beta1.GetFileDescriptorSetRequest{ Module: wellKnownProtoRepo, }) diff --git a/proto/well_known.go b/proto/well_known.go index 53f5952..fc7075a 100644 --- a/proto/well_known.go +++ b/proto/well_known.go @@ -16,7 +16,6 @@ type WellKnownType struct { } func RegisterWellKnownFileDescriptors(registry *Registry) error { - for _, wt := range wellKnownTypes { fd, err := protoToFileDescriptor(registry, wt.proto) if err != nil { diff --git a/proto/well_known_types.go b/proto/well_known_types.go index f96d362..66dd870 100644 --- a/proto/well_known_types.go +++ b/proto/well_known_types.go @@ -85,5 +85,13 @@ func init() { // sf/cosmos/type/v2/block.proto (https://buf.build/streamingfast/firehose-cosmos/docs/453eb75cabe64abcb4a559f47bcb7e9a:sf.cosmos.type.v2) proto: "0a1d73662f636f736d6f732f747970652f76322f626c6f636b2e70726f746f121173662e636f736d6f732e747970652e76321a1f676f6f676c652f70726f746f6275662f74696d657374616d702e70726f746f1a1e676f6f676c652f70726f746f6275662f6475726174696f6e2e70726f746f2298040a05426c6f636b12120a046861736818012001280c52046861736812160a066865696768741802200128035206686569676874122e0a0474696d6518032001280b321a2e676f6f676c652e70726f746f6275662e54696d657374616d70520474696d6512310a0668656164657218042001280b32192e73662e636f736d6f732e747970652e76322e486561646572520668656164657212400a0b6d69736265686176696f7218062003280b321e2e73662e636f736d6f732e747970652e76322e4d69736265686176696f72520b6d69736265686176696f7212300a066576656e747318072003280b32182e73662e636f736d6f732e747970652e76322e4576656e7452066576656e747312100a0374787318082003280c5203747873123b0a0a74785f726573756c747318092003280b321c2e73662e636f736d6f732e747970652e76322e5478526573756c747352097478526573756c7473124f0a1176616c696461746f725f75706461746573180a2003280b32222e73662e636f736d6f732e747970652e76322e56616c696461746f72557064617465521076616c696461746f7255706461746573125a0a17636f6e73656e7375735f706172616d5f75706461746573180b2001280b32222e73662e636f736d6f732e747970652e76322e436f6e73656e737573506172616d735215636f6e73656e737573506172616d557064617465734a04080c100d4a04080d100e4a04080e100f22c3040a0648656164657212360a0776657273696f6e18012001280b321c2e73662e636f736d6f732e747970652e76322e436f6e73656e737573520776657273696f6e12190a08636861696e5f69641802200128095207636861696e496412160a066865696768741803200128035206686569676874122e0a0474696d6518042001280b321a2e676f6f676c652e70726f746f6275662e54696d657374616d70520474696d65123e0a0d6c6173745f626c6f636b5f696418052001280b321a2e73662e636f736d6f732e747970652e76322e426c6f636b4944520b6c617374426c6f636b496412280a106c6173745f636f6d6d69745f6861736818062001280c520e6c617374436f6d6d697448617368121b0a09646174615f6861736818072001280c5208646174614861736812270a0f76616c696461746f72735f6861736818082001280c520e76616c696461746f72734861736812300a146e6578745f76616c696461746f72735f6861736818092001280c52126e65787456616c696461746f72734861736812250a0e636f6e73656e7375735f68617368180a2001280c520d636f6e73656e7375734861736812190a086170705f68617368180b2001280c520761707048617368122a0a116c6173745f726573756c74735f68617368180c2001280c520f6c617374526573756c74734861736812230a0d65766964656e63655f68617368180d2001280c520c65766964656e63654861736812290a1070726f706f7365725f61646472657373180e2001280c520f70726f706f7365724164647265737322330a09436f6e73656e73757312140a05626c6f636b1801200128045205626c6f636b12100a03617070180220012804520361707022670a07426c6f636b494412120a046861736818012001280c52046861736812480a0f706172745f7365745f68656164657218022001280b32202e73662e636f736d6f732e747970652e76322e50617274536574486561646572520d7061727453657448656164657222390a0d5061727453657448656164657212140a05746f74616c18012001280d5205746f74616c12120a046861736818022001280c52046861736822f7010a0b4d69736265686176696f7212360a047479706518012001280e32222e73662e636f736d6f732e747970652e76322e4d69736265686176696f7254797065520474797065123a0a0976616c696461746f7218022001280b321c2e73662e636f736d6f732e747970652e76322e56616c696461746f72520976616c696461746f7212160a066865696768741803200128035206686569676874122e0a0474696d6518042001280b321a2e676f6f676c652e70726f746f6275662e54696d657374616d70520474696d65122c0a12746f74616c5f766f74696e675f706f7765721805200128035210746f74616c566f74696e67506f776572223b0a0956616c696461746f7212180a076164647265737318012001280c52076164647265737312140a05706f7765721803200128035205706f776572225e0a054576656e7412120a047479706518012001280952047479706512410a0a6174747269627574657318022003280b32212e73662e636f736d6f732e747970652e76322e4576656e74417474726962757465520a6174747269627574657322380a0e4576656e7441747472696275746512100a036b657918012001280952036b657912140a0576616c7565180220012809520576616c756522680a0a4576656e74427974657312120a047479706518012001280952047479706512460a0a6174747269627574657318022003280b32262e73662e636f736d6f732e747970652e76322e4576656e744174747269627574654279746573520a61747472696275746573223d0a134576656e74417474726962757465427974657312100a036b657918012001280c52036b657912140a0576616c756518022001280c520576616c756522e3010a095478526573756c747312120a04636f646518012001280d5204636f646512120a046461746118022001280c52046461746112100a036c6f6718032001280952036c6f6712120a04696e666f1804200128095204696e666f121d0a0a6761735f77616e746564180520012803520967617357616e74656412190a086761735f7573656418062001280352076761735573656412300a066576656e747318072003280b32182e73662e636f736d6f732e747970652e76322e4576656e7452066576656e7473121c0a09636f646573706163651808200128095209636f64657370616365225e0a0f56616c696461746f7255706461746512350a077075625f6b657918012001280b321c2e73662e636f736d6f732e747970652e76322e5075626c69634b657952067075624b657912140a05706f7765721802200128035205706f776572224e0a095075626c69634b6579121a0a076564323535313918012001280c4800520765643235353139121e0a09736563703235366b3118022001280c48005209736563703235366b3142050a0373756d2284020a0f436f6e73656e737573506172616d7312340a05626c6f636b18012001280b321e2e73662e636f736d6f732e747970652e76322e426c6f636b506172616d735205626c6f636b123d0a0865766964656e636518022001280b32212e73662e636f736d6f732e747970652e76322e45766964656e6365506172616d73520865766964656e636512400a0976616c696461746f7218032001280b32222e73662e636f736d6f732e747970652e76322e56616c696461746f72506172616d73520976616c696461746f72123a0a0776657273696f6e18042001280b32202e73662e636f736d6f732e747970652e76322e56657273696f6e506172616d73520776657273696f6e22490a0b426c6f636b506172616d73121b0a096d61785f627974657318012001280352086d6178427974657312170a076d61785f67617318022001280352066d61784761734a0408031004229f010a0e45766964656e6365506172616d73122b0a126d61785f6167655f6e756d5f626c6f636b73180120012803520f6d61784167654e756d426c6f636b7312430a106d61785f6167655f6475726174696f6e18022001280b32192e676f6f676c652e70726f746f6275662e4475726174696f6e520e6d61784167654475726174696f6e121b0a096d61785f627974657318032001280352086d6178427974657322350a0f56616c696461746f72506172616d7312220a0d7075625f6b65795f7479706573180120032809520b7075624b6579547970657322210a0d56657273696f6e506172616d7312100a036170701801200128045203617070225a0a0c486173686564506172616d7312260a0f626c6f636b5f6d61785f6279746573180120012803520d626c6f636b4d6178427974657312220a0d626c6f636b5f6d61785f676173180220012803520b626c6f636b4d61784761732a4b0a0f4d69736265686176696f7254797065120b0a07554e4b4e4f574e100012120a0e4455504c49434154455f564f5445100112170a134c494748545f434c49454e545f41545441434b1002424e5a4c6769746875622e636f6d2f73747265616d696e67666173742f66697265686f73652d636f736d6f732f636f736d6f732f70622f73662f636f736d6f732f747970652f76323b7062636f6d6f73620670726f746f33", }, + { + // google/protobuf/timestamp.proto (https://buf.build/streamingfast/firehose-gear/docs/5eb3cd2c66a540a19505476f405cd5fa:google.protobuf) + proto: "0a1f676f6f676c652f70726f746f6275662f74696d657374616d702e70726f746f120f676f6f676c652e70726f746f627566223b0a0954696d657374616d7012180a077365636f6e647318012001280352077365636f6e647312140a056e616e6f7318022001280552056e616e6f734285010a13636f6d2e676f6f676c652e70726f746f627566420e54696d657374616d7050726f746f50015a32676f6f676c652e676f6c616e672e6f72672f70726f746f6275662f74797065732f6b6e6f776e2f74696d657374616d707062f80101a20203475042aa021e476f6f676c652e50726f746f6275662e57656c6c4b6e6f776e5479706573620670726f746f33", + }, + { + // sf/gear/type/v1/block.proto (https://buf.build/streamingfast/firehose-gear/docs/5eb3cd2c66a540a19505476f405cd5fa:sf.gear.type.v1) + proto: "0a1b73662f676561722f747970652f76312f626c6f636b2e70726f746f120f73662e676561722e747970652e76311a1f676f6f676c652f70726f746f6275662f74696d657374616d702e70726f746f228f030a05426c6f636b12160a066e756d62657218012001280452066e756d62657212120a046861736818022001280c520468617368122f0a0668656164657218032001280b32172e73662e676561722e747970652e76312e4865616465725206686561646572123a0a0a65787472696e7369637318042003280b321a2e73662e676561722e747970652e76312e45787472696e736963520a65787472696e73696373122e0a066576656e747318052003280b32162e73662e676561722e747970652e76312e4576656e7452066576656e7473123e0a0c6469676573745f6974656d7318062003280b321b2e73662e676561722e747970652e76312e4469676573744974656d520b6469676573744974656d7312240a0d6a757374696669636174696f6e18072001280c520d6a757374696669636174696f6e121d0a0a7261775f6576656e747318082001280c52097261774576656e747312380a0974696d657374616d7018092001280b321a2e676f6f676c652e70726f746f6275662e54696d657374616d70520974696d657374616d7022bf010a06486561646572121f0a0b706172656e745f6861736818012001280c520a706172656e7448617368121d0a0a73746174655f726f6f7418022001280c52097374617465526f6f7412270a0f65787472696e736963735f726f6f7418032001280c520e65787472696e73696373526f6f7412210a0c737065635f76657273696f6e18042001280d520b7370656356657273696f6e12290a10757064617465645f6d6574616461746118052001280c520f757064617465644d6574616461746122f7020a0a4469676573744974656d12310a1461735f6368616e6765735f747269655f726f6f7418012001280c4800521161734368616e67657354726965526f6f7412430a0e61735f7072655f72756e74696d6518022001280b321b2e73662e676561722e747970652e76312e50726552756e74696d654800520c617350726552756e74696d65123f0a0c61735f636f6e73656e73757318032001280b321a2e73662e676561722e747970652e76312e436f6e73656e7375734800520b6173436f6e73656e73757312300a0761735f7365616c18042001280b32152e73662e676561722e747970652e76312e5365616c4800520661735365616c12590a1661735f6368616e6765735f747269655f7369676e616c18052001280b32222e73662e676561722e747970652e76312e4368616e676573547269655369676e616c4800521361734368616e676573547269655369676e616c121b0a0861735f6f7468657218062001280c4800520761734f7468657242060a044974656d22520a0a50726552756e74696d65122e0a13636f6e73656e7375735f656e67696e655f696418012001280d5211636f6e73656e737573456e67696e65496412140a05627974657318022001280c5205627974657322510a09436f6e73656e737573122e0a13636f6e73656e7375735f656e67696e655f696418012001280d5211636f6e73656e737573456e67696e65496412140a05627974657318022001280c52056279746573224c0a045365616c122e0a13636f6e73656e7375735f656e67696e655f696418012001280d5211636f6e73656e737573456e67696e65496412140a05627974657318022001280c5205627974657322770a114368616e676573547269655369676e616c12300a1469735f6e65775f636f6e66696775726174696f6e180120012808521269734e6577436f6e66696775726174696f6e12300a1461735f6e65775f636f6e66696775726174696f6e18022001280c521261734e6577436f6e66696775726174696f6e228e010a0945787472696e73696312180a0776657273696f6e18012001280d520776657273696f6e12380a097369676e617475726518022001280b321a2e73662e676561722e747970652e76312e5369676e617475726552097369676e6174757265122d0a066d6574686f6418032001280b32152e73662e676561722e747970652e76312e43616c6c52066d6574686f6422da010a095369676e617475726512350a067369676e657218012001280b321d2e73662e676561722e747970652e76312e4d756c74694164647265737352067369676e6572123d0a097369676e617475726518022001280b321f2e73662e676561722e747970652e76312e4d756c74695369676e617475726552097369676e6174757265122f0a0365726118032001280b321d2e73662e676561722e747970652e76312e45787472696e736963457261520365726112140a056e6f6e636518042001280952056e6f6e636512100a03746970180520012809520374697022ac020a0c4d756c74694164647265737312130a0569735f696418012001280852046973496412130a0561735f696418022001280c52046173496412190a0869735f696e64657818032001280852076973496e64657812190a0861735f696e64657818042001280d52076173496e64657812150a0669735f7261771805200128085205697352617712150a0661735f72617718062001280c5205617352617712220a0d69735f616464726573735f3332180720012808520b697341646472657373333212220a0d61735f616464726573735f333218082001280c520b617341646472657373333212220a0d69735f616464726573735f3230180920012808520b697341646472657373323012220a0d61735f616464726573735f3230180a2001280c520b617341646472657373323022c6010a0e4d756c74695369676e6174757265121e0a0b69735f65645f32353531391801200128085209697345643235353139121e0a0b61735f65645f323535313918022001280c5209617345643235353139121e0a0b69735f73725f32353531391803200128085209697353723235353139121e0a0b61735f73725f323535313918042001280c520961735372323535313912190a0869735f656364736118052001280852076973456364736112190a0861735f656364736118062001280c520761734563647361229a010a0c45787472696e73696345726112260a0f69735f696d6d6f7274616c5f657261180120012808520d6973496d6d6f7274616c45726112220a0d69735f6d6f7274616c5f657261180220012808520b69734d6f7274616c457261123e0a0d61735f6d6f7274616c5f65726118032001280b321a2e73662e676561722e747970652e76312e4d6f7274616c457261520b61734d6f7274616c45726122550a0443616c6c12390a0a63616c6c5f696e64657818012001280b321a2e73662e676561722e747970652e76312e43616c6c496e646578520963616c6c496e64657812120a046172677318022001280c52046172677322530a0943616c6c496e64657812230a0d73656374696f6e5f696e64657818012001280d520c73656374696f6e496e64657812210a0c6d6574686f645f696e64657818022001280d520b6d6574686f64496e64657822710a054576656e7412120a046e616d6518012001280952046e616d65120e0a02696418032001280c52026964122c0a05706861736518042001280b32162e73662e676561722e747970652e76312e50686173655205706861736512160a06746f7069637318052003280c5206746f7069637322b9010a055068617365122c0a1269735f6170706c795f65787472696e736963180120012808521069734170706c7945787472696e736963122c0a1261735f6170706c795f65787472696e73696318022001280d521061734170706c7945787472696e73696312270a0f69735f66696e616c697a6174696f6e180320012808520e697346696e616c697a6174696f6e122b0a1169735f696e697469616c697a6174696f6e18042001280852106973496e697469616c697a6174696f6e22390a094d6f7274616c45726112140a05666972737418012001280d5205666972737412160a067365636f6e6418022001280d52067365636f6e6422210a0d5061796d656e744669656c647312100a03746970180120012809520374697042425a406769746875622e636f6d2f73747265616d696e67666173742f66697265686f73652d676561722f70622f73662f676561722f747970652f76313b706267656172620670726f746f33", + }, } -} +} \ No newline at end of file diff --git a/storage.go b/storage.go index 48e604b..f12b196 100644 --- a/storage.go +++ b/storage.go @@ -14,6 +14,18 @@ import ( var commonStoresCreated bool var indexStoreCreated bool +var tmpDirCreated bool + +func GetTmpDir(dataDir string) (tmpDir string, err error) { + if tmpDirCreated { + return + } + + tmpDir = MustReplaceDataDir(dataDir, viperExpandedEnvGetString("common-tmp-dir")) + err = os.MkdirAll(tmpDir, 0755) + tmpDirCreated = true + return +} func GetCommonStoresURLs(dataDir string) (mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL string, err error) { mergedBlocksStoreURL = MustReplaceDataDir(dataDir, viperExpandedEnvGetString("common-merged-blocks-store-url")) diff --git a/stream_factory.go b/stream_factory.go index 25f3d31..31346be 100644 --- a/stream_factory.go +++ b/stream_factory.go @@ -4,12 +4,15 @@ import ( "context" "fmt" + "github.com/streamingfast/dmetering" + + "github.com/streamingfast/firehose-core/metering" + "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/hub" "github.com/streamingfast/bstream/stream" "github.com/streamingfast/bstream/transform" "github.com/streamingfast/dauth" - "github.com/streamingfast/dmetering" "github.com/streamingfast/dstore" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" @@ -44,7 +47,8 @@ func (sf *StreamFactory) New( ctx context.Context, handler bstream.Handler, request *pbfirehose.Request, - logger *zap.Logger) (*stream.Stream, error) { + logger *zap.Logger, + extraOpts ...stream.Option) (*stream.Stream, error) { reqLogger := logger.With( zap.Int64("req_start_block", request.StartBlockNum), @@ -108,23 +112,31 @@ func (sf *StreamFactory) New( forkedBlocksStore := sf.forkedBlocksStore if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok { var err error - forkedBlocksStore, err = clonable.Clone(ctx) + forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this forkedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } mergedBlocksStore := sf.mergedBlocksStore if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok { var err error - mergedBlocksStore, err = clonable.Clone(ctx) + mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(dmetering.GetBytesMeter(ctx), logger)...) if err != nil { return nil, err } + + //todo: (deprecated) remove this mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx)) } + for _, opt := range extraOpts { + options = append(options, opt) + } + str := stream.New( forkedBlocksStore, mergedBlocksStore, diff --git a/types/block_range_enum.go b/types/block_range_enum.go index 668a645..c7d4c86 100644 --- a/types/block_range_enum.go +++ b/types/block_range_enum.go @@ -18,6 +18,8 @@ const ( RangeBoundaryExclusive ) +var ErrInvalidRangeBoundary = fmt.Errorf("not a valid RangeBoundary, try [%s]", strings.Join(_RangeBoundaryNames, ", ")) + const _RangeBoundaryName = "InclusiveExclusive" var _RangeBoundaryNames = []string{ @@ -45,6 +47,13 @@ func (x RangeBoundary) String() string { return fmt.Sprintf("RangeBoundary(%d)", x) } +// IsValid provides a quick way to determine if the typed value is +// part of the allowed enumerated values +func (x RangeBoundary) IsValid() bool { + _, ok := _RangeBoundaryMap[x] + return ok +} + var _RangeBoundaryValue = map[string]RangeBoundary{ _RangeBoundaryName[0:9]: RangeBoundaryInclusive, strings.ToLower(_RangeBoundaryName[0:9]): RangeBoundaryInclusive, @@ -52,7 +61,7 @@ var _RangeBoundaryValue = map[string]RangeBoundary{ strings.ToLower(_RangeBoundaryName[9:18]): RangeBoundaryExclusive, } -// ParseRangeBoundary attempts to convert a string to a RangeBoundary +// ParseRangeBoundary attempts to convert a string to a RangeBoundary. func ParseRangeBoundary(name string) (RangeBoundary, error) { if x, ok := _RangeBoundaryValue[name]; ok { return x, nil @@ -61,15 +70,15 @@ func ParseRangeBoundary(name string) (RangeBoundary, error) { if x, ok := _RangeBoundaryValue[strings.ToLower(name)]; ok { return x, nil } - return RangeBoundary(0), fmt.Errorf("%s is not a valid RangeBoundary, try [%s]", name, strings.Join(_RangeBoundaryNames, ", ")) + return RangeBoundary(0), fmt.Errorf("%s is %w", name, ErrInvalidRangeBoundary) } -// MarshalText implements the text marshaller method +// MarshalText implements the text marshaller method. func (x RangeBoundary) MarshalText() ([]byte, error) { return []byte(x.String()), nil } -// UnmarshalText implements the text unmarshaller method +// UnmarshalText implements the text unmarshaller method. func (x *RangeBoundary) UnmarshalText(text []byte) error { name := string(text) tmp, err := ParseRangeBoundary(name) diff --git a/well-known/chains.go b/well-known/chains.go new file mode 100644 index 0000000..d443cf1 --- /dev/null +++ b/well-known/chains.go @@ -0,0 +1,314 @@ +package wellknown + +import ( + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" +) + +type WellKnownProtocol struct { + Name string + BlockType string // we always trim `type.googleapis.com/` + BufBuildURL string + BytesEncoding pbfirehose.InfoResponse_BlockIdEncoding + KnownChains []*Chain +} + +type Chain struct { + // Canonical name, from https://thegraph.com/docs/en/developing/supported-networks/ + Name string + // Aliases are other names that can be used to refer to the chain, for example 'polygon' is a popular name for the chain 'matic' + Aliases []string + // Genesis block here is actually the "lowest possible" first streamable block through firehose blocks. + // In most cases, it matches the "genesis block" of the chain. + // It must match the value of the `sf.bstream.v1.Block.id` field (https://github.com/streamingfast/bstream/blob/develop/proto/sf/bstream/v1/bstream.proto#L71) + // and it follows the encoding specified in the `BytesEncoding` field of the WellKnownProtocol + // You can generally get the genesis block ID by running `firecore tools print merged-blocks ` on the merged-blocks + GenesisBlockID string + GenesisBlockNumber uint64 +} + +type WellKnownProtocolList []WellKnownProtocol + +var WellKnownProtocols = WellKnownProtocolList([]WellKnownProtocol{ + { + Name: "ethereum", + BlockType: "sf.ethereum.type.v2.Block", + BufBuildURL: "buf.build/streamingfast/firehose-ethereum", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "mainnet", + Aliases: []string{"ethereum"}, + GenesisBlockID: "d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", + GenesisBlockNumber: 0, + }, + { + Name: "sepolia", + Aliases: []string{}, + GenesisBlockID: "25a5cc106eea7138acab33231d7160d69cb777ee0c2c553fcddf5138993e6dd9", + GenesisBlockNumber: 0, + }, + { + Name: "holesky", + Aliases: []string{}, + GenesisBlockID: "b5f7f912443c940f21fd611f12828d75b534364ed9e95ca4e307729a4661bde4", + GenesisBlockNumber: 0, + }, + { + Name: "matic", + Aliases: []string{"polygon"}, + GenesisBlockID: "a9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b", + GenesisBlockNumber: 0, + }, + { + Name: "bsc", + Aliases: []string{"bnb", "bsc-mainnet"}, + GenesisBlockID: "0d21840abff46b96c84b2ac9e10e4f5cdaeb5693cb665db62a2f3b02d2d57b5b", + GenesisBlockNumber: 0, + }, + { + Name: "optimism", + Aliases: []string{}, + GenesisBlockID: "7ca38a1916c42007829c55e69d3e9a73265554b586a499015373241b8a3fa48b", + GenesisBlockNumber: 0, + }, + { + Name: "base", + Aliases: []string{}, + GenesisBlockID: "f712aa9241cc24369b143cf6dce85f0902a9731e70d66818a3a5845b296c73dd", + GenesisBlockNumber: 0, + }, + { + Name: "optimism-sepolia", + Aliases: []string{}, + GenesisBlockID: "102de6ffb001480cc9b8b548fd05c34cd4f46ae4aa91759393db90ea0409887d", + GenesisBlockNumber: 0, + }, + { + Name: "chapel", + Aliases: []string{"bsc-chapel", "bsc-testnet"}, + GenesisBlockID: "6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", + GenesisBlockNumber: 0, + }, + { + Name: "arbitrum-one", + Aliases: []string{"arb-one", "arbitrum"}, + GenesisBlockID: "7ee576b35482195fc49205cec9af72ce14f003b9ae69f6ba0faef4514be8b442", + GenesisBlockNumber: 0, + }, + { + Name: "sei-mainnet", // EVM blocks on SEI start at 79123881 + Aliases: []string{"sei-evm-mainnet"}, + GenesisBlockID: "e43c3e52a4f74dc7bb35f55cc813ed2ce97e46aafb3fcf8884056b0a757f5b75", + GenesisBlockNumber: 79123881, + }, + // We do not auto-discover avalanche because the genesis block ID is the same as their testnet fuji and we can't differentiate them + //{ + // Name: "avalanche", + // Aliases: []string{"avax"}, + // GenesisBlockID: "31ced5b9beb7f8782b014660da0cb18cc409f121f408186886e1ca3e8eeca96b", + // GenesisBlockNumber: 0, + //}, + }, + }, + { + Name: "near", + BlockType: "sf.near.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-near", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, // even though the usual encoding is base58, firehose blocks are written with the hex-encoded version + KnownChains: []*Chain{ + { + Name: "near-mainnet", + Aliases: []string{"near"}, + GenesisBlockID: "a7110b9052e1be68f7fa8bb4065bf54e731205801878e708db7464ec4b9b8014", + GenesisBlockNumber: 9820214, + }, + { + Name: "near-testnet", + Aliases: []string{}, + GenesisBlockID: "09d69c8ba4f319f8e78d0b701f0c1f763cc0d920551bfbd00072d5f56016d853", + GenesisBlockNumber: 42376923, + }, + }, + }, + { + Name: "solana", + BlockType: "sf.solana.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-solana", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_BASE58, + KnownChains: []*Chain{ + { + Name: "solana-mainnet-beta", + Aliases: []string{"solana", "solana-mainnet"}, + GenesisBlockID: "4sGjMW1sUnHzSxGspuhpqLDx6wiyjNtZAMdL4VZHirAn", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "bitcoin", + BlockType: "sf.bitcoin.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-bitcoin", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "btc", + Aliases: []string{"bitcoin"}, + GenesisBlockID: "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "antelope", + BlockType: "sf.antelope.type.v1.Block", + BufBuildURL: "buf.build/pinax/firehose-antelope", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "eos", + Aliases: []string{"eos-mainnet"}, + GenesisBlockID: "0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693", + GenesisBlockNumber: 2, // even though the genesis block is 1, it is never available through firehose/substreams + }, + { + Name: "kylin", + Aliases: []string{}, + GenesisBlockID: "00000002a1ec7ae214b9e43a904b6c010fb1260c9e8a12e5967bdbe451152a07", + GenesisBlockNumber: 2, // even though the genesis block is 1, it is never available through firehose/substreams + }, + { + Name: "jungle4", + Aliases: []string{}, + GenesisBlockID: "00000002d61d836f51657f886a5bc55b18a731f7eace6565784328fbd051fc90", + GenesisBlockNumber: 2, // even though the genesis block is 1, it is never available through firehose/substreams + }, + }, + }, + { + Name: "arweave", + BlockType: "sf.arweave.type.v1.Block", + BufBuildURL: "buf.build/pinax/firehose-arweave", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, // even though the usual encoding is base64url, firehose blocks are written with the hex-encoded version + KnownChains: []*Chain{ + { + Name: "arweave", + Aliases: []string{}, + GenesisBlockID: "ef0214ecaa252020230a5325719dfc2d9cec86123bc46926dad0c2251ed6be17b7112528dbe678fb2d31d6e6a0951244", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "beacon", + BlockType: "sf.beacon.type.v1.Block", + BufBuildURL: "buf.build/pinax/firehose-beacon", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_0X_HEX, + KnownChains: []*Chain{ + { + Name: "mainnet-cl", + Aliases: []string{}, + GenesisBlockID: "0x4d611d5b93fdab69013a7f0a2f961caca0c853f87cfe9595fe50038163079360", + GenesisBlockNumber: 0, + }, + { + Name: "sepolia-cl", + Aliases: []string{}, + GenesisBlockID: "0xfb9b64fe445f76696407e1e3cc390371edff147bf712db86db6197d4b31ede43", + GenesisBlockNumber: 0, + }, + { + Name: "holesky-cl", + Aliases: []string{}, + GenesisBlockID: "0xab09edd9380f8451c3ff5c809821174a36dce606fea8b5ea35ea936915dbf889", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "starknet", + BlockType: "sf.starknet.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-starknet", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_0X_HEX, + KnownChains: []*Chain{ + { + Name: "starknet-mainnet", + Aliases: []string{}, + GenesisBlockID: "0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943", + GenesisBlockNumber: 0, + }, + { + Name: "starknet-testnet", + Aliases: []string{}, + GenesisBlockID: "0x5c627d4aeb51280058bed93c7889bce78114d63baad1be0f0aeb32496d5f19c", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "cosmos", + BlockType: "sf.cosmos.type.v2.Block", + BufBuildURL: "buf.build/streamingfast/firehose-cosmos", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "injective-mainnet", + Aliases: []string{}, + GenesisBlockID: "24c9714291a999b952859ee02ec9b233394fe743b07ea3578d432a4a2707b6af", + GenesisBlockNumber: 1, + }, + { + Name: "injective-testnet", + Aliases: []string{}, + GenesisBlockID: "a9effb99c7bc3ba8c18a487ffffd800c137bc2b2f47f73c350f3ca27077044a1", + GenesisBlockNumber: 37368800, // Not the real genesis block, but the other blocks are lost on the testnet + }, + }, + }, + { + Name: "gear", + BlockType: "sf.gear.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-gear", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "vara-mainnet", + Aliases: []string{}, + GenesisBlockID: "fe1b4c55fd4d668101126434206571a7838a8b6b93a6d1b95d607e78e6c53763", + GenesisBlockNumber: 0, + }, + { + Name: "vara-testnet", + Aliases: []string{}, + GenesisBlockID: "525639f713f397dcf839bd022cd821f367ebcf179de7b9253531f8adbe5436d6", + GenesisBlockNumber: 0, + }, + }, + }, +}) + +func (p WellKnownProtocolList) ChainByGenesisBlock(blockNum uint64, blockID string) *Chain { + for _, protocol := range p { + for _, chain := range protocol.KnownChains { + if chain.GenesisBlockNumber == blockNum && chain.GenesisBlockID == blockID { + return chain + } + } + } + return nil +} + +func (p WellKnownProtocolList) ChainByName(name string) *Chain { + for _, protocol := range p { + for _, chain := range protocol.KnownChains { + if chain.Name == name { + return chain + } + for _, alias := range chain.Aliases { + if alias == name { + return chain + } + } + } + } + return nil +}