
Commit

Merge branch 'develop' into feature/node-manager-env-on-restart
maoueh authored Oct 2, 2024
2 parents f83568e + 0b386ef commit 64789da
Showing 40 changed files with 1,063 additions and 145 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.idea
.vscode
/build
/dist
.envrc
59 changes: 55 additions & 4 deletions CHANGELOG.md
@@ -8,14 +8,65 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased
## v1.6.4

* [Operator] Node Manager HTTP `/v1/resume` call now accepts `extra-env=<key>=<value>&extra-env=<keyN>=<valueN>`, which overrides environment variables for the next restart **only**. Use `curl -XPOST "http://localhost:10011/v1/resume?sync=true&extra-env=NODE_DEBUG=true"` (change `localhost:10011` according to your setup).
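
When several overrides are needed, the query string can be assembled programmatically rather than by hand. The following is a minimal sketch only: `buildResumeURL` is a hypothetical helper that is not part of `firehose-core`, `LOG_LEVEL` is a made-up variable name, and it assumes the Node Manager decodes percent-escaped query values the way a standard HTTP query parser does.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildResumeURL is a hypothetical helper (not part of firehose-core) that
// assembles the resume URL with one `extra-env` query parameter per override.
func buildResumeURL(base string, sync bool, extraEnv []string) string {
	query := url.Values{}
	if sync {
		query.Set("sync", "true")
	}
	for _, keyValue := range extraEnv {
		// Each entry is expected in `KEY=value` form; Encode() percent-escapes
		// the inner `=` so it is not confused with the parameter separator.
		query.Add("extra-env", keyValue)
	}
	return base + "/v1/resume?" + query.Encode()
}

func main() {
	// `localhost:10011` comes from the changelog example; adjust to your setup.
	fmt.Println(buildResumeURL("http://localhost:10011", true, []string{"NODE_DEBUG=true", "LOG_LEVEL=debug"}))
}
```

The resulting URL would then be sent with a `POST`, as in the `curl` example above.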
### Substreams fixes

* Fixed an(other) issue where multiple stores running on the same stage with different initialBlocks will fail to progress (and hang)

## v1.6.3

### Substreams fixes

* Fix "cannot resolve 'old cursor' from files in passthrough mode" error on some requests with an old cursor
* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking)
-> empty files in module cache with hash `d3b1920483180cbcd2fd10abcabbee431146f4c8` should be deleted for consistency
* Fix bug where some invalid cursors may be sent (with 'LIB' being above the block being sent) and add safeguard/logging if the bug appears again
* Fix panic in the whole tier2 process when stores go above the size limit while being read from "kvops" cached changes

### Core fixes

* fix: reader-node-stdin not shutting down after receiving an EOF

## v1.6.2

### Core

* [Operator] The flag `--advertise-block-id-encoding` now accepts a shorter form: `hex`, `base64`, etc. The older, longer form `BLOCK_ID_ENCODING_HEX` is still supported, but we suggest using the shorter form from now on.
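
The short-form lookup mirrors what this commit does in `cmd/apps/start.go`: try the value as given, then retry with the `BLOCK_ID_ENCODING_` prefix and upper-casing. Below is a self-contained sketch of that fallback. The `encodingValues` map is a stand-in for the generated `pbfirehose.InfoResponse_BlockIdEncoding_value` map, its numeric values are placeholders rather than the real enum numbers, and `resolveEncoding` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// encodingValues is a stand-in for the generated
// pbfirehose.InfoResponse_BlockIdEncoding_value map. The numeric values are
// placeholders chosen for this sketch, not the real enum numbers.
var encodingValues = map[string]int32{
	"BLOCK_ID_ENCODING_UNSET":     0,
	"BLOCK_ID_ENCODING_HEX":       1,
	"BLOCK_ID_ENCODING_0X_HEX":    2,
	"BLOCK_ID_ENCODING_BASE58":    3,
	"BLOCK_ID_ENCODING_BASE64":    4,
	"BLOCK_ID_ENCODING_BASE64URL": 5,
}

// resolveEncoding accepts either the long enum name or its short,
// case-insensitive form (for example `hex` or `base64url`).
func resolveEncoding(input string) (int32, error) {
	if input == "" {
		// An empty flag value leaves the encoding unset.
		return encodingValues["BLOCK_ID_ENCODING_UNSET"], nil
	}
	if value, found := encodingValues[input]; found {
		return value, nil
	}
	longCandidate := "BLOCK_ID_ENCODING_" + strings.ToUpper(input)
	if value, found := encodingValues[longCandidate]; found {
		return value, nil
	}
	return 0, fmt.Errorf("invalid block id encoding: %s", input)
}

func main() {
	for _, input := range []string{"hex", "BLOCK_ID_ENCODING_HEX", "base64url", "rot13"} {
		value, err := resolveEncoding(input)
		fmt.Println(input, value, err)
	}
}
```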

### Substreams v1.10.2

> **Note** Since a bug that affected substreams with "skipping blocks" was corrected in this release, any previously produced substreams cache should be considered as possibly corrupted and be eventually replaced
* Substreams: fix bad handling of modules with multiple inputs when only one of them is filtered, resulting in bad outputs in production-mode.
* Substreams: fix stalling on some substreams with stores and mappers with different start block numbers on the same stage
* Substreams: fix 'development mode' and LIVE mode executing some modules that should be skipped

## v1.6.1

* Bump substreams to v1.10.0: Version 1.10.0 adds a new `EndpointInfo/Info` endpoint, introduces a 3-minute default execution timeout per block, updates metering metrics with a deprecation warning, enhances `substreams init` commands, and improves wasm module caching and Prometheus tool flexibility. Full changelog: https://github.com/streamingfast/substreams/releases/tag/v1.10.0
* Metering update: more detailed metering with addition of new metrics. *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead

## v1.6.0

* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and `sf.substreams.rpc.v2.EndpointInfo/Info` to Substreams endpoints. This involves the following new flags:
- `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list)
- `advertise-chain-aliases` Alternate names for that chain (optional)
- `advertise-block-features` List of features describing the blocks (optional)
- `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list)
- `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.

> This is **not** persistent upon restart
* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list.
* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service.
It validates the following on a well-known chain:
- if the first-streamable-block Num/ID match the genesis block of a known chain, e.g. `matic`, it will refuse another value for `advertise-chain-name` than `matic` or one of its aliases (`polygon`)
- If the first-streamable-block does not match any known chain, it will require the `advertise-chain-name` to be non-empty
- If the first-streamable-block type is unknown (i.e. not ethereum, solana, near, cosmos, bitcoin...), it will require the user to provide `advertise-chain-name` as well as `advertise-block-id-encoding`

* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero first streamable block via `--common-first-streamable-block`. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature
* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling
* Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.
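
The first two first-streamable-block checks listed in the v1.6.0 notes above can be pictured roughly as follows. This is a sketch under stated assumptions, not the project's implementation: `knownChain`, `checkChainName`, and the genesis ID string are hypothetical, and the third rule (unknown block type also requiring `advertise-block-id-encoding`) is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// knownChain is a hypothetical, simplified entry of a well-known chain list.
type knownChain struct {
	Name       string
	Aliases    []string
	GenesisNum uint64
	GenesisID  string
}

// checkChainName applies the two name rules from the changelog:
//   - a block matching a known genesis only accepts that chain's name or aliases
//     (an empty name is fine, since it can be inferred);
//   - a block matching no known chain requires a non-empty name.
func checkChainName(advertised string, firstNum uint64, firstID string, known []knownChain) error {
	for _, chain := range known {
		if chain.GenesisNum != firstNum || chain.GenesisID != firstID {
			continue
		}
		if advertised == "" || advertised == chain.Name {
			return nil
		}
		for _, alias := range chain.Aliases {
			if advertised == alias {
				return nil
			}
		}
		return fmt.Errorf("chain name %q does not match well-known chain %q", advertised, chain.Name)
	}
	if advertised == "" {
		return errors.New("advertise-chain-name is required when the chain is not well-known")
	}
	return nil
}

func main() {
	// The genesis ID below is a placeholder, not a real block hash.
	known := []knownChain{{Name: "matic", Aliases: []string{"polygon"}, GenesisNum: 0, GenesisID: "placeholder-genesis-id"}}

	fmt.Println(checkChainName("polygon", 0, "placeholder-genesis-id", known))
	fmt.Println(checkChainName("mainnet", 0, "placeholder-genesis-id", known))
	fmt.Println(checkChainName("", 42, "some-other-id", known))
}
```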

## v1.5.7

14 changes: 12 additions & 2 deletions README.md
@@ -10,7 +10,7 @@ Or download the latest Release from https://github.com/streamingfast/firehose-co

## Running directly

* firehose-core can run one or many of the following components:
* `firehose-core` can run one or many of the following components:
- reader-node
- merger
- relayer
@@ -33,6 +33,17 @@ start:

* Run it with `firecore start --config-file=./firehose.yaml` or set an empty value for config-file (`--config-file=`) to use the default values.

### Development

For development purposes, the easiest setup is to install the [dummy-blockchain](https://github.com/streamingfast/dummy-blockchain) and then use the `./devel/standard/start.sh` script we provide in the repository, which launches a full-fledged `firehose-core` instance backed by this dummy blockchain:

```
# Needed only once, if you don't have the binary locally already
go install github.com/streamingfast/dummy-blockchain@latest
# The -c cleans any previous data, remove to keep data upon restarts
./devel/standard/start.sh -c
```

## Using as a library

@@ -41,7 +52,6 @@ For chains that implement "firehose block filters" and extensions like "eth_call
* [firehose-ethereum](https://github.com/streamingfast/firehose-ethereum)
* [firehose-antelope](https://github.com/pinax-network/firehose-antelope)


### Philosophy

Firehose maintenance cost comes from two sides. First, there is the chain integration that needs to be maintained. This is done within the chain's code directly by the chain's core developers. The second side of things is the maintenance of the Golang part of the Firehose stack.
2 changes: 1 addition & 1 deletion blockpoller/poller.go
@@ -349,7 +349,7 @@ func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream
}

if skipped {
return nil, fmt.Errorf("block %d was skipped and sould not have been requested", blkNum)
return nil, fmt.Errorf("block %d was skipped and should not have been requested", blkNum)
}

if p.forceFinalityAfterBlocks != nil {
9 changes: 9 additions & 0 deletions chain.go
@@ -6,6 +6,7 @@ import (
"runtime/debug"
"strings"

pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/substreams/wasm"

"github.com/spf13/cobra"
@@ -154,6 +155,10 @@ type Chain[B Block] struct {
DefaultBlockType string

RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error)

// InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain.
// It can validate that we are on the right chain by checking against a known hash, or populate missing fields.
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error
}

type ToolsConfig[B Block] struct {
@@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() {
err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element"))
}

if c.InfoResponseFiller == nil {
err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set"))
}

for key, indexerFactory := range c.BlockIndexerFactories {
if indexerFactory == nil {
err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key))
1 change: 1 addition & 0 deletions cmd/apps/firehose.go
@@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
HeadBlockNumberMetric: headBlockNumMetric,
TransformRegistry: registry,
CheckPendingShutdown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
33 changes: 30 additions & 3 deletions cmd/apps/start.go
@@ -26,7 +26,9 @@ import (
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
info "github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/launcher"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
tracing "github.com/streamingfast/sf-tracing"
"go.uber.org/zap"
"golang.org/x/exp/slices"
@@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
configFile := sflags.MustGetString(cmd, "config-file")
rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile))

err = start(cmd, dataDir, args, rootLog)
err = start(cmd, dataDir, args, chain, rootLog)
if err != nil {
return fmt.Errorf("unable to launch: %w", err)
}
@@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
}
}

func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) {
func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) {
dataDirAbs, err := filepath.Abs(dataDir)
if err != nil {
return fmt.Errorf("unable to setup directory structure: %w", err)
@@ -82,7 +84,32 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
}()
dmetering.SetDefaultEmitter(eventEmitter)

launch := launcher.NewLauncher(rootLog, dataDirAbs)
blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET
if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" {
v, found := pbfirehose.InfoResponse_BlockIdEncoding_value[enc]
if !found {
longCandidate := "BLOCK_ID_ENCODING_" + strings.ToUpper(enc)
v, found = pbfirehose.InfoResponse_BlockIdEncoding_value[longCandidate]
if !found {
return fmt.Errorf("invalid block id encoding: %s", enc)
}
}

blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(v)
}

infoServer := info.NewInfoServer(
sflags.MustGetString(cmd, "advertise-chain-name"),
sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"),
blockIDEncoding,
sflags.MustGetStringSlice(cmd, "advertise-block-features"),
bstream.GetProtocolFirstStreamableBlock,
!sflags.MustGetBool(cmd, "ignore-advertise-validation"),
chain.InfoResponseFiller,
rootLog,
)

launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer)
rootLog.Debug("launcher created")

runByDefault := func(app string) bool {
7 changes: 7 additions & 0 deletions cmd/apps/substreams_tier1.go
@@ -120,6 +120,11 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
wasmExtensions = exts
}

tmpDir, err := firecore.GetTmpDir(sfDataDir)
if err != nil {
return nil, fmt.Errorf("getting temporary directory: %w", err)
}

return app.NewTier1(appLogger,
&app.Tier1Config{
MeteringConfig: GetCommonMeteringPluginValue(),
@@ -128,6 +133,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
OneBlocksStoreURL: oneBlocksStoreURL,
ForkedBlocksStoreURL: forkedBlocksStoreURL,
BlockStreamAddr: blockstreamAddr,
TmpDir: tmpDir,

StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
@@ -150,6 +156,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
CheckPendingShutDown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
6 changes: 6 additions & 0 deletions cmd/apps/substreams_tier2.go
@@ -82,6 +82,11 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root
wasmExtensions = exts
}

tmpDir, err := firecore.GetTmpDir(runtime.AbsDataDir)
if err != nil {
return nil, fmt.Errorf("getting temporary directory: %w", err)
}

return app.NewTier2(appLogger,
&app.Tier2Config{
Tracing: tracing,
@@ -90,6 +95,7 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root
ServiceDiscoveryURL: serviceDiscoveryURL,
WASMExtensions: wasmExtensions,
BlockExecutionTimeout: executionTimeout,
TmpDir: tmpDir,

MaximumConcurrentRequests: maximumConcurrentRequests,
}, &app.Tier2Modules{
2 changes: 2 additions & 0 deletions cmd/firecore/main.go
@@ -4,6 +4,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
firecore "github.com/streamingfast/firehose-core"
fhCMD "github.com/streamingfast/firehose-core/cmd"
info "github.com/streamingfast/firehose-core/firehose/info"
)

func main() {
@@ -17,6 +18,7 @@ func main() {
Version: version,
BlockFactory: func() firecore.Block { return new(pbbstream.Block) },
ConsoleReaderFactory: firecore.NewConsoleReader,
InfoResponseFiller: info.DefaultInfoResponseFiller,
Tools: &firecore.ToolsConfig[*pbbstream.Block]{},
})
}
31 changes: 29 additions & 2 deletions cmd/main.go
@@ -23,6 +23,7 @@ import (
"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-core/launcher"
paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"

"github.com/streamingfast/logging"
"go.uber.org/zap"
@@ -72,7 +73,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) {
flags.StringP("config-file", "c", "./firehose.yaml", "Configuration file to use. No config file loaded if set to an empty string.")

flags.String("log-format", "text", "Format for logging to stdout. Either 'text' or 'stackdriver'")
flags.Bool("log-to-file", true, "Also write logs to {data-dir}/firehose.log.json ")
flags.Bool("log-to-file", true, fmt.Sprintf("Also write logs to {data-dir}/%s ", launcher.DefaultLogFile))
flags.String("log-level-switcher-listen-addr", "localhost:1065", cli.FlagDescription(`
If non-empty, a JSON based HTTP server will listen on this address to let you switch the default logging level
of all registered loggers to a different one on the fly. This enables switching to debug level on
@@ -170,9 +171,31 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) {
cmd.Flags().String("common-merged-blocks-store-url", firecore.MergedBlocksStoreURL, "[COMMON] Store URL where to read/write merged blocks.")
cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.")
cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.")
cmd.Flags().String("common-tmp-dir", firecore.TmpDir, "[COMMON] Local directory to store temporary files")

cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.")
cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.")

acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1)
i := 0
for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value {
if encoding != "BLOCK_ID_ENCODING_UNSET" {
acceptedEncodings[i] = strings.ToLower(strings.Replace(encoding, "BLOCK_ID_ENCODING_", "", 1))
i++
}
}
cmd.Flags().String("advertise-block-id-encoding", "", FlagMultilineDescription(`
[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint. If unset, it may be inferred
from the genesis block. Accepted encodings are: %s.
Previous versions accepted 'BLOCK_ID_ENCODING_<upper(encoding)>' as well, this still works but we
recommend using the short form lowercase version.
`, strings.Join(acceptedEncodings, ", ")))

cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).")
cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes")
cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "[COMMON] Index bundle sizes that that are considered valid when looking for block indexes")

cmd.Flags().Bool("common-blocks-cache-enabled", false, cli.FlagDescription(`
[COMMON] Use a disk cache to store the blocks data to disk and instead of keeping it in RAM. By enabling this, block's Protobuf content, in bytes,
@@ -235,3 +258,7 @@ Additional help topics:{{range .Commands}}{{if .IsAdditionalHelpTopicCommand}}
Use "{{.CommandPath}} [command] --help" for more information about a command.{{end}}
`

func FlagMultilineDescription(input string, args ...any) string {
return cli.Dedent(fmt.Sprintf(input, args...))
}
4 changes: 4 additions & 0 deletions cmd/setup.go
@@ -51,6 +51,10 @@ func setupCmd(cmd *cobra.Command, binaryName string) error {
return fmt.Errorf("invalid flag %s in config file under command %s", k, subCommand)
}

// Keep compatibility with config files, allow empty value to unset the flag
if v == nil {
v = ""
}
viper.SetDefault(flag.viperKey, v)

// For root command, we want to keep compatibility for `viper.GetXXX("global-<flag>")` to work with config loaded value
2 changes: 0 additions & 2 deletions cmd/tools/mergeblock/tools_merge_blocks.go
@@ -24,8 +24,6 @@ func NewToolsMergeBlocksCmd[B firecore.Block](chain *firecore.Chain[B], zlog *za
RunE: runMergeBlocksE(zlog),
}

cmd.Flags().String("force-block-type", "", "When set, will force the block type to the given value.")

return cmd
}

1 change: 1 addition & 0 deletions cmd/tools/mergeblock/tools_upgrade_merged_blocks.go
@@ -54,6 +54,7 @@ func getMergedBlockUpgrader(tweakFunc func(block *pbbstream.Block) (*pbbstream.B
LowBlockNum: firecore.LowBoundary(start),
StopBlockNum: stop,
TweakBlock: tweakFunc,
Logger: rootLog,
}
stream := stream.New(nil, sourceStore, nil, int64(start), writer, stream.WithFinalBlocksOnly())
