diff --git a/.gitignore b/.gitignore index c6d0140..44db5e9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ .envrc .env .DS_Store -firehose-data* \ No newline at end of file +firehose-data* +/firecore +/firehose.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index f827a67..d19b81c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* Add `reader-node-firehose` which creates one-blocks by consuming blocks from an already existing Firehose endpoint. This can be used to set up an indexer stack without having to run an instrumented blockchain node. + ## v1.6.1 * Bump substreams to v1.10.0: Version 1.10.0 adds a new `EndpointInfo/Info` endpoint, introduces a 3-minute default execution timeout per block, updates metering metrics with a deprecation warning, enhances `substreams init` commands, and improves wasm module caching and Prometheus tool flexibility. Full changelog: https://github.com/streamingfast/substreams/releases/tag/v1.10.0 diff --git a/cmd/apps/reader_node_firehose.go b/cmd/apps/reader_node_firehose.go new file mode 100644 index 0000000..f2b2f73 --- /dev/null +++ b/cmd/apps/reader_node_firehose.go @@ -0,0 +1,80 @@ +// Copyright 2021 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apps + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/launcher" + nodeManager "github.com/streamingfast/firehose-core/node-manager" + "github.com/streamingfast/firehose-core/node-manager/app/firehose_reader" + "github.com/streamingfast/firehose-core/node-manager/metrics" + "github.com/streamingfast/logging" + "go.uber.org/zap" + "os" +) + +func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) { + appLogger, appTracer := logging.PackageLogger("reader-node-firehose", chain.LoggerPackageID("reader-node-firehose")) + + launcher.RegisterApp(rootLog, &launcher.AppDef{ + ID: "reader-node-firehose", + Title: "Reader Node (Firehose)", + Description: "Blocks reading node, consumes blocks from an already existing Firehose endpoint. This can be used to set up an indexer stack without having to run an instrumented blockchain node.", + RegisterFlags: func(cmd *cobra.Command) error { + cmd.Flags().String("reader-node-firehose-endpoint", "", "Firehose endpoint to connect to.") + cmd.Flags().String("reader-node-firehose-state", "{data-dir}/reader/state", "State file to store the cursor from the Firehose connection in.") + cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.") + cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.") + cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.") + cmd.Flags().String("reader-node-firehose-api-key-env-var", "FIREHOSE_API_KEY", "Look for an API key directly in this environment variable to authenticate against endpoint (alternative to api-token-env-var)") + cmd.Flags().String("reader-node-firehose-api-token-env-var", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint (alternative to api-key-env-var)") + + return nil + }, + FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { + sfDataDir := runtime.AbsDataDir + archiveStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")) + + metricID := "reader-node-firehose" + headBlockTimeDrift := metrics.NewHeadBlockTimeDrift(metricID) + headBlockNumber := metrics.NewHeadBlockNumber(metricID) + appReadiness := metrics.NewAppReadiness(metricID) + metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency")) + return firehose_reader.New(&firehose_reader.Config{ + GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"), + OneBlocksStoreURL: archiveStoreURL, + MindReadBlocksChanCapacity: viper.GetInt("reader-node-blocks-chan-capacity"), + StartBlockNum: viper.GetUint64("reader-node-start-block-num"), + StopBlockNum: viper.GetUint64("reader-node-stop-block-num"), + WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")), + OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"), + + FirehoseConfig: firehose_reader.FirehoseConfig{ + Endpoint: viper.GetString("reader-node-firehose-endpoint"), + StateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), + InsecureConn: viper.GetBool("reader-node-firehose-insecure"), + PlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), + Compression: viper.GetString("reader-node-firehose-compression"), + ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")), + Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")), + }, + }, &firehose_reader.Modules{ + MetricsAndReadinessManager: metricsAndReadinessManager, + }, appLogger, appTracer), nil + }, + }) +} diff --git a/cmd/main.go b/cmd/main.go index d0774a0..1fd3953 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -101,6 +101,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) { registerCommonFlags(chain) apps.RegisterReaderNodeApp(chain, rootLog) apps.RegisterReaderNodeStdinApp(chain, rootLog) + apps.RegisterReaderNodeFirehoseApp(chain, rootLog) apps.RegisterMergerApp(rootLog) apps.RegisterRelayerApp(rootLog) apps.RegisterFirehoseApp(chain, rootLog) diff --git a/node-manager/app/firehose_reader/app.go b/node-manager/app/firehose_reader/app.go new file mode 100644 index 0000000..4f1c044 --- /dev/null +++ b/node-manager/app/firehose_reader/app.go @@ -0,0 +1,151 @@ +// Copyright 2019 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firehose_reader + +import ( + "fmt" + "github.com/streamingfast/bstream/blockstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + dgrpcserver "github.com/streamingfast/dgrpc/server" + dgrpcfactory "github.com/streamingfast/dgrpc/server/factory" + nodeManager "github.com/streamingfast/firehose-core/node-manager" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + "github.com/streamingfast/logging" + pbheadinfo "github.com/streamingfast/pbgo/sf/headinfo/v1" + "github.com/streamingfast/shutter" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type Config struct { + GRPCAddr string + OneBlocksStoreURL string + OneBlockSuffix string + MindReadBlocksChanCapacity int + StartBlockNum uint64 + StopBlockNum uint64 + WorkingDir string + LogToZap bool + DebugDeepMind bool + FirehoseConfig FirehoseConfig +} + +type FirehoseConfig struct { + Endpoint string + StateFile string + PlaintextConn bool + InsecureConn bool + ApiKey string + Jwt string + Compression string +} + +type Modules struct { + MetricsAndReadinessManager *nodeManager.MetricsAndReadinessManager + RegisterGRPCService func(server grpc.ServiceRegistrar) error +} + +type App struct { + *shutter.Shutter + Config *Config + ReadyFunc func() + modules *Modules + zlogger *zap.Logger + tracer logging.Tracer +} + +func New(c *Config, modules *Modules, zlogger *zap.Logger, tracer logging.Tracer) *App { + n := &App{ + Shutter: shutter.New(), + Config: c, + ReadyFunc: func() {}, + modules: modules, + zlogger: zlogger, + tracer: tracer, + } + return n +} + +func (a *App) Run() error { + a.zlogger.Info("launching reader-node-firehose app (reading from firehose)", zap.Reflect("config", a.Config)) + + gs := dgrpcfactory.ServerFromOptions(dgrpcserver.WithLogger(a.zlogger)) + + blockStreamServer := blockstream.NewUnmanagedServer( + blockstream.ServerOptionWithLogger(a.zlogger), + blockstream.ServerOptionWithBuffer(1), + ) + + firehoseReader, err := NewFirehoseReader(a.Config.FirehoseConfig, a.zlogger) + if err != nil { + return err + } + + a.zlogger.Info("launching reader log plugin") + mindreaderLogPlugin, err := mindreader.NewMindReaderPlugin( + a.Config.OneBlocksStoreURL, + a.Config.WorkingDir, + firehoseReader.NoopConsoleReader, + a.Config.StartBlockNum, + a.Config.StopBlockNum, + a.Config.MindReadBlocksChanCapacity, + a.modules.MetricsAndReadinessManager.UpdateHeadBlock, + func(_ error) {}, + a.Config.OneBlockSuffix, + blockStreamServer, + a.zlogger, + a.tracer, + ) + if err != nil { + return err + } + + a.zlogger.Debug("configuring shutter") + mindreaderLogPlugin.OnTerminated(a.Shutdown) + a.OnTerminating(mindreaderLogPlugin.Shutdown) + + serviceRegistrar := gs.ServiceRegistrar() + pbheadinfo.RegisterHeadInfoServer(serviceRegistrar, blockStreamServer) + pbbstream.RegisterBlockStreamServer(serviceRegistrar, blockStreamServer) + + if a.modules.RegisterGRPCService != nil { + err := a.modules.RegisterGRPCService(gs.ServiceRegistrar()) + if err != nil { + return fmt.Errorf("register extra grpc service: %w", err) + } + } + gs.OnTerminated(a.Shutdown) + go gs.Launch(a.Config.GRPCAddr) + + a.zlogger.Debug("launching firehose reader") + err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseConfig.StateFile) + if err != nil { + return err + } + + a.zlogger.Debug("running reader log plugin") + mindreaderLogPlugin.Launch() + go a.modules.MetricsAndReadinessManager.Launch() + + return nil +} + +func (a *App) OnReady(f func()) { + a.ReadyFunc = f +} + +func (a *App) IsReady() bool { + return true +} diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go new file mode 100644 index 0000000..7c153a2 --- /dev/null +++ b/node-manager/app/firehose_reader/console_reader.go @@ -0,0 +1,144 @@ +package firehose_reader + +import ( + "context" + "errors" + "fmt" + "github.com/mostynb/go-grpc-compression/zstd" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/firehose-core/firehose/client" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + "os" + "time" +) + +type FirehoseReader struct { + firehoseClient pbfirehose.StreamClient + firehoseStream pbfirehose.Stream_BlocksClient + closeFunc func() error + callOpts []grpc.CallOption + zlogger *zap.Logger + cursorStateFile string + cursor string + stats *firehoseReaderStats +} + +func NewFirehoseReader(config FirehoseConfig, zlogger *zap.Logger) (*FirehoseReader, error) { + + firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(config.Endpoint, config.Jwt, config.ApiKey, config.InsecureConn, config.PlaintextConn) + if err != nil { + return nil, err + } + + switch config.Compression { + case "gzip": + callOpts = append(callOpts, grpc.UseCompressor(gzip.Name)) + case "zstd": + callOpts = append(callOpts, grpc.UseCompressor(zstd.Name)) + case "none": + default: + return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", config.Compression) + } + + res := &FirehoseReader{ + firehoseClient: firehoseClient, + closeFunc: closeFunc, + callOpts: callOpts, + zlogger: zlogger, + stats: newFirehoseReaderStats(), + } + + return res, nil +} + +func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) error { + + cursor, err := os.ReadFile(cursorFile) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("unable to read cursor file: %w", err) + } + + if len(cursor) > 0 { + f.zlogger.Info("found state file, continuing previous run", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) + } + + stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{ + StartBlockNum: int64(startBlock), + Cursor: string(cursor), + StopBlockNum: stopBlock, + FinalBlocksOnly: false, + }, f.callOpts...) + if err != nil { + return fmt.Errorf("failed to request block stream from Firehose: %w", err) + } + + f.firehoseStream = stream + f.cursorStateFile = cursorFile + f.stats.StartPeriodicLogToZap(context.Background(), f.zlogger, 10*time.Second) + + return nil +} + +func (f *FirehoseReader) NoopConsoleReader(_ chan string) (mindreader.ConsolerReader, error) { + return f, nil +} + +func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { + + res, err := f.firehoseStream.Recv() + if err != nil { + return nil, err + } + + // We don't write the current cursor here, but the one from the previous block. In case an error happens downstream, + // we need to ensure that the current block is included after a restart. + err = f.writeCursor() + if err != nil { + return nil, err + } + f.cursor = res.Cursor + + BlockReadCount.Inc() + f.stats.lastBlock = pbbstream.BlockRef{ + Num: res.Metadata.Num, + Id: res.Metadata.Id, + } + + return &pbbstream.Block{ + Number: res.Metadata.Num, + Id: res.Metadata.Id, + ParentId: res.Metadata.ParentId, + Timestamp: res.Metadata.Time, + LibNum: res.Metadata.LibNum, + ParentNum: res.Metadata.ParentNum, + Payload: res.Block, + }, nil +} + +func (f *FirehoseReader) Done() <-chan interface{} { + //TODO implement me + panic("implement me") +} + +func (f *FirehoseReader) Close() error { + _ = f.writeCursor() + f.stats.StopPeriodicLogToZap() + return f.closeFunc() +} + +func (f *FirehoseReader) writeCursor() error { + if f.cursor == "" { + return nil + } + + err := os.WriteFile(f.cursorStateFile, []byte(f.cursor), 0644) + if err != nil { + return fmt.Errorf("failed to write cursor to state file: %w", err) + } + + return nil +} diff --git a/node-manager/app/firehose_reader/metrics.go b/node-manager/app/firehose_reader/metrics.go new file mode 100644 index 0000000..68677cf --- /dev/null +++ b/node-manager/app/firehose_reader/metrics.go @@ -0,0 +1,11 @@ +package firehose_reader + +import "github.com/streamingfast/dmetrics" + +var metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("reader_node_firehose")) + +func init() { + metrics.Register() +} + +var BlockReadCount = metrics.NewCounter("block_read_count", "The number of blocks read by the Firehose reader") diff --git a/node-manager/app/firehose_reader/reader_stats.go b/node-manager/app/firehose_reader/reader_stats.go new file mode 100644 index 0000000..2df596f --- /dev/null +++ b/node-manager/app/firehose_reader/reader_stats.go @@ -0,0 +1,55 @@ +package firehose_reader + +import ( + "context" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetrics" + "go.uber.org/zap" + "time" +) + +type firehoseReaderStats struct { + lastBlock pbbstream.BlockRef + blockRate *dmetrics.AvgRatePromCounter + + cancelPeriodicLogger context.CancelFunc +} + +func newFirehoseReaderStats() *firehoseReaderStats { + return &firehoseReaderStats{ + lastBlock: pbbstream.BlockRef{}, + blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockReadCount, 1*time.Second, 30*time.Second, "blocks"), + } +} + +func (s *firehoseReaderStats) StartPeriodicLogToZap(ctx context.Context, logger *zap.Logger, logEach time.Duration) { + ctx, s.cancelPeriodicLogger = context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(logEach) + for { + select { + case <-ticker.C: + logger.Info("reader node statistics", s.ZapFields()...) + case <-ctx.Done(): + return + } + } + }() +} + +func (s *firehoseReaderStats) StopPeriodicLogToZap() { + if s.cancelPeriodicLogger != nil { + s.cancelPeriodicLogger() + } +} + +func (s *firehoseReaderStats) ZapFields() []zap.Field { + fields := []zap.Field{ + zap.Stringer("block_rate", s.blockRate), + zap.Uint64("last_block_num", s.lastBlock.Num), + zap.String("last_block_id", s.lastBlock.Id), + } + + return fields +}