diff --git a/.gitignore b/.gitignore index 85d77f0..c6d0140 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.vscode /build /dist .envrc diff --git a/CHANGELOG.md b/CHANGELOG.md index 1eb4f6c..196cb37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,14 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ## Unreleased +* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and Substreams endpoints. This involves the following new flags: + - `advertise-chain-name` Canonical name of the chain, from the list here: https://thegraph.com/docs/en/developing/supported-networks/ (required) + - `advertise-chain-aliases` Alternate names for that chain (optional) + - `advertise-block-features` Only required for ethereum blocks, automatically discovered if run from `firehose-ethereum` program + - `advertise-block-id-encoding` Required, one of [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] + +* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service. + * Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. * Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling diff --git a/chain.go b/chain.go index 789c6e5..09bac18 100644 --- a/chain.go +++ b/chain.go @@ -6,6 +6,7 @@ import ( "runtime/debug" "strings" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "github.com/streamingfast/substreams/wasm" "github.com/spf13/cobra" @@ -154,6 +155,10 @@ type Chain[B Block] struct { DefaultBlockType string RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error) + + // InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain. + // It can validate that we are on the right chain by checking against a known hash, or populate missing fields. + InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error } type ToolsConfig[B Block] struct { @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() { err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element")) } + if c.InfoResponseFiller == nil { + err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set")) + } + for key, indexerFactory := range c.BlockIndexerFactories { if indexerFactory == nil { err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key)) diff --git a/cmd/apps/firehose.go b/cmd/apps/firehose.go index 4585299..de38dee 100644 --- a/cmd/apps/firehose.go +++ b/cmd/apps/firehose.go @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za HeadBlockNumberMetric: headBlockNumMetric, TransformRegistry: registry, CheckPendingShutdown: runtime.IsPendingShutdown, + InfoServer: runtime.InfoServer, }), nil }, }) diff --git a/cmd/apps/start.go b/cmd/apps/start.go index b191290..2a3d551 100644 --- a/cmd/apps/start.go +++ b/cmd/apps/start.go @@ -26,7 +26,9 @@ import ( "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dmetering" firecore "github.com/streamingfast/firehose-core" + info "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/launcher" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" tracing "github.com/streamingfast/sf-tracing" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st configFile := sflags.MustGetString(cmd, "config-file") rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile)) - err = start(cmd, dataDir, args, rootLog) + err = start(cmd, dataDir, args, chain, rootLog) if err != nil { return fmt.Errorf("unable to launch: %w", err) } @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st } } -func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) { +func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) { dataDirAbs, err := filepath.Abs(dataDir) if err != nil { return fmt.Errorf("unable to setup directory structure: %w", err) @@ -82,7 +84,21 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge }() dmetering.SetDefaultEmitter(eventEmitter) - launch := launcher.NewLauncher(rootLog, dataDirAbs) + blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET + if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" { + blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(pbfirehose.InfoResponse_BlockIdEncoding_value[enc]) + } + + infoServer := info.NewInfoServer( + sflags.MustGetString(cmd, "advertise-chain-name"), + sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"), + blockIDEncoding, + sflags.MustGetStringSlice(cmd, "advertise-block-features"), + bstream.GetProtocolFirstStreamableBlock, + chain.InfoResponseFiller, + ) + + launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer) rootLog.Debug("launcher created") runByDefault := func(app string) bool { diff --git a/cmd/firecore/main.go b/cmd/firecore/main.go index 3eb14cf..be062c7 100644 --- a/cmd/firecore/main.go +++ b/cmd/firecore/main.go @@ -4,6 +4,7 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" firecore "github.com/streamingfast/firehose-core" fhCMD "github.com/streamingfast/firehose-core/cmd" + info "github.com/streamingfast/firehose-core/firehose/info" ) func main() { @@ -17,6 +18,7 @@ func main() { Version: version, BlockFactory: func() firecore.Block { return new(pbbstream.Block) }, ConsoleReaderFactory: firecore.NewConsoleReader, + InfoResponseFiller: info.DefaultInfoResponseFiller, Tools: &firecore.ToolsConfig[*pbbstream.Block]{}, }) } diff --git a/cmd/main.go b/cmd/main.go index 7c0793d..cf1c713 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,6 +23,7 @@ import ( "github.com/streamingfast/firehose-core/cmd/tools" "github.com/streamingfast/firehose-core/launcher" paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "github.com/streamingfast/logging" "go.uber.org/zap" @@ -171,6 +172,20 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) { cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.") cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.") + cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.") + cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.") + cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.") + + acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1) + i := 0 + for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value { + if encoding != "BLOCK_ID_ENCODING_UNSET" { + acceptedEncodings[i] = encoding + i++ + } + } + cmd.Flags().String("advertise-block-id-encoding", "", fmt.Sprintf("[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint (%s). If unset, it may be inferred from the genesis block.", strings.Join(acceptedEncodings, ", "))) + cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).") cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes") diff --git a/firehose/app/firehose/app.go b/firehose/app/firehose/app.go index aa02b52..f9b57c4 100644 --- a/firehose/app/firehose/app.go +++ b/firehose/app/firehose/app.go @@ -31,6 +31,7 @@ import ( "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/metrics" "github.com/streamingfast/firehose-core/firehose/server" "github.com/streamingfast/logging" @@ -64,6 +65,7 @@ type Modules struct { TransformRegistry *transform.Registry RegisterServiceExtension RegisterServiceExtensionFunc CheckPendingShutdown func() bool + InfoServer *info.InfoServer } type App struct { @@ -158,6 +160,7 @@ func (a *App) Run() error { a.IsReady, a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, + a.modules.InfoServer, a.config.ServerOptions..., ) @@ -186,6 +189,14 @@ func (a *App) Run() error { } } + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil { + a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err)) + } + }() + a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive)) a.isReady.CAS(false, true) firehoseServer.Launch() diff --git a/firehose/info/endpoint_info.go b/firehose/info/endpoint_info.go new file mode 100644 index 0000000..d1c76eb --- /dev/null +++ b/firehose/info/endpoint_info.go @@ -0,0 +1,165 @@ +package info + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/streamingfast/bstream" + "github.com/streamingfast/bstream/hub" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dstore" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" +) + +type InfoServer struct { + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error + response *pbfirehose.InfoResponse + ready chan struct{} + once sync.Once +} + +func (s *InfoServer) Info(ctx context.Context, request *pbfirehose.InfoRequest) (*pbfirehose.InfoResponse, error) { + select { + case <-s.ready: + return s.response, nil + default: + return nil, fmt.Errorf("info server not ready") + } +} + +func NewInfoServer( + chainName string, + chainNameAliases []string, + blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding, + blockFeatures []string, + firstStreamableBlock uint64, + responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error, +) *InfoServer { + + resp := &pbfirehose.InfoResponse{ + ChainName: chainName, + ChainNameAliases: chainNameAliases, + BlockIdEncoding: blockIDEncoding, + BlockFeatures: blockFeatures, + FirstStreamableBlockNum: firstStreamableBlock, + } + + return &InfoServer{ + responseFiller: responseFiller, + response: resp, + ready: make(chan struct{}), + } +} + +func validateInfoResponse(resp *pbfirehose.InfoResponse) error { + switch { + case resp.ChainName == "": + return fmt.Errorf("chain name is not set") + case resp.BlockIdEncoding == pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET: + return fmt.Errorf("block id encoding is not set") + case resp.FirstStreamableBlockId == "": + return fmt.Errorf("first streamable block id is not set") + } + + return nil +} + +// multiple apps (firehose, substreams...) can initialize the same server, we only need one +func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) (err error) { + s.once.Do(func() { err = s.init(ctx, fhub, mergedBlocksStore, oneBlockStore, logger) }) + return +} + +func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum uint64, mergedBlocksStore dstore.Store) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block, err := bstream.FetchBlockFromMergedBlocksStore(ctx, blockNum, mergedBlocksStore) + if err != nil { + continue + } + return block + } +} + +func (s *InfoServer) getBlockFromForkableHub(ctx context.Context, blockNum uint64, forkableHub *hub.ForkableHub) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block := forkableHub.GetBlock(s.response.FirstStreamableBlockNum, "") + if block == nil { + time.Sleep(time.Millisecond * 500) + continue + } + return block + } + +} + +func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uint64, oneBlockStore dstore.Store) *pbbstream.Block { + for { + if ctx.Err() != nil { + return nil + } + + block, err := bstream.FetchBlockFromOneBlockStore(ctx, blockNum, "", oneBlockStore) + if err != nil { + continue + } + return block + } +} + +// init tries to fetch the first streamable block from the different sources and fills the response with it +// returns an error if it is incomplete +func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ch := make(chan *pbbstream.Block) + + if fhub != nil { + go func() { + select { + case ch <- s.getBlockFromForkableHub(ctx, s.response.FirstStreamableBlockNum, fhub): + case <-ctx.Done(): + } + }() + } + + go func() { + select { + case ch <- s.getBlockFromMergedBlocksStore(ctx, s.response.FirstStreamableBlockNum, mergedBlocksStore): + case <-ctx.Done(): + } + }() + + go func() { + select { + case ch <- s.getBlockFromOneBlockStore(ctx, s.response.FirstStreamableBlockNum, oneBlockStore): + case <-ctx.Done(): + } + }() + + select { + case blk := <-ch: + if err := s.responseFiller(blk, s.response); err != nil { + return err + } + case <-ctx.Done(): + } + + if err := validateInfoResponse(s.response); err != nil { + return err + } + + close(s.ready) + return nil +} diff --git a/firehose/info/info_filler.go b/firehose/info/info_filler.go new file mode 100644 index 0000000..674853f --- /dev/null +++ b/firehose/info/info_filler.go @@ -0,0 +1,60 @@ +package info + +import ( + "fmt" + + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" +) + +var DefaultInfoResponseFiller = func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.FirstStreamableBlockId = block.Id + + switch block.Payload.TypeUrl { + case "type.googleapis.com/sf.antelope.type.v1.Block": + return fillInfoResponseForAntelope(block, resp) + + case "type.googleapis.com/sf.ethereum.type.v2.Block": + return fillInfoResponseForEthereum(block, resp) + + case "type.googleapis.com/sf.cosmos.type.v1.Block": + return fillInfoResponseForCosmos(block, resp) + + case "type.googleapis.com/sf.solana.type.v1.Block": + return fillInfoResponseForSolana(block, resp) + } + + return nil +} + +// this is a simple helper, a full implementation would live in github.com/streamingfast/firehose-ethereum +func fillInfoResponseForEthereum(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX + var seenBlockType bool + for _, feature := range resp.BlockFeatures { + if feature == "extended" || feature == "base" || feature == "hybrid" { + seenBlockType = true + break + } + } + if !seenBlockType { + return fmt.Errorf("invalid block features, missing 'base', 'extended' or 'hybrid'") + } + return nil +} + +// this is a simple helper, a full implementation would live in github.com/pinax-network/firehose-antelope +func fillInfoResponseForAntelope(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX + return nil +} + +func fillInfoResponseForCosmos(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_HEX + return nil +} + +func fillInfoResponseForSolana(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error { + resp.BlockIdEncoding = pbfirehose.InfoResponse_BLOCK_ID_ENCODING_BASE58 + return nil +} diff --git a/firehose/server/server.go b/firehose/server/server.go index 0a9a208..f4b52c4 100644 --- a/firehose/server/server.go +++ b/firehose/server/server.go @@ -16,6 +16,7 @@ import ( "github.com/streamingfast/dmetrics" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/firehose" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/firehose-core/firehose/rate" pbfirehoseV1 "github.com/streamingfast/pbgo/sf/firehose/v1" pbfirehoseV2 "github.com/streamingfast/pbgo/sf/firehose/v2" @@ -61,6 +62,7 @@ func New( isReady func(context.Context) bool, listenAddr string, serviceDiscoveryURL *url.URL, + infoServer *info.InfoServer, opts ...Option, ) *Server { initFunc := func(ctx context.Context, _ *pbfirehoseV2.Request) context.Context { @@ -140,6 +142,7 @@ func New( if blockGetter != nil { pbfirehoseV2.RegisterFetchServer(gs, s) } + pbfirehoseV2.RegisterEndpointInfoServer(gs, infoServer) pbfirehoseV2.RegisterStreamServer(gs, s) pbfirehoseV1.RegisterStreamServer(gs, NewFirehoseProxyV1ToV2(s)) // compatibility with firehose }) diff --git a/go.mod b/go.mod index 893fcf8..6e64aa2 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 - github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d + github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0 github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 github.com/streamingfast/substreams v1.9.4-0.20240812210000-635f7bcba6cf github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 6b6666c..df9d1e2 100644 --- a/go.sum +++ b/go.sum @@ -572,8 +572,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d h1:rgXXfBFlQ9C8casyay7UL53VSGR6JoUnhqGv4h6lhxM= -github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= +github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0 h1:kfzU2GvvWt+RQtrHvfqv7zdRA4xv/3AusccIGG3+roM= +github.com/streamingfast/pbgo v0.0.6-0.20240821201153-468db4096ff0/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE= github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 h1:94HllkX4ttYVilo8ZJv05b5z8JiMmqBvv4+Jdgk/+2A= diff --git a/launcher/launcher.go b/launcher/launcher.go index 4c8f086..bffca60 100644 --- a/launcher/launcher.go +++ b/launcher/launcher.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/streamingfast/firehose-core/firehose/info" "github.com/streamingfast/shutter" "go.uber.org/atomic" "go.uber.org/zap" @@ -42,7 +43,7 @@ type Launcher struct { logger *zap.Logger } -func NewLauncher(logger *zap.Logger, absDataDir string) *Launcher { +func NewLauncher(logger *zap.Logger, absDataDir string, infoServer *info.InfoServer) *Launcher { l := &Launcher{ shutter: shutter.New(), apps: make(map[string]App), @@ -53,6 +54,7 @@ func NewLauncher(logger *zap.Logger, absDataDir string) *Launcher { l.runtime = &Runtime{ AbsDataDir: absDataDir, + InfoServer: infoServer, IsPendingShutdown: func() bool { return l.hasBeenSignaled.Load() }, diff --git a/launcher/runtime.go b/launcher/runtime.go index a5df1b3..271189e 100644 --- a/launcher/runtime.go +++ b/launcher/runtime.go @@ -1,7 +1,10 @@ package launcher +import "github.com/streamingfast/firehose-core/firehose/info" + type Runtime struct { AbsDataDir string + InfoServer *info.InfoServer // IsPendingShutdown is a function that is going to return true as soon as the initial SIGINT signal is // received which can be used to turn a healthz monitor as unhealthy so that a load balancer can