Skip to content

Commit

Permalink
Merge pull request #60 from streamingfast/feature/info
Browse files Browse the repository at this point in the history
add sf.firehose.v2.EndpointInfo/Info endpoint and logic
  • Loading branch information
sduchesneau authored Aug 26, 2024
2 parents ad3d137 + 2518419 commit a5050bd
Show file tree
Hide file tree
Showing 20 changed files with 703 additions and 28 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
/build
/dist
.envrc
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and `sf.substreams.rpc.v2.EndpointInfo/Info` to Substreams endpoints. This involves the following new flags:
- `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list)
- `advertise-chain-aliases` Alternate names for that chain (optional)
- `advertise-block-features` List of features describing the blocks (optional)
- `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list)
- `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.

* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list.
* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service.
It validates the following on a well-known chain:
- if the first-streamable-block Num/ID match the genesis block of a known chain, e.g. `matic`, it will refuse another value for `advertise-chain-name` than `matic` or one of its aliases (`polygon`)
- If the first-streamable-block does not match any known chain, it will require the `advertise-chain-name` to be non-empty
- If the first-streamable-block type is unknown (i.e. not ethereum, solana, near, cosmos, bitcoin...), it will require the user to provide `advertise-chain-name` as well as `advertise-block-id-encoding`

* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling

Expand Down
9 changes: 9 additions & 0 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"runtime/debug"
"strings"

pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/substreams/wasm"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -154,6 +155,10 @@ type Chain[B Block] struct {
DefaultBlockType string

RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error)

// InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain.
// It can validate that we are on the right chain by checking against a known hash, or populate missing fields.
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error
}

type ToolsConfig[B Block] struct {
Expand Down Expand Up @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() {
err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element"))
}

if c.InfoResponseFiller == nil {
err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set"))
}

for key, indexerFactory := range c.BlockIndexerFactories {
if indexerFactory == nil {
err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key))
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
HeadBlockNumberMetric: headBlockNumMetric,
TransformRegistry: registry,
CheckPendingShutdown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
24 changes: 21 additions & 3 deletions cmd/apps/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
info "github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/launcher"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
tracing "github.com/streamingfast/sf-tracing"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
configFile := sflags.MustGetString(cmd, "config-file")
rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile))

err = start(cmd, dataDir, args, rootLog)
err = start(cmd, dataDir, args, chain, rootLog)
if err != nil {
return fmt.Errorf("unable to launch: %w", err)
}
Expand All @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
}
}

func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) {
func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) {
dataDirAbs, err := filepath.Abs(dataDir)
if err != nil {
return fmt.Errorf("unable to setup directory structure: %w", err)
Expand All @@ -82,7 +84,23 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
}()
dmetering.SetDefaultEmitter(eventEmitter)

launch := launcher.NewLauncher(rootLog, dataDirAbs)
blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET
if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" {
blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(pbfirehose.InfoResponse_BlockIdEncoding_value[enc])
}

infoServer := info.NewInfoServer(
sflags.MustGetString(cmd, "advertise-chain-name"),
sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"),
blockIDEncoding,
sflags.MustGetStringSlice(cmd, "advertise-block-features"),
bstream.GetProtocolFirstStreamableBlock,
!sflags.MustGetBool(cmd, "ignore-advertise-validation"),
chain.InfoResponseFiller,
rootLog,
)

launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer)
rootLog.Debug("launcher created")

runByDefault := func(app string) bool {
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
CheckPendingShutDown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
2 changes: 2 additions & 0 deletions cmd/firecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
firecore "github.com/streamingfast/firehose-core"
fhCMD "github.com/streamingfast/firehose-core/cmd"
info "github.com/streamingfast/firehose-core/firehose/info"
)

func main() {
Expand All @@ -17,6 +18,7 @@ func main() {
Version: version,
BlockFactory: func() firecore.Block { return new(pbbstream.Block) },
ConsoleReaderFactory: firecore.NewConsoleReader,
InfoResponseFiller: info.DefaultInfoResponseFiller,
Tools: &firecore.ToolsConfig[*pbbstream.Block]{},
})
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-core/launcher"
paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"

"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -171,6 +172,21 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) {
cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.")
cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.")

cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.")
cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.")

acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1)
i := 0
for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value {
if encoding != "BLOCK_ID_ENCODING_UNSET" {
acceptedEncodings[i] = encoding
i++
}
}
cmd.Flags().String("advertise-block-id-encoding", "", fmt.Sprintf("[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint (%s). If unset, it may be inferred from the genesis block.", strings.Join(acceptedEncodings, ", ")))

cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).")
cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes")

Expand Down
4 changes: 4 additions & 0 deletions cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func setupCmd(cmd *cobra.Command, binaryName string) error {
return fmt.Errorf("invalid flag %s in config file under command %s", k, subCommand)
}

// Keep compatibility with config files, allow empty value to unset the flag
if v == nil {
v = ""
}
viper.SetDefault(flag.viperKey, v)

// For root command, we want to keep compatibility for `viper.GetXXX("global-<flag>")` to work with config loaded value
Expand Down
9 changes: 9 additions & 0 deletions firehose/app/firehose/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/firehose/metrics"
"github.com/streamingfast/firehose-core/firehose/server"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Modules struct {
TransformRegistry *transform.Registry
RegisterServiceExtension RegisterServiceExtensionFunc
CheckPendingShutdown func() bool
InfoServer *info.InfoServer
}

type App struct {
Expand Down Expand Up @@ -158,6 +160,7 @@ func (a *App) Run() error {
a.IsReady,
a.config.GRPCListenAddr,
a.config.ServiceDiscoveryURL,
a.modules.InfoServer,
a.config.ServerOptions...,
)

Expand Down Expand Up @@ -186,6 +189,12 @@ func (a *App) Run() error {
}
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil {
a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err))
}

a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive))
a.isReady.CAS(false, true)
firehoseServer.Launch()
Expand Down
Loading

0 comments on commit a5050bd

Please sign in to comment.