From 51b2c612ed2818461b4566bacbbe0d65c5c1297d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Wed, 21 Aug 2024 16:22:18 -0400 Subject: [PATCH 1/7] add sf.firehose.v2.EndpointInfo/Info endpoint and logic --- .gitignore | 1 + CHANGELOG.md | 8 ++ chain.go | 9 ++ cmd/apps/firehose.go | 1 + cmd/apps/start.go | 22 ++++- cmd/firecore/main.go | 2 + cmd/main.go | 15 +++ firehose/app/firehose/app.go | 11 +++ firehose/info/endpoint_info.go | 165 +++++++++++++++++++++++++++++++++ firehose/info/info_filler.go | 60 ++++++++++++ firehose/server/server.go | 3 + go.mod | 2 +- go.sum | 4 +- launcher/launcher.go | 4 +- launcher/runtime.go | 3 + 15 files changed, 303 insertions(+), 7 deletions(-) create mode 100644 firehose/info/endpoint_info.go create mode 100644 firehose/info/info_filler.go diff --git a/.gitignore b/.gitignore index 85d77f0..c6d0140 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.vscode /build /dist .envrc diff --git a/CHANGELOG.md b/CHANGELOG.md index 1eb4f6c..196cb37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,14 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ## Unreleased +* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and Substreams endpoints. This involves the following new flags: + - `advertise-chain-name` Canonical name of the chain, from the list here: https://thegraph.com/docs/en/developing/supported-networks/ (required) + - `advertise-chain-aliases` Alternate names for that chain (optional) + - `advertise-block-features` Only required for ethereum blocks, automatically discovered if run from `firehose-ethereum` program + - `advertise-block-id-encoding` Required, one of [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] + +* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. + * Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. * Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling diff --git a/chain.go b/chain.go index 789c6e5..09bac18 100644 --- a/chain.go +++ b/chain.go @@ -6,6 +6,7 @@ import ( "runtime/debug" "strings" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "github.com/streamingfast/substreams/wasm" "github.com/spf13/cobra" @@ -154,6 +155,10 @@ type Chain[B Block] struct { DefaultBlockType string RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error) + + // InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain. + // It can validate that we are on the right chain by checking against a known hash, or populate missing fields. + InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error } type ToolsConfig[B Block] struct { @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() { err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element")) } + if c.InfoResponseFiller == nil { + err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set")) + } + for key, indexerFactory := range c.BlockIndexerFactories { if indexerFactory == nil { err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key)) diff --git a/cmd/apps/firehose.go b/cmd/apps/firehose.go index 4585299..de38dee 100644 --- a/cmd/apps/firehose.go +++ b/cmd/apps/firehose.go @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za HeadBlockNumberMetric: headBlockNumMetric, TransformRegistry: registry, CheckPendingShutdown: runtime.IsPendingShutdown, + InfoServer: runtime.InfoServer, }), nil }, }) diff --git a/cmd/apps/start.go b/cmd/apps/start.go index b191290..2a3d551 100644 --- a/cmd/apps/start.go +++ b/cmd/apps/start.go @@ -26,7 +26,9 @@ import ( "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dmetering" firecore "github.com/streamingfast/firehose-core" + info "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/launcher" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" tracing "github.com/streamingfast/sf-tracing" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st configFile := sflags.MustGetString(cmd, "config-file") rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile)) - err = start(cmd, dataDir, args, rootLog) + err = start(cmd, dataDir, args, chain, rootLog) if err != nil { return fmt.Errorf("unable to launch: %w", err) } @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st } } -func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) { +func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) { dataDirAbs, err := filepath.Abs(dataDir) if err != nil { return fmt.Errorf("unable to setup directory structure: %w", err) @@ -82,7 +84,21 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge }() dmetering.SetDefaultEmitter(eventEmitter) - launch := launcher.NewLauncher(rootLog, dataDirAbs) + blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET + if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" { + blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(pbfirehose.InfoResponse_BlockIdEncoding_value[enc]) + } + + infoServer := info.NewInfoServer( + sflags.MustGetString(cmd, "advertise-chain-name"), + sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"), + blockIDEncoding, + sflags.MustGetStringSlice(cmd, "advertise-block-features"), + bstream.GetProtocolFirstStreamableBlock, + chain.InfoResponseFiller, + ) + + launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer) rootLog.Debug("launcher created") runByDefault := func(app string) bool { diff --git a/cmd/firecore/main.go b/cmd/firecore/main.go index 3eb14cf..be062c7 100644 --- a/cmd/firecore/main.go +++ b/cmd/firecore/main.go @@ -4,6 +4,7 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" firecore "github.com/streamingfast/firehose-core" fhCMD "github.com/streamingfast/firehose-core/cmd" + info "github.com/streamingfast/firehose-core/firehose/info" ) func main() { @@ -17,6 +18,7 @@ func main() { Version: version, BlockFactory: func() firecore.Block { return new(pbbstream.Block) }, ConsoleReaderFactory: firecore.NewConsoleReader, + InfoResponseFiller: info.DefaultInfoResponseFiller, Tools: &firecore.ToolsConfig[*pbbstream.Block]{}, }) } diff --git a/cmd/main.go b/cmd/main.go index 7c0793d..cf1c713 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,6 +23,7 @@ import ( "github.com/streamingfast/firehose-core/cmd/tools" "github.com/streamingfast/firehose-core/launcher" paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "github.com/streamingfast/logging" "go.uber.org/zap" @@ -171,6 +172,20 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) { cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.") cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.") + cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.") + cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.") + cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.") + + acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1) + i := 0 + for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value { + if encoding != "BLOCK_ID_ENCODING_UNSET" { + acceptedEncodings[i] = encoding + i++ + } + } + cmd.Flags().String("advertise-block-id-encoding", "", fmt.Sprintf("[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint (%s). If unset, it may be inferred from the genesis block.", strings.Join(acceptedEncodings, ", "))) + cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).") cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes") diff --git a/firehose/app/firehose/app.go b/firehose/app/firehose/app.go index aa02b52..f9b57c4 100644 --- a/firehose/app/firehose/app.go +++ b/firehose/app/firehose/app.go @@ -31,6 +31,7 @@ import ( "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/metrics" "github.com/streamingfast/firehose-core/firehose/server" "github.com/streamingfast/logging" @@ -64,6 +65,7 @@ type Modules struct { TransformRegistry *transform.Registry RegisterServiceExtension RegisterServiceExtensionFunc CheckPendingShutdown func() bool + InfoServer *info.InfoServer } type App struct { @@ -158,6 +160,7 @@ func (a *App) Run() error { a.IsReady, a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, + a.modules.InfoServer, a.config.ServerOptions..., ) @@ -186,6 +189,14 @@ func (a *App) Run() error { } } + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil { + a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err)) + } + }() + a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive)) a.isReady.CAS(false, true) firehoseServer.Launch() diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go new file mode 100644 index 0000000..d1c76eb --- /dev/null +++ b/firehose/info/endpoint_info.go @@ -0,0 +1,165 @@ +package info + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/streamingfast/bstream" + "github.com/streamingfast/bstream/hub" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dstore" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" +) + +type InfoServer struct { + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error + response *pbfirehose.InfoResponse + ready chan struct{} + once sync.Once +} + +func (s *InfoServer) Info(ctx context.Context, request *pbfirehose.InfoRequest) (*pbfirehose.InfoResponse, error) { + select { + case <-s.ready: + return s.response, nil + default: + return nil, fmt.Errorf("info server not ready") + } +} + +func NewInfoServer( + chainName string, + chainNameAliases []string, + blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding, + blockFeatures []string, + firstStreamableBlock uint64, + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error, +) *InfoServer { + + resp := &pbfirehose.InfoResponse{ + ChainName: chainName, + ChainNameAliases: chainNameAliases, + BlockIdEncoding: blockIDEncoding, + BlockFeatures: blockFeatures, + FirstStreamableBlockNum: firstStreamableBlock, + } + + return &InfoServer{ + responseFiller: responseFiller, + response: resp, + ready: make(chan struct{}), + } +} + +func validateInfoResponse(resp *pbfirehose.InfoResponse) error { + switch { + case resp.ChainName == "": + return fmt.Errorf("chain name is not set") + case resp.BlockIdEncoding == pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET: + return fmt.Errorf("block id encoding is not set") + case resp.FirstStreamableBlockId == "": + return fmt.Errorf("first streamable block id is not set") + } + + return nil +} + +// multiple apps (firehose, substreams...) can initialize the same server, we only need one +func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) (err error) { + s.once.Do(func() { err = s.init(ctx, fhub, mergedBlocksStore, oneBlockStore, logger) }) + return +} + +func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum uint64, mergedBlocksStore dstore.Store) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block, err := bstream.FetchBlockFromMergedBlocksStore(ctx, blockNum, mergedBlocksStore) + if err != nil { + continue + } + return block + } +} + +func (s *InfoServer) getBlockFromForkableHub(ctx context.Context, blockNum uint64, forkableHub *hub.ForkableHub) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block := forkableHub.GetBlock(s.response.FirstStreamableBlockNum, "") + if block == nil { + time.Sleep(time.Millisecond * 500) + continue + } + return block + } + +} + +func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uint64, oneBlockStore dstore.Store) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block, err := bstream.FetchBlockFromOneBlockStore(ctx, blockNum, "", oneBlockStore) + if err != nil { + continue + } + return block + } +} + +// init tries to fetch the first streamable block from the different sources and fills the response with it +// returns an error if it is incomplete +func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ch := make(chan *pbbstream.Block) + + if fhub != nil { + go func() { + select { + case ch <- s.getBlockFromForkableHub(ctx, s.response.FirstStreamableBlockNum, fhub): + case <-ctx.Done(): + } + }() + } + + go func() { + select { + case ch <- s.getBlockFromMergedBlocksStore(ctx, s.response.FirstStreamableBlockNum, mergedBlocksStore): + case <-ctx.Done(): + } + }() + + go func() { + select { + case ch <- s.getBlockFromOneBlockStore(ctx, s.response.FirstStreamableBlockNum, oneBlockStore): + case <-ctx.Done(): + } + }() + + select { + case blk := <-ch: + if err := s.responseFiller(blk, s.response); err != nil { + return err + } + case <-ctx.Done(): + } + + if err := validateInfoResponse(s.response); err != nil { + return err + } + + close(s.ready) + return nil +} diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go new file mode 100644 index 0000000..674853f --- /dev/null +++ b/firehose/info/info_filler.go @@ -0,0 +1,60 @@ +package info + +import ( + "fmt" + + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" +) + +var DefaultInfoResponseFiller = func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.FirstStreamableBlockId = block.Id + + switch block.Payload.TypeUrl { + case "type.googleapis.com/sf.antelope.type.v1.Block": + return fillInfoResponseForAntelope(block, resp) + + case "type.googleapis.com/sf.ethereum.type.v2.Block": + return fillInfoResponseForEthereum(block, resp) + + case "type.googleapis.com/sf.cosmos.type.v1.Block": + return fillInfoResponseForCosmos(block, resp) + + case "type.googleapis.com/sf.solana.type.v1.Block": + return fillInfoResponseForSolana(block, resp) + } + + return nil +} + +// this is a simple helper, a full implementation would live in github.com/streamingfast/firehose-ethereum +func fillInfoResponseForEthereum(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX + var seenBlockType bool + for _, feature := range resp.BlockFeatures { + if feature == "extended" || feature == "base" || feature == "hybrid" { + seenBlockType = true + break + } + } + if !seenBlockType { + return fmt.Errorf("invalid block features, missing 'base', 'extended' or 'hybrid'") + } + return nil +} + +// this is a simple helper, a full implementation would live in github.com/pinax-network/firehose-antelope +func fillInfoResponseForAntelope(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX + return nil +} + +func fillInfoResponseForCosmos(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX + return nil +} + +func fillInfoResponseForSolana(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_BASE58 + return nil +} diff --git a/firehose/server/server.go b/firehose/server/server.go index 0a9a208..f4b52c4 100644 --- a/firehose/server/server.go +++ b/firehose/server/server.go @@ -16,6 +16,7 @@ import ( "github.com/streamingfast/dmetrics" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/rate" pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1" pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2" @@ -61,6 +62,7 @@ func New( isReady func(context.Context) bool, listenAddr string, serviceDiscoveryURL *url.URL, + infoServer *info.InfoServer, opts ...Option, ) *Server { initFunc := func(ctx context.Context, _ *pbfirehoseV2.Request) context.Context { @@ -140,6 +142,7 @@ func New( if blockGetter != nil { pbfirehoseV2.RegisterFetchServer(gs, s) } + pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer) pbfirehoseV2.RegisterStreamServer(gs, s) pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose }) diff --git a/go.mod b/go.mod index 893fcf8..6e64aa2 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 - github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d + github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0 github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 6b6666c..df9d1e2 100644 --- a/go.sum +++ b/go.sum @@ -572,8 +572,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d h1:rgXXfBFlQ9C8casyay7UL53VSGR6JoUnhqGv4h6lhxM= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= +github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0 h1:kfzU2GvvWt+RQtrHvfqv7zdRA4xv/3AusccIGG3+roM= +github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE= github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 h1:94HllkX4ttYVilo8ZJv05b5z8JiMmqBvv4+Jdgk/+2A= diff --git a/launcher/launcher.go b/launcher/launcher.go index 4c8f086..bffca60 100644 --- a/launcher/launcher.go +++ b/launcher/launcher.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/shutter" "go.uber.org/atomic" "go.uber.org/zap" @@ -42,7 +43,7 @@ type Launcher struct { logger *zap.Logger } -func NewLauncher(logger *zap.Logger, absDataDir string) *Launcher { +func NewLauncher(logger *zap.Logger, absDataDir string, infoServer *info.InfoServer) *Launcher { l := &Launcher{ shutter: shutter.New(), apps: make(map[string]App), @@ -53,6 +54,7 @@ func NewLauncher(logger *zap.Logger, absDataDir string) *Launcher { l.runtime = &Runtime{ AbsDataDir: absDataDir, + InfoServer: infoServer, IsPendingShutdown: func() bool { return l.hasBeenSignaled.Load() }, diff --git a/launcher/runtime.go b/launcher/runtime.go index a5df1b3..271189e 100644 --- a/launcher/runtime.go +++ b/launcher/runtime.go @@ -1,7 +1,10 @@ package launcher +import "github.com/streamingfast/firehose-core/firehose/info" + type Runtime struct { AbsDataDir string + InfoServer *info.InfoServer // IsPendingShutdown is a function that is going to return true as soon as the initial SIGINT signal is // received which can be used to turn a healthz monitor as unhealthy so that a load balancer can From a6f3c439557b76e6459cbd756c14a3fec27ccc2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Fri, 23 Aug 2024 11:23:35 -0400 Subject: [PATCH 2/7] rework well-known chains and info endpoint discovery mechanism --- CHANGELOG.md | 7 +- cmd/apps/start.go | 1 + firehose/info/endpoint_info.go | 18 ++ firehose/info/info_filler.go | 66 +++----- go.mod | 2 +- go.sum | 4 +- proto/generator/generator.go | 17 +- well-known/chains.go | 297 +++++++++++++++++++++++++++++++++ 8 files changed, 350 insertions(+), 62 deletions(-) create mode 100644 well-known/chains.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 196cb37..fc13de8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,11 +11,12 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ## Unreleased * Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and Substreams endpoints. This involves the following new flags: - - `advertise-chain-name` Canonical name of the chain, from the list here: https://thegraph.com/docs/en/developing/supported-networks/ (required) + - `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list) - `advertise-chain-aliases` Alternate names for that chain (optional) - - `advertise-block-features` Only required for ethereum blocks, automatically discovered if run from `firehose-ethereum` program - - `advertise-block-id-encoding` Required, one of [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] + - `advertise-block-features` List of features describing the blocks (optional) + - `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list) +* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values) * The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. * Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. diff --git a/cmd/apps/start.go b/cmd/apps/start.go index 2a3d551..9af3fd9 100644 --- a/cmd/apps/start.go +++ b/cmd/apps/start.go @@ -96,6 +96,7 @@ func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, sflags.MustGetStringSlice(cmd, "advertise-block-features"), bstream.GetProtocolFirstStreamableBlock, chain.InfoResponseFiller, + rootLog, ) launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer) diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go index d1c76eb..7b09feb 100644 --- a/firehose/info/endpoint_info.go +++ b/firehose/info/endpoint_info.go @@ -19,6 +19,7 @@ type InfoServer struct { response *pbfirehose.InfoResponse ready chan struct{} once sync.Once + logger *zap.Logger } func (s *InfoServer) Info(ctx context.Context, request *pbfirehose.InfoRequest) (*pbfirehose.InfoResponse, error) { @@ -37,6 +38,7 @@ func NewInfoServer( blockFeatures []string, firstStreamableBlock uint64, responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error, + logger *zap.Logger, ) *InfoServer { resp := &pbfirehose.InfoResponse{ @@ -51,6 +53,7 @@ func NewInfoServer( responseFiller: responseFiller, response: resp, ready: make(chan struct{}), + logger: logger, } } @@ -148,6 +151,21 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc } }() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + logger.Warn("waiting to read the first_streamable_block before starting firehose/substreams endpoints", + zap.Uint64("first_streamable_block", s.response.FirstStreamableBlockNum), + zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String()) + zap.Stringer("one_block_store", oneBlockStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String()) + ) + } + } + }() + select { case blk := <-ch: if err := s.responseFiller(blk, s.response); err != nil { diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go index 674853f..73932f0 100644 --- a/firehose/info/info_filler.go +++ b/firehose/info/info_filler.go @@ -4,57 +4,39 @@ import ( "fmt" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + wellknown "github.com/streamingfast/firehose-core/well-known" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" ) -var DefaultInfoResponseFiller = func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { - resp.FirstStreamableBlockId = block.Id - - switch block.Payload.TypeUrl { - case "type.googleapis.com/sf.antelope.type.v1.Block": - return fillInfoResponseForAntelope(block, resp) - - case "type.googleapis.com/sf.ethereum.type.v2.Block": - return fillInfoResponseForEthereum(block, resp) - - case "type.googleapis.com/sf.cosmos.type.v1.Block": - return fillInfoResponseForCosmos(block, resp) - - case "type.googleapis.com/sf.solana.type.v1.Block": - return fillInfoResponseForSolana(block, resp) +var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.FirstStreamableBlockId = firstStreamableBlock.Id + + if resp.ChainName != "" { + if chain := wellknown.WellKnownProtocols.ChainByName(resp.ChainName); chain != nil { + if firstStreamableBlock.Number == chain.GenesisBlockNumber && chain.GenesisBlockID != firstStreamableBlock.Id { // we don't check if the firstStreamableBlock is something other than our well-known genesis block + return fmt.Errorf("chain name defined in flag: %q inconsistent with the genesis block ID %q (expected: %q)", resp.ChainName, ox(firstStreamableBlock.Id), ox(chain.GenesisBlockID)) + } + resp.ChainName = chain.Name + resp.ChainNameAliases = chain.Aliases + } else if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + return fmt.Errorf("chain name defined in flag: %q inconsistent with the one discovered from genesis block %q", resp.ChainName, chain.Name) + } + } else { + if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + resp.ChainName = chain.Name + resp.ChainNameAliases = chain.Aliases + } } - return nil -} - -// this is a simple helper, a full implementation would live in github.com/streamingfast/firehose-ethereum -func fillInfoResponseForEthereum(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { - resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX - var seenBlockType bool - for _, feature := range resp.BlockFeatures { - if feature == "extended" || feature == "base" || feature == "hybrid" { - seenBlockType = true + for _, protocol := range wellknown.WellKnownProtocols { + if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl { + resp.BlockIdEncoding = protocol.BytesEncoding break } } - if !seenBlockType { - return fmt.Errorf("invalid block features, missing 'base', 'extended' or 'hybrid'") - } - return nil -} - -// this is a simple helper, a full implementation would live in github.com/pinax-network/firehose-antelope -func fillInfoResponseForAntelope(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { - resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX return nil } -func fillInfoResponseForCosmos(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { - resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX - return nil -} - -func fillInfoResponseForSolana(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { - resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_BASE58 - return nil +func ox(s string) string { + return "0x" + s } diff --git a/go.mod b/go.mod index 6e64aa2..59f6925 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 - github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0 + github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index df9d1e2..36dae1b 100644 --- a/go.sum +++ b/go.sum @@ -572,8 +572,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q= -github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0 h1:kfzU2GvvWt+RQtrHvfqv7zdRA4xv/3AusccIGG3+roM= -github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= +github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb h1:Xqt4ned9ELmQMKcg7cFbm56MKG2gBjnE1M+2HObOs6w= +github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE= github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 h1:94HllkX4ttYVilo8ZJv05b5z8JiMmqBvv4+Jdgk/+2A= diff --git a/proto/generator/generator.go b/proto/generator/generator.go index c96d51f..375d1d8 100644 --- a/proto/generator/generator.go +++ b/proto/generator/generator.go @@ -20,25 +20,13 @@ import ( connect "connectrpc.com/connect" "github.com/iancoleman/strcase" "github.com/streamingfast/cli" + wellknown "github.com/streamingfast/firehose-core/well-known" "google.golang.org/protobuf/proto" ) //go:embed *.gotmpl var templates embed.FS -var wellKnownProtoRepos = []string{ - "buf.build/streamingfast/firehose-ethereum", - "buf.build/streamingfast/firehose-near", - "buf.build/streamingfast/firehose-solana", - "buf.build/streamingfast/firehose-bitcoin", - "buf.build/pinax/firehose-antelope", - "buf.build/pinax/firehose-arweave", - "buf.build/pinax/firehose-beacon", - "buf.build/streamingfast/firehose-starknet", - "buf.build/streamingfast/firehose-cosmos", - "buf.build/streamingfast/firehose-gear", -} - func main() { cli.Ensure(len(os.Args) == 3, "go run ./generator ") @@ -58,7 +46,8 @@ func main() { var protofiles []ProtoFile - for _, wellKnownProtoRepo := range wellKnownProtoRepos { + for _, protocol := range wellknown.WellKnownProtocols { + wellKnownProtoRepo := protocol.BufBuildURL request := connect.NewRequest(&reflectv1beta1.GetFileDescriptorSetRequest{ Module: wellKnownProtoRepo, }) diff --git a/well-known/chains.go b/well-known/chains.go new file mode 100644 index 0000000..b266032 --- /dev/null +++ b/well-known/chains.go @@ -0,0 +1,297 @@ +package wellknown + +import ( + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" +) + +type WellKnownProtocol struct { + Name string + BlockType string + BufBuildURL string + BytesEncoding pbfirehose.InfoResponse_BlockIdEncoding + KnownChains []*Chain +} + +type Chain struct { + Name string + Aliases []string + // Genesis block here is actually the "lowest possible" first streamable block through firehose blocks. + // In most cases, it matches the "genesis block" of the chain. + GenesisBlockID string + GenesisBlockNumber uint64 +} + +type WellKnownProtocolList []WellKnownProtocol + +var WellKnownProtocols = WellKnownProtocolList([]WellKnownProtocol{ + { + Name: "ethereum", + BlockType: "type.googleapis.com/sf.ethereum.type.v2.Block", + BufBuildURL: "buf.build/streamingfast/firehose-ethereum", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "mainnet", + Aliases: []string{"ethereum"}, + GenesisBlockID: "d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", + GenesisBlockNumber: 0, + }, + { + Name: "sepolia", + Aliases: []string{}, + GenesisBlockID: "25a5cc106eea7138acab33231d7160d69cb777ee0c2c553fcddf5138993e6dd9", + GenesisBlockNumber: 0, + }, + { + Name: "holesky", + Aliases: []string{}, + GenesisBlockID: "b5f7f912443c940f21fd611f12828d75b534364ed9e95ca4e307729a4661bde4", + GenesisBlockNumber: 0, + }, + { + Name: "matic", + Aliases: []string{"polygon"}, + GenesisBlockID: "a9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b", + GenesisBlockNumber: 0, + }, + { + Name: "bsc", + Aliases: []string{"bnb", "bsc-mainnet"}, + GenesisBlockID: "0d21840abff46b96c84b2ac9e10e4f5cdaeb5693cb665db62a2f3b02d2d57b5b", + GenesisBlockNumber: 0, + }, + { + Name: "optimism", + Aliases: []string{}, + GenesisBlockID: "7ca38a1916c42007829c55e69d3e9a73265554b586a499015373241b8a3fa48b", + GenesisBlockNumber: 0, + }, + { + Name: "optimism-sepolia", + Aliases: []string{}, + GenesisBlockID: "102de6ffb001480cc9b8b548fd05c34cd4f46ae4aa91759393db90ea0409887d", + GenesisBlockNumber: 0, + }, + { + Name: "chapel", + Aliases: []string{"bsc-chapel", "bsc-testnet"}, + GenesisBlockID: "6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", + GenesisBlockNumber: 0, + }, + { + Name: "arbitrum-one", + Aliases: []string{"arb-one", "arbitrum"}, + GenesisBlockID: "7ee576b35482195fc49205cec9af72ce14f003b9ae69f6ba0faef4514be8b442", + GenesisBlockNumber: 0, + }, + // We do not auto-discover avalanche because the genesis block ID is the same as their testnet fuji and we can't differentiate them + //{ + // Name: "avalanche", + // Aliases: []string{"avax"}, + // GenesisBlockID: "31ced5b9beb7f8782b014660da0cb18cc409f121f408186886e1ca3e8eeca96b", + // GenesisBlockNumber: 0, + //}, + }, + }, + { + Name: "near", + BlockType: "type.googleapis.com/sf.near.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-near", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_BASE58, + KnownChains: []*Chain{ + { + Name: "near-mainnet", + Aliases: []string{"near"}, + GenesisBlockID: "CFAAJTVsw5y4GmMKNmuTNybxFJtapKcrarsTh5TPUyQf", + GenesisBlockNumber: 9820214, + }, + { + Name: "near-testnet", + Aliases: []string{}, + GenesisBlockID: "fQURSjwQKZn8F98ayQjpndh85msJBu12FBkUY1gc5WA", + GenesisBlockNumber: 42376923, + }, + }, + }, + { + Name: "solana", + BlockType: "type.googleapis.com/sf.solana.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-solana", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_BASE58, + KnownChains: []*Chain{ + { + Name: "solana-mainnet-beta", + Aliases: []string{"solana", "solana-mainnet"}, + GenesisBlockID: "4sGjMW1sUnHzSxGspuhpqLDx6wiyjNtZAMdL4VZHirAn", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "bitcoin", + BlockType: "type.googleapis.com/sf.bitcoin.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-bitcoin", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "btc", + Aliases: []string{"bitcoin"}, + GenesisBlockID: "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "antelope", + BlockType: "type.googleapis.com/sf.antelope.type.v1.Block", + BufBuildURL: "buf.build/pinax/firehose-antelope", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "eos", + Aliases: []string{"eos-mainnet"}, + GenesisBlockID: "0000000267f3e2284b482f3afc2e724be1d6cbc1804532ec62d4e7af47c30693", + GenesisBlockNumber: 2, // even though the genesis block is 1, it is never available through firehose/substreams + }, + { + Name: "kylin", + Aliases: []string{}, + GenesisBlockID: "00000002a1ec7ae214b9e43a904b6c010fb1260c9e8a12e5967bdbe451152a07", + GenesisBlockNumber: 2, // even though the genesis block is 1, it is never available through firehose/substreams + }, + { + Name: "jungle4", + Aliases: []string{}, + GenesisBlockID: "00000002d61d836f51657f886a5bc55b18a731f7eace6565784328fbd051fc90", + GenesisBlockNumber: 2, // even though the genesis block is 1, it is never available through firehose/substreams + }, + }, + }, + { + Name: "arweave", + BlockType: "type.googleapis.com/sf.arweave.type.v1.Block", + BufBuildURL: "buf.build/pinax/firehose-arweave", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, // even though the usual encoding is base64url, firehose blocks are written with the hex-encoded version + KnownChains: []*Chain{ + { + Name: "arweave", + Aliases: []string{}, + GenesisBlockID: "ef0214ecaa252020230a5325719dfc2d9cec86123bc46926dad0c2251ed6be17b7112528dbe678fb2d31d6e6a0951244", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "beacon", + BlockType: "type.googleapis.com/sf.beacon.type.v1.Block", + BufBuildURL: "buf.build/pinax/firehose-beacon", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_0X_HEX, + KnownChains: []*Chain{ + { + Name: "mainnet-cl", + Aliases: []string{}, + GenesisBlockID: "0x4d611d5b93fdab69013a7f0a2f961caca0c853f87cfe9595fe50038163079360", + GenesisBlockNumber: 0, + }, + { + Name: "sepolia-cl", + Aliases: []string{}, + GenesisBlockID: "0xfb9b64fe445f76696407e1e3cc390371edff147bf712db86db6197d4b31ede43", + GenesisBlockNumber: 0, + }, + { + Name: "holesky-cl", + Aliases: []string{}, + GenesisBlockID: "0xab09edd9380f8451c3ff5c809821174a36dce606fea8b5ea35ea936915dbf889", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "starknet", + BlockType: "type.googleapis.com/sf.starknet.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-starknet", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_0X_HEX, + KnownChains: []*Chain{ + { + Name: "starknet-mainnet", + Aliases: []string{}, + GenesisBlockID: "0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943", + GenesisBlockNumber: 0, + }, + { + Name: "starknet-testnet", + Aliases: []string{}, + GenesisBlockID: "0x5c627d4aeb51280058bed93c7889bce78114d63baad1be0f0aeb32496d5f19c", + GenesisBlockNumber: 0, + }, + }, + }, + { + Name: "cosmos", + BlockType: "type.googleapis.com/sf.cosmos.type.v2.Block", + BufBuildURL: "buf.build/streamingfast/firehose-cosmos", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "injective-mainnet", + Aliases: []string{}, + GenesisBlockID: "24c9714291a999b952859ee02ec9b233394fe743b07ea3578d432a4a2707b6af", + GenesisBlockNumber: 1, + }, + { + Name: "injective-testnet", + Aliases: []string{}, + GenesisBlockID: "a9effb99c7bc3ba8c18a487ffffd800c137bc2b2f47f73c350f3ca27077044a1", + GenesisBlockNumber: 37368800, // Not the real genesis block, but the other blocks are lost on the testnet + }, + }, + }, + { + Name: "gear", + BlockType: "type.googleapis.com/sf.gear.type.v1.Block", + BufBuildURL: "buf.build/streamingfast/firehose-gear", + BytesEncoding: pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX, + KnownChains: []*Chain{ + { + Name: "vara-mainnet", + Aliases: []string{}, + GenesisBlockID: "fe1b4c55fd4d668101126434206571a7838a8b6b93a6d1b95d607e78e6c53763", + GenesisBlockNumber: 0, + }, + { + Name: "vara-testnet", + Aliases: []string{}, + GenesisBlockID: "525639f713f397dcf839bd022cd821f367ebcf179de7b9253531f8adbe5436d6", + GenesisBlockNumber: 0, + }, + }, + }, +}) + +func (p WellKnownProtocolList) ChainByGenesisBlock(blockNum uint64, blockID string) *Chain { + for _, protocol := range p { + for _, chain := range protocol.KnownChains { + if chain.GenesisBlockNumber == blockNum && chain.GenesisBlockID == blockID { + return chain + } + } + } + return nil +} + +func (p WellKnownProtocolList) ChainByName(name string) *Chain { + for _, protocol := range p { + for _, chain := range protocol.KnownChains { + if chain.Name == name { + return chain + } + for _, alias := range chain.Aliases { + if alias == name { + return chain + } + } + } + } + return nil +} From 7a5541bcac0106fc1168b9a4f3df3f8c2b8a4313 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Fri, 23 Aug 2024 13:53:41 -0400 Subject: [PATCH 3/7] add info endpoint to substreams too --- CHANGELOG.md | 2 +- cmd/apps/substreams_tier1.go | 1 + firehose/info/endpoint_info.go | 27 ++++++++++++++++++++++----- go.mod | 2 +- go.sum | 8 ++++---- 5 files changed, 29 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc13de8..dc28793 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ## Unreleased -* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and Substreams endpoints. This involves the following new flags: +* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and `sf.substreams.rpc.v2.EndpointInfo/Info` to Substreams endpoints. This involves the following new flags: - `advertise-chain-name` Canonical name of the chain according to https://thegraph.com/docs/en/developing/supported-networks/ (required, unless it is in the "well-known" list) - `advertise-chain-aliases` Alternate names for that chain (optional) - `advertise-block-features` List of features describing the blocks (optional) diff --git a/cmd/apps/substreams_tier1.go b/cmd/apps/substreams_tier1.go index 0044fb3..7c80028 100644 --- a/cmd/apps/substreams_tier1.go +++ b/cmd/apps/substreams_tier1.go @@ -150,6 +150,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root HeadTimeDriftMetric: ss1HeadTimeDriftmetric, HeadBlockNumberMetric: ss1HeadBlockNumMetric, CheckPendingShutDown: runtime.IsPendingShutdown, + InfoServer: runtime.InfoServer, }), nil }, }) diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go index 7b09feb..6c884ac 100644 --- a/firehose/info/endpoint_info.go +++ b/firehose/info/endpoint_info.go @@ -15,10 +15,13 @@ import ( ) type InfoServer struct { + sync.Mutex + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error response *pbfirehose.InfoResponse ready chan struct{} - once sync.Once + initDone bool + initError error logger *zap.Logger } @@ -71,9 +74,24 @@ func validateInfoResponse(resp *pbfirehose.InfoResponse) error { } // multiple apps (firehose, substreams...) can initialize the same server, we only need one -func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) (err error) { - s.once.Do(func() { err = s.init(ctx, fhub, mergedBlocksStore, oneBlockStore, logger) }) - return +func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { + s.Lock() + defer func() { + s.initDone = true + s.Unlock() + }() + + if s.initDone { + return s.initError + } + + if err := s.init(ctx, fhub, mergedBlocksStore, oneBlockStore, logger); err != nil { + s.initError = err + return err + } + + close(s.ready) + return nil } func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum uint64, mergedBlocksStore dstore.Store) *pbbstream.Block { @@ -178,6 +196,5 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc return err } - close(s.ready) return nil } diff --git a/go.mod b/go.mod index 59f6925..c2bad1e 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 - github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf + github.com/streamingfast/substreams v1.9.4-0.20240823175139-fee92bc72fad github.com/stretchr/testify v1.8.4 github.com/test-go/testify v1.1.4 go.uber.org/multierr v1.10.0 diff --git a/go.sum b/go.sum index 36dae1b..9c39e77 100644 --- a/go.sum +++ b/go.sum @@ -441,8 +441,8 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI= github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= -github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= +github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= @@ -582,8 +582,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= -github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf h1:/5LEFtd/ws7Gl4Di3mMaZYbgasRC1ooK3einImpmVsg= -github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf/go.mod h1:Q/h8Mxe+MKVZqU9wIpMxLKZHb0hLIACZvDiBnR+IVyI= +github.com/streamingfast/substreams v1.9.4-0.20240823175139-fee92bc72fad h1:Js7hQE7ZwhLaBgy8Hd9/mZDLiqdgY/QbMwlAXBfSf9w= +github.com/streamingfast/substreams v1.9.4-0.20240823175139-fee92bc72fad/go.mod h1:GchLx+0trEb9E9QvPBbWwY2rVsJkpcw66ibpLn0OtVE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= From f7083140f3ca72c089055f96393be3d95b115221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Fri, 23 Aug 2024 14:14:52 -0400 Subject: [PATCH 4/7] add extra validation so that ethereum chains aren't served from firecore without setting the 'base/extended/hybrid' feature --- firehose/info/info_filler.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go index 73932f0..db95207 100644 --- a/firehose/info/info_filler.go +++ b/firehose/info/info_filler.go @@ -34,6 +34,21 @@ var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp break } } + + // Extra validation for ethereum blocks + if firstStreamableBlock.Payload.TypeUrl == "type.googleapis.com/sf.ethereum.type.v2.Block" { + var seenDetailLevel bool + for _, feature := range resp.BlockFeatures { + if feature == "base" || feature == "extended" || feature == "hybrid" { + seenDetailLevel = true + break + } + } + if !seenDetailLevel { + return fmt.Errorf("ethereum blocks are used without setting detail level in 'advertise-block-features': expected one of 'base', 'extended' or 'hybrid' (or use 'firehose-ethereum' binary instead to serve this chain and get automatic detection/validation)") + } + } + return nil } From 7cebed07a97ec328b11b46e6a025303b79ea9705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Fri, 23 Aug 2024 15:01:51 -0400 Subject: [PATCH 5/7] fix yaml config file parsing, a few fixes and comments --- CHANGELOG.md | 6 +++++- cmd/setup.go | 4 ++++ firehose/info/endpoint_info.go | 2 ++ launcher/config.go | 4 ++-- well-known/chains.go | 7 ++++++- 5 files changed, 19 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc28793..8bb0c56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,12 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s - `advertise-block-features` List of features describing the blocks (optional) - `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list) -* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values) +* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list. * The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. + It validates the following on a well-known chain: + - if the first-streamable-block Num/ID match the genesis block of a known chain, e.g. `matic`, it will refuse another value for `advertise-chain-name` than `matic` or one of its aliases (`polygon`) + - If the first-streamable-block does not match any known chain, it will require the `advertise-chain-name` to be non-empty + - If the first-streamable-block type is unknown (i.e. not ethereum, solana, near, cosmos, bitcoin...), it will require the user to provide `advertise-chain-name` as well as `advertise-block-id-encoding` * Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. * Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling diff --git a/cmd/setup.go b/cmd/setup.go index a609703..bfbbf39 100644 --- a/cmd/setup.go +++ b/cmd/setup.go @@ -51,6 +51,10 @@ func setupCmd(cmd *cobra.Command, binaryName string) error { return fmt.Errorf("invalid flag %s in config file under command %s", k, subCommand) } + // Keep compatibility with config files, allow empty value to unset the flag + if v == nil { + v = "" + } viper.SetDefault(flag.viperKey, v) // For root command, we want to keep compatibility for `viper.GetXXX("global-")` to work with config loaded value diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go index 6c884ac..e0d0a19 100644 --- a/firehose/info/endpoint_info.go +++ b/firehose/info/endpoint_info.go @@ -102,6 +102,7 @@ func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum block, err := bstream.FetchBlockFromMergedBlocksStore(ctx, blockNum, mergedBlocksStore) if err != nil { + time.Sleep(time.Millisecond * 500) continue } return block @@ -132,6 +133,7 @@ func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uin block, err := bstream.FetchBlockFromOneBlockStore(ctx, blockNum, "", oneBlockStore) if err != nil { + time.Sleep(time.Millisecond * 500) continue } return block diff --git a/launcher/config.go b/launcher/config.go index 6f7871e..b74406c 100644 --- a/launcher/config.go +++ b/launcher/config.go @@ -10,8 +10,8 @@ import ( var Config map[string]*CommandConfig type CommandConfig struct { - Args []string `json:"args"` - Flags map[string]string `json:"flags"` + Args []string `json:"args"` + Flags map[string]any `json:"flags"` } // Load reads a YAML config, and sets the global DfuseConfig variable diff --git a/well-known/chains.go b/well-known/chains.go index b266032..1b85ec3 100644 --- a/well-known/chains.go +++ b/well-known/chains.go @@ -13,10 +13,15 @@ type WellKnownProtocol struct { } type Chain struct { - Name string + // Canonical name, from https://thegraph.com/docs/en/developing/supported-networks/ + Name string + // Aliases are other names that can be used to refer to the chain, for example 'polygon' is a popular name for the chain 'matic' Aliases []string // Genesis block here is actually the "lowest possible" first streamable block through firehose blocks. // In most cases, it matches the "genesis block" of the chain. + // It must match the value of the `sf.bstream.v1.Block.id` field (https://github.com/streamingfast/bstream/blob/develop/proto/sf/bstream/v1/bstream.proto#L71) + // and it follows the encoding specified in the `BytesEncoding` field of the WellKnownProtocol + // You can generally get the genesis block ID by running `firecore tools print merged-blocks ` on the merged-blocks GenesisBlockID string GenesisBlockNumber uint64 } From f8c80efed06579b7307c4613b1ab815b54726e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 26 Aug 2024 14:54:43 -0400 Subject: [PATCH 6/7] add ignore-advertise-validation flag --- CHANGELOG.md | 1 + chain.go | 2 +- cmd/apps/start.go | 1 + cmd/main.go | 1 + firehose/info/endpoint_info.go | 49 +++++++++++++++++++++++++++------- firehose/info/info_filler.go | 29 +++++++++++++------- 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bb0c56..cecbd41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s - `advertise-chain-aliases` Alternate names for that chain (optional) - `advertise-block-features` List of features describing the blocks (optional) - `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list) + - `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail. * Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list. * The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. diff --git a/chain.go b/chain.go index 09bac18..ae83714 100644 --- a/chain.go +++ b/chain.go @@ -158,7 +158,7 @@ type Chain[B Block] struct { // InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain. // It can validate that we are on the right chain by checking against a known hash, or populate missing fields. - InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error + InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error } type ToolsConfig[B Block] struct { diff --git a/cmd/apps/start.go b/cmd/apps/start.go index 9af3fd9..72673ef 100644 --- a/cmd/apps/start.go +++ b/cmd/apps/start.go @@ -95,6 +95,7 @@ func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, blockIDEncoding, sflags.MustGetStringSlice(cmd, "advertise-block-features"), bstream.GetProtocolFirstStreamableBlock, + !sflags.MustGetBool(cmd, "ignore-advertise-validation"), chain.InfoResponseFiller, rootLog, ) diff --git a/cmd/main.go b/cmd/main.go index cf1c713..e3baa45 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -175,6 +175,7 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) { cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.") cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.") cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.") + cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.") acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1) i := 0 diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go index e0d0a19..58ead2f 100644 --- a/firehose/info/endpoint_info.go +++ b/firehose/info/endpoint_info.go @@ -12,12 +12,14 @@ import ( "github.com/streamingfast/dstore" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) type InfoServer struct { sync.Mutex - responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error + validate bool + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error response *pbfirehose.InfoResponse ready chan struct{} initDone bool @@ -40,7 +42,8 @@ func NewInfoServer( blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding, blockFeatures []string, firstStreamableBlock uint64, - responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error, + validate bool, + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error, logger *zap.Logger, ) *InfoServer { @@ -55,6 +58,7 @@ func NewInfoServer( return &InfoServer{ responseFiller: responseFiller, response: resp, + validate: validate, ready: make(chan struct{}), logger: logger, } @@ -90,7 +94,6 @@ func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc return err } - close(s.ready) return nil } @@ -142,9 +145,12 @@ func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uin // init tries to fetch the first streamable block from the different sources and fills the response with it // returns an error if it is incomplete +// it can be called only once func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { ctx, cancel := context.WithCancel(ctx) - defer cancel() + if s.validate { + defer cancel() + } ch := make(chan *pbbstream.Block) @@ -177,19 +183,43 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc case <-ctx.Done(): return case <-time.After(5 * time.Second): - logger.Warn("waiting to read the first_streamable_block before starting firehose/substreams endpoints", + loglevel := zapcore.WarnLevel + if !s.validate { + loglevel = zapcore.DebugLevel + } + logger.Log(loglevel, "waiting to read the first_streamable_block before starting firehose/substreams endpoints", zap.Uint64("first_streamable_block", s.response.FirstStreamableBlockNum), - zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String()) - zap.Stringer("one_block_store", oneBlockStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String()) + zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), + zap.Stringer("one_block_store", oneBlockStore.BaseURL()), ) } } }() + if !s.validate { + // in this case we don't wait for an answer, but we still try to fill the response + go func() { + select { + case blk := <-ch: + if err := s.responseFiller(blk, s.response, s.validate); err != nil { + logger.Warn("unable to fill and validate info response", zap.Error(err)) + } + case <-ctx.Done(): + } + if err := validateInfoResponse(s.response); err != nil { + logger.Warn("info response", zap.Error(err)) + } + close(s.ready) + }() + + cancel() + return nil + } + select { case blk := <-ch: - if err := s.responseFiller(blk, s.response); err != nil { - return err + if err := s.responseFiller(blk, s.response, s.validate); err != nil { + return fmt.Errorf("%w -- use --ignore-advertise-validation to skip these checks", err) } case <-ctx.Done(): } @@ -198,5 +228,6 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc return err } + close(s.ready) return nil } diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go index db95207..437be60 100644 --- a/firehose/info/info_filler.go +++ b/firehose/info/info_filler.go @@ -8,15 +8,33 @@ import ( pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" ) -var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error { +var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error { resp.FirstStreamableBlockId = firstStreamableBlock.Id + for _, protocol := range wellknown.WellKnownProtocols { + if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl { + resp.BlockIdEncoding = protocol.BytesEncoding + break + } + } + + if !validate { + if resp.ChainName == "" { + // still try to fill the chain name if it is not given + if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + resp.ChainName = chain.Name + resp.ChainNameAliases = chain.Aliases + } + } + return nil + } + if resp.ChainName != "" { if chain := wellknown.WellKnownProtocols.ChainByName(resp.ChainName); chain != nil { if firstStreamableBlock.Number == chain.GenesisBlockNumber && chain.GenesisBlockID != firstStreamableBlock.Id { // we don't check if the firstStreamableBlock is something other than our well-known genesis block return fmt.Errorf("chain name defined in flag: %q inconsistent with the genesis block ID %q (expected: %q)", resp.ChainName, ox(firstStreamableBlock.Id), ox(chain.GenesisBlockID)) } - resp.ChainName = chain.Name + resp.ChainName = chain.Name // ensure we use the canonical name if the user provided one of the aliases resp.ChainNameAliases = chain.Aliases } else if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { return fmt.Errorf("chain name defined in flag: %q inconsistent with the one discovered from genesis block %q", resp.ChainName, chain.Name) @@ -28,13 +46,6 @@ var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp } } - for _, protocol := range wellknown.WellKnownProtocols { - if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl { - resp.BlockIdEncoding = protocol.BytesEncoding - break - } - } - // Extra validation for ethereum blocks if firstStreamableBlock.Payload.TypeUrl == "type.googleapis.com/sf.ethereum.type.v2.Block" { var seenDetailLevel bool From 25184196bf07c57f1b4dab247a2b7e49045bb065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 26 Aug 2024 15:18:07 -0400 Subject: [PATCH 7/7] fix handling of ctx cancel in advertise server --- firehose/app/firehose/app.go | 12 +++++------- firehose/info/endpoint_info.go | 8 ++++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/firehose/app/firehose/app.go b/firehose/app/firehose/app.go index f9b57c4..7474a78 100644 --- a/firehose/app/firehose/app.go +++ b/firehose/app/firehose/app.go @@ -189,13 +189,11 @@ func (a *App) Run() error { } } - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancel() - if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil { - a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err)) - } - }() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil { + a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err)) + } a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive)) a.isReady.CAS(false, true) diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go index 58ead2f..f98b329 100644 --- a/firehose/info/endpoint_info.go +++ b/firehose/info/endpoint_info.go @@ -148,9 +148,7 @@ func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uin // it can be called only once func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { ctx, cancel := context.WithCancel(ctx) - if s.validate { - defer cancel() - } + // cancel is later and depends on s.validate ch := make(chan *pbbstream.Block) @@ -199,6 +197,7 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc if !s.validate { // in this case we don't wait for an answer, but we still try to fill the response go func() { + defer cancel() select { case blk := <-ch: if err := s.responseFiller(blk, s.response, s.validate); err != nil { @@ -210,11 +209,12 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc logger.Warn("info response", zap.Error(err)) } close(s.ready) + cancel() }() - cancel() return nil } + defer cancel() select { case blk := <-ch: