Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sf.firehose.v2.EndpointInfo/Info endpoint and logic #60

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
/build
/dist
.envrc
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and `sf.substreams.rpc.v2.EndpointInfo/Info` to Substreams endpoints. This involves the following new flags:
sduchesneau marked this conversation as resolved.
Show resolved Hide resolved
- `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list)
- `advertise-chain-aliases` Alternate names for that chain (optional)
- `advertise-block-features` List of features describing the blocks (optional)
- `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list)
- `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.

* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list.
* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service.
It validates the following on a well-known chain:
- if the first-streamable-block Num/ID match the genesis block of a known chain, e.g. `matic`, it will refuse another value for `advertise-chain-name` than `matic` or one of its aliases (`polygon`)
- If the first-streamable-block does not match any known chain, it will require the `advertise-chain-name` to be non-empty
- If the first-streamable-block type is unknown (i.e. not ethereum, solana, near, cosmos, bitcoin...), it will require the user to provide `advertise-chain-name` as well as `advertise-block-id-encoding`

* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling

Expand Down
9 changes: 9 additions & 0 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"runtime/debug"
"strings"

pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/substreams/wasm"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -154,6 +155,10 @@ type Chain[B Block] struct {
DefaultBlockType string

RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error)

// InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain.
// It can validate that we are on the right chain by checking against a known hash, or populate missing fields.
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error
}

type ToolsConfig[B Block] struct {
Expand Down Expand Up @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() {
err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element"))
}

if c.InfoResponseFiller == nil {
err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set"))
}

for key, indexerFactory := range c.BlockIndexerFactories {
if indexerFactory == nil {
err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key))
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
HeadBlockNumberMetric: headBlockNumMetric,
TransformRegistry: registry,
CheckPendingShutdown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
24 changes: 21 additions & 3 deletions cmd/apps/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
info "github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/launcher"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
tracing "github.com/streamingfast/sf-tracing"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
configFile := sflags.MustGetString(cmd, "config-file")
rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile))

err = start(cmd, dataDir, args, rootLog)
err = start(cmd, dataDir, args, chain, rootLog)
if err != nil {
return fmt.Errorf("unable to launch: %w", err)
}
Expand All @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
}
}

func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) {
func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) {
dataDirAbs, err := filepath.Abs(dataDir)
if err != nil {
return fmt.Errorf("unable to setup directory structure: %w", err)
Expand All @@ -82,7 +84,23 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
}()
dmetering.SetDefaultEmitter(eventEmitter)

launch := launcher.NewLauncher(rootLog, dataDirAbs)
blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET
if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" {
blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(pbfirehose.InfoResponse_BlockIdEncoding_value[enc])
}

infoServer := info.NewInfoServer(
sflags.MustGetString(cmd, "advertise-chain-name"),
sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"),
blockIDEncoding,
sflags.MustGetStringSlice(cmd, "advertise-block-features"),
bstream.GetProtocolFirstStreamableBlock,
!sflags.MustGetBool(cmd, "ignore-advertise-validation"),
chain.InfoResponseFiller,
rootLog,
)

launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer)
rootLog.Debug("launcher created")

runByDefault := func(app string) bool {
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
CheckPendingShutDown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
2 changes: 2 additions & 0 deletions cmd/firecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
firecore "github.com/streamingfast/firehose-core"
fhCMD "github.com/streamingfast/firehose-core/cmd"
info "github.com/streamingfast/firehose-core/firehose/info"
)

func main() {
Expand All @@ -17,6 +18,7 @@ func main() {
Version: version,
BlockFactory: func() firecore.Block { return new(pbbstream.Block) },
ConsoleReaderFactory: firecore.NewConsoleReader,
InfoResponseFiller: info.DefaultInfoResponseFiller,
Tools: &firecore.ToolsConfig[*pbbstream.Block]{},
})
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-core/launcher"
paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"

"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -171,6 +172,21 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) {
cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.")
cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.")

cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.")
cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.")

acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1)
i := 0
for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value {
if encoding != "BLOCK_ID_ENCODING_UNSET" {
acceptedEncodings[i] = encoding
i++
}
}
cmd.Flags().String("advertise-block-id-encoding", "", fmt.Sprintf("[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint (%s). If unset, it may be inferred from the genesis block.", strings.Join(acceptedEncodings, ", ")))

cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).")
cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes")

Expand Down
4 changes: 4 additions & 0 deletions cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func setupCmd(cmd *cobra.Command, binaryName string) error {
return fmt.Errorf("invalid flag %s in config file under command %s", k, subCommand)
}

// Keep compatibility with config files, allow empty value to unset the flag
if v == nil {
v = ""
}
viper.SetDefault(flag.viperKey, v)

// For root command, we want to keep compatibility for `viper.GetXXX("global-<flag>")` to work with config loaded value
Expand Down
9 changes: 9 additions & 0 deletions firehose/app/firehose/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/firehose/metrics"
"github.com/streamingfast/firehose-core/firehose/server"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Modules struct {
TransformRegistry *transform.Registry
RegisterServiceExtension RegisterServiceExtensionFunc
CheckPendingShutdown func() bool
InfoServer *info.InfoServer
}

type App struct {
Expand Down Expand Up @@ -158,6 +160,7 @@ func (a *App) Run() error {
a.IsReady,
a.config.GRPCListenAddr,
a.config.ServiceDiscoveryURL,
a.modules.InfoServer,
a.config.ServerOptions...,
)

Expand Down Expand Up @@ -186,6 +189,12 @@ func (a *App) Run() error {
}
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil {
a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err))
}

a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive))
a.isReady.CAS(false, true)
firehoseServer.Launch()
Expand Down
Loading
Loading