Skip to content

Commit

Permalink
add sf.firehose.v2.EndpointInfo/Info endpoint and logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Aug 21, 2024
1 parent ad3d137 commit 51b2c61
Show file tree
Hide file tree
Showing 15 changed files with 303 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
/build
/dist
.envrc
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

* Add `sf.firehose.v2.EndpointInfo/Info` service on Firehose and Substreams endpoints. This involves the following new flags:
- `advertise-chain-name` Canonical name of the chain, from the list here: https://thegraph.com/docs/en/developing/supported-networks/ (required)
- `advertise-chain-aliases` Alternate names for that chain (optional)
- `advertise-block-features` Only required for ethereum blocks, automatically discovered if run from `firehose-ethereum` program
- `advertise-block-id-encoding` Required, one of [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX]

* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service.

* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling

Expand Down
9 changes: 9 additions & 0 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"runtime/debug"
"strings"

pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/substreams/wasm"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -154,6 +155,10 @@ type Chain[B Block] struct {
DefaultBlockType string

RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error)

// InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain.
// It can validate that we are on the right chain by checking against a known hash, or populate missing fields.
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error
}

type ToolsConfig[B Block] struct {
Expand Down Expand Up @@ -261,6 +266,10 @@ func (c *Chain[B]) Validate() {
err = multierr.Append(err, fmt.Errorf("field 'BlockIndexerFactories' must have at most one element"))
}

if c.InfoResponseFiller == nil {
err = multierr.Append(err, fmt.Errorf("field 'InfoResponseFiller' must be set"))
}

for key, indexerFactory := range c.BlockIndexerFactories {
if indexerFactory == nil {
err = multierr.Append(err, fmt.Errorf("entry %q for field 'BlockIndexerFactories' must be non-nil", key))
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
HeadBlockNumberMetric: headBlockNumMetric,
TransformRegistry: registry,
CheckPendingShutdown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
}), nil
},
})
Expand Down
22 changes: 19 additions & 3 deletions cmd/apps/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
info "github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/launcher"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
tracing "github.com/streamingfast/sf-tracing"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -45,7 +47,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
configFile := sflags.MustGetString(cmd, "config-file")
rootLog.Info(fmt.Sprintf("starting Firehose on %s with config file '%s'", chain.LongName, configFile))

err = start(cmd, dataDir, args, rootLog)
err = start(cmd, dataDir, args, chain, rootLog)
if err != nil {
return fmt.Errorf("unable to launch: %w", err)
}
Expand All @@ -55,7 +57,7 @@ func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName st
}
}

func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logger) (err error) {
func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string, chain *firecore.Chain[B], rootLog *zap.Logger) (err error) {
dataDirAbs, err := filepath.Abs(dataDir)
if err != nil {
return fmt.Errorf("unable to setup directory structure: %w", err)
Expand All @@ -82,7 +84,21 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
}()
dmetering.SetDefaultEmitter(eventEmitter)

launch := launcher.NewLauncher(rootLog, dataDirAbs)
blockIDEncoding := pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET
if enc := sflags.MustGetString(cmd, "advertise-block-id-encoding"); enc != "" {
blockIDEncoding = pbfirehose.InfoResponse_BlockIdEncoding(pbfirehose.InfoResponse_BlockIdEncoding_value[enc])
}

infoServer := info.NewInfoServer(
sflags.MustGetString(cmd, "advertise-chain-name"),
sflags.MustGetStringSlice(cmd, "advertise-chain-aliases"),
blockIDEncoding,
sflags.MustGetStringSlice(cmd, "advertise-block-features"),
bstream.GetProtocolFirstStreamableBlock,
chain.InfoResponseFiller,
)

launch := launcher.NewLauncher(rootLog, dataDirAbs, infoServer)
rootLog.Debug("launcher created")

runByDefault := func(app string) bool {
Expand Down
2 changes: 2 additions & 0 deletions cmd/firecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
firecore "github.com/streamingfast/firehose-core"
fhCMD "github.com/streamingfast/firehose-core/cmd"
info "github.com/streamingfast/firehose-core/firehose/info"
)

func main() {
Expand All @@ -17,6 +18,7 @@ func main() {
Version: version,
BlockFactory: func() firecore.Block { return new(pbbstream.Block) },
ConsoleReaderFactory: firecore.NewConsoleReader,
InfoResponseFiller: info.DefaultInfoResponseFiller,
Tools: &firecore.ToolsConfig[*pbbstream.Block]{},
})
}
Expand Down
15 changes: 15 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-core/launcher"
paymentGatewayMetering "github.com/streamingfast/payment-gateway/metering"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"

"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -171,6 +172,20 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) {
cmd.Flags().String("common-forked-blocks-store-url", firecore.ForkedBlocksStoreURL, "[COMMON] Store URL where to read/write forked block files that we want to keep.")
cmd.Flags().String("common-live-blocks-addr", firecore.RelayerServingAddr, "[COMMON] gRPC endpoint to get real-time blocks.")

cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.")

acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1)
i := 0
for encoding := range pbfirehose.InfoResponse_BlockIdEncoding_value {
if encoding != "BLOCK_ID_ENCODING_UNSET" {
acceptedEncodings[i] = encoding
i++
}
}
cmd.Flags().String("advertise-block-id-encoding", "", fmt.Sprintf("[firehose,substreams-tier1] Block ID encoding type to advertise in the Info Endpoint (%s). If unset, it may be inferred from the genesis block.", strings.Join(acceptedEncodings, ", ")))

cmd.Flags().String("common-index-store-url", firecore.IndexStoreURL, "[COMMON] Store URL where to read/write index files (if used on the chain).")
cmd.Flags().IntSlice("common-index-block-sizes", []int{100000, 10000, 1000, 100}, "Index bundle sizes that that are considered valid when looking for block indexes")

Expand Down
11 changes: 11 additions & 0 deletions firehose/app/firehose/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose"
"github.com/streamingfast/firehose-core/firehose/info"
"github.com/streamingfast/firehose-core/firehose/metrics"
"github.com/streamingfast/firehose-core/firehose/server"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Modules struct {
TransformRegistry *transform.Registry
RegisterServiceExtension RegisterServiceExtensionFunc
CheckPendingShutdown func() bool
InfoServer *info.InfoServer
}

type App struct {
Expand Down Expand Up @@ -158,6 +160,7 @@ func (a *App) Run() error {
a.IsReady,
a.config.GRPCListenAddr,
a.config.ServiceDiscoveryURL,
a.modules.InfoServer,
a.config.ServerOptions...,
)

Expand Down Expand Up @@ -186,6 +189,14 @@ func (a *App) Run() error {
}
}

go func() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
if err := a.modules.InfoServer.Init(ctx, forkableHub, mergedBlocksStore, oneBlocksStore, a.logger); err != nil {
a.Shutdown(fmt.Errorf("cannot initialize info server: %w", err))
}
}()

a.logger.Info("launching gRPC firehoseServer", zap.Bool("live_support", withLive))
a.isReady.CAS(false, true)
firehoseServer.Launch()
Expand Down
165 changes: 165 additions & 0 deletions firehose/info/endpoint_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package info

import (
"context"
"fmt"
"sync"
"time"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dstore"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
)

type InfoServer struct {
responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error
response *pbfirehose.InfoResponse
ready chan struct{}
once sync.Once
}

func (s *InfoServer) Info(ctx context.Context, request *pbfirehose.InfoRequest) (*pbfirehose.InfoResponse, error) {
select {
case <-s.ready:
return s.response, nil
default:
return nil, fmt.Errorf("info server not ready")
}
}

func NewInfoServer(
chainName string,
chainNameAliases []string,
blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding,
blockFeatures []string,
firstStreamableBlock uint64,
responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error,
) *InfoServer {

resp := &pbfirehose.InfoResponse{
ChainName: chainName,
ChainNameAliases: chainNameAliases,
BlockIdEncoding: blockIDEncoding,
BlockFeatures: blockFeatures,
FirstStreamableBlockNum: firstStreamableBlock,
}

return &InfoServer{
responseFiller: responseFiller,
response: resp,
ready: make(chan struct{}),
}
}

func validateInfoResponse(resp *pbfirehose.InfoResponse) error {
switch {
case resp.ChainName == "":
return fmt.Errorf("chain name is not set")
case resp.BlockIdEncoding == pbfirehose.InfoResponse_BLOCK_ID_ENCODING_UNSET:
return fmt.Errorf("block id encoding is not set")
case resp.FirstStreamableBlockId == "":
return fmt.Errorf("first streamable block id is not set")
}

return nil
}

// multiple apps (firehose, substreams...) can initialize the same server, we only need one
func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) (err error) {
s.once.Do(func() { err = s.init(ctx, fhub, mergedBlocksStore, oneBlockStore, logger) })
return
}

func (s *InfoServer) getBlockFromMergedBlocksStore(ctx context.Context, blockNum uint64, mergedBlocksStore dstore.Store) *pbbstream.Block {
for {
if ctx.Err() != nil {
return nil
}

block, err := bstream.FetchBlockFromMergedBlocksStore(ctx, blockNum, mergedBlocksStore)
if err != nil {
continue
}
return block
}
}

func (s *InfoServer) getBlockFromForkableHub(ctx context.Context, blockNum uint64, forkableHub *hub.ForkableHub) *pbbstream.Block {
for {
if ctx.Err() != nil {
return nil
}

block := forkableHub.GetBlock(s.response.FirstStreamableBlockNum, "")
if block == nil {
time.Sleep(time.Millisecond * 500)
continue
}
return block
}

}

func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uint64, oneBlockStore dstore.Store) *pbbstream.Block {
for {
if ctx.Err() != nil {
return nil
}

block, err := bstream.FetchBlockFromOneBlockStore(ctx, blockNum, "", oneBlockStore)
if err != nil {
continue
}
return block
}
}

// init tries to fetch the first streamable block from the different sources and fills the response with it
// returns an error if it is incomplete
func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

ch := make(chan *pbbstream.Block)

if fhub != nil {
go func() {
select {
case ch <- s.getBlockFromForkableHub(ctx, s.response.FirstStreamableBlockNum, fhub):
case <-ctx.Done():
}
}()
}

go func() {
select {
case ch <- s.getBlockFromMergedBlocksStore(ctx, s.response.FirstStreamableBlockNum, mergedBlocksStore):
case <-ctx.Done():
}
}()

go func() {
select {
case ch <- s.getBlockFromOneBlockStore(ctx, s.response.FirstStreamableBlockNum, oneBlockStore):
case <-ctx.Done():
}
}()

select {
case blk := <-ch:
if err := s.responseFiller(blk, s.response); err != nil {
return err
}
case <-ctx.Done():
}

if err := validateInfoResponse(s.response); err != nil {
return err
}

close(s.ready)
return nil
}
Loading

0 comments on commit 51b2c61

Please sign in to comment.