From f8c80efed06579b7307c4613b1ab815b54726e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Mon, 26 Aug 2024 14:54:43 -0400 Subject: [PATCH] add ignore-advertise-validation flag --- CHANGELOG.md | 1 + chain.go | 2 +- cmd/apps/start.go | 1 + cmd/main.go | 1 + firehose/info/endpoint_info.go | 49 +++++++++++++++++++++++++++------- firehose/info/info_filler.go | 29 +++++++++++++------- 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bb0c56..cecbd41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s - `advertise-chain-aliases` Alternate names for that chain (optional) - `advertise-block-features` List of features describing the blocks (optional) - `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list) + - `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail. * Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list. * The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. diff --git a/chain.go b/chain.go index 09bac18..ae83714 100644 --- a/chain.go +++ b/chain.go @@ -158,7 +158,7 @@ type Chain[B Block] struct { // InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain. // It can validate that we are on the right chain by checking against a known hash, or populate missing fields. - InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error + InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error } type ToolsConfig[B Block] struct { diff --git a/cmd/apps/start.go b/cmd/apps/start.go index 9af3fd9..72673ef 100644 --- a/cmd/apps/start.go +++ b/cmd/apps/start.go @@ -95,6 +95,7 @@ func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, blockIDEncoding, sflags.MustGetStringSlice(cmd, "advertise-block-features"), bstream.GetProtocolFirstStreamableBlock, + !sflags.MustGetBool(cmd, "ignore-advertise-validation"), chain.InfoResponseFiller, rootLog, ) diff --git a/cmd/main.go b/cmd/main.go index cf1c713..e3baa45 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -175,6 +175,7 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) { cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.") cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.") cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.") + cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.") acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1) i := 0 diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go index e0d0a19..58ead2f 100644 --- a/firehose/info/endpoint_info.go +++ b/firehose/info/endpoint_info.go @@ -12,12 +12,14 @@ import ( "github.com/streamingfast/dstore" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) type InfoServer struct { sync.Mutex - responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error + validate bool + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error response *pbfirehose.InfoResponse ready chan struct{} initDone bool @@ -40,7 +42,8 @@ func NewInfoServer( blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding, blockFeatures []string, firstStreamableBlock uint64, - responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error, + validate bool, + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error, logger *zap.Logger, ) *InfoServer { @@ -55,6 +58,7 @@ func NewInfoServer( return &InfoServer{ responseFiller: responseFiller, response: resp, + validate: validate, ready: make(chan struct{}), logger: logger, } @@ -90,7 +94,6 @@ func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc return err } - close(s.ready) return nil } @@ -142,9 +145,12 @@ func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uin // init tries to fetch the first streamable block from the different sources and fills the response with it // returns an error if it is incomplete +// it can be called only once func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { ctx, cancel := context.WithCancel(ctx) - defer cancel() + if s.validate { + defer cancel() + } ch := make(chan *pbbstream.Block) @@ -177,19 +183,43 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc case <-ctx.Done(): return case <-time.After(5 * time.Second): - logger.Warn("waiting to read the first_streamable_block before starting firehose/substreams endpoints", + loglevel := zapcore.WarnLevel + if !s.validate { + loglevel = zapcore.DebugLevel + } + logger.Log(loglevel, "waiting to read the first_streamable_block before starting firehose/substreams endpoints", zap.Uint64("first_streamable_block", s.response.FirstStreamableBlockNum), - zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String()) - zap.Stringer("one_block_store", oneBlockStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String()) + zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), + zap.Stringer("one_block_store", oneBlockStore.BaseURL()), ) } } }() + if !s.validate { + // in this case we don't wait for an answer, but we still try to fill the response + go func() { + select { + case blk := <-ch: + if err := s.responseFiller(blk, s.response, s.validate); err != nil { + logger.Warn("unable to fill and validate info response", zap.Error(err)) + } + case <-ctx.Done(): + } + if err := validateInfoResponse(s.response); err != nil { + logger.Warn("info response", zap.Error(err)) + } + close(s.ready) + }() + + cancel() + return nil + } + select { case blk := <-ch: - if err := s.responseFiller(blk, s.response); err != nil { - return err + if err := s.responseFiller(blk, s.response, s.validate); err != nil { + return fmt.Errorf("%w -- use --ignore-advertise-validation to skip these checks", err) } case <-ctx.Done(): } @@ -198,5 +228,6 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc return err } + close(s.ready) return nil } diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go index db95207..437be60 100644 --- a/firehose/info/info_filler.go +++ b/firehose/info/info_filler.go @@ -8,15 +8,33 @@ import ( pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" ) -var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error { +var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error { resp.FirstStreamableBlockId = firstStreamableBlock.Id + for _, protocol := range wellknown.WellKnownProtocols { + if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl { + resp.BlockIdEncoding = protocol.BytesEncoding + break + } + } + + if !validate { + if resp.ChainName == "" { + // still try to fill the chain name if it is not given + if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { + resp.ChainName = chain.Name + resp.ChainNameAliases = chain.Aliases + } + } + return nil + } + if resp.ChainName != "" { if chain := wellknown.WellKnownProtocols.ChainByName(resp.ChainName); chain != nil { if firstStreamableBlock.Number == chain.GenesisBlockNumber && chain.GenesisBlockID != firstStreamableBlock.Id { // we don't check if the firstStreamableBlock is something other than our well-known genesis block return fmt.Errorf("chain name defined in flag: %q inconsistent with the genesis block ID %q (expected: %q)", resp.ChainName, ox(firstStreamableBlock.Id), ox(chain.GenesisBlockID)) } - resp.ChainName = chain.Name + resp.ChainName = chain.Name // ensure we use the canonical name if the user provided one of the aliases resp.ChainNameAliases = chain.Aliases } else if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil { return fmt.Errorf("chain name defined in flag: %q inconsistent with the one discovered from genesis block %q", resp.ChainName, chain.Name) @@ -28,13 +46,6 @@ var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp } } - for _, protocol := range wellknown.WellKnownProtocols { - if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl { - resp.BlockIdEncoding = protocol.BytesEncoding - break - } - } - // Extra validation for ethereum blocks if firstStreamableBlock.Payload.TypeUrl == "type.googleapis.com/sf.ethereum.type.v2.Block" { var seenDetailLevel bool