Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sf.firehose.v2.EndpointInfo/Info endpoint and logic #60

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
/build
/dist
.envrc
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and `sf.substreams.rpc.v2.EndpointInfo/Info` to Substreams endpoints. This involves the following new flags:
sduchesneau marked this conversation as resolved.
Show resolved Hide resolved
- `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list)
- `advertise-chain-aliases` Alternate names for that chain (optional)
- `advertise-block-features` List of features describing the blocks (optional)
- `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list)

* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values)
* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also talk about how a well-known chain is determined, what checks we perform to infer it.


* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling

Expand Down
9 changes: 9 additions & 0 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"runtime/debug"
"strings"

pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/substreams/wasm"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -154,6 +155,10 @@ type Chain[B Block] struct {
DefaultBlockType string

RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error)

// InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain.
// It can validate that we are on the right chain by checking against a known hash, or populate missing fields.
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error
}

type ToolsConfig[B Block] struct {
Expand Down Expand Up @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() {
err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element"))
}

if c.InfoResponseFiller == nil {
err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set"))
}

for key, indexerFactory := range c.BlockIndexerFactories {
if indexerFactory == nil {
err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key))
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
HeadBlockNumberMetric: headBlockNumMetric,
TransformRegistry: registry,
CheckPendingShutdown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
23 changes: 20 additions & 3 deletions cmd/apps/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
info "github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/launcher"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
tracing "github.com/streamingfast/sf-tracing"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
configFile := sflags.MustGetString(cmd, "config-file")
rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile))

err = start(cmd, dataDir, args, rootLog)
err = start(cmd, dataDir, args, chain, rootLog)
if err != nil {
return fmt.Errorf("unable to launch: %w", err)
}
Expand All @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
}
}

func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) {
func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) {
dataDirAbs, err := filepath.Abs(dataDir)
if err != nil {
return fmt.Errorf("unable to setup directory structure: %w", err)
Expand All @@ -82,7 +84,22 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
}()
dmetering.SetDefaultEmitter(eventEmitter)

launch := launcher.NewLauncher(rootLog, dataDirAbs)
blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET
if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" {
blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(pbfirehose.InfoResponse_BlockIdEncoding_value[enc])
}

infoServer := info.NewInfoServer(
sflags.MustGetString(cmd, "advertise-chain-name"),
sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"),
blockIDEncoding,
sflags.MustGetStringSlice(cmd, "advertise-block-features"),
bstream.GetProtocolFirstStreamableBlock,
chain.InfoResponseFiller,
rootLog,
)

launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer)
rootLog.Debug("launcher created")

runByDefault := func(app string) bool {
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
CheckPendingShutDown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
2 changes: 2 additions & 0 deletions cmd/firecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
firecore "github.com/streamingfast/firehose-core"
fhCMD "github.com/streamingfast/firehose-core/cmd"
info "github.com/streamingfast/firehose-core/firehose/info"
)

func main() {
Expand All @@ -17,6 +18,7 @@ func main() {
Version: version,
BlockFactory: func() firecore.Block { return new(pbbstream.Block) },
ConsoleReaderFactory: firecore.NewConsoleReader,
InfoResponseFiller: info.DefaultInfoResponseFiller,
Tools: &firecore.ToolsConfig[*pbbstream.Block]{},
})
}
Expand Down
15 changes: 15 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-core/launcher"
paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"

"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -171,6 +172,20 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) {
cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.")
cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.")

cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.")

acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1)
i := 0
for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value {
if encoding != "BLOCK_ID_ENCODING_UNSET" {
acceptedEncodings[i] = encoding
i++
}
}
cmd.Flags().String("advertise-block-id-encoding", "", fmt.Sprintf("[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint (%s). If unset, it may be inferred from the genesis block.", strings.Join(acceptedEncodings, ", ")))

cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).")
cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes")

Expand Down
11 changes: 11 additions & 0 deletions firehose/app/firehose/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/firehose/metrics"
"github.com/streamingfast/firehose-core/firehose/server"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Modules struct {
TransformRegistry *transform.Registry
RegisterServiceExtension RegisterServiceExtensionFunc
CheckPendingShutdown func() bool
InfoServer *info.InfoServer
}

type App struct {
Expand Down Expand Up @@ -158,6 +160,7 @@ func (a *App) Run() error {
a.IsReady,
a.config.GRPCListenAddr,
a.config.ServiceDiscoveryURL,
a.modules.InfoServer,
a.config.ServerOptions...,
)

Expand Down Expand Up @@ -186,6 +189,14 @@ func (a *App) Run() error {
}
}

go func() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil {
a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err))
}
}()

a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive))
a.isReady.CAS(false, true)
firehoseServer.Launch()
Expand Down
Loading
Loading