Skip to content

Commit

Permalink
relayer: if only one source, the WithFailOnUnlinkableBlocks option wi…
Browse files Browse the repository at this point in the history
…ll have a tolerance of 1 unlinkable block. ie: it will fail right away if it receives an unlinkable block
  • Loading branch information
colindickson committed Apr 25, 2024
1 parent 4298312 commit 5d9af0b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. It's recommended to increase this when using Ceph as S3 storage provider to 25 or higher (due to performance issues with deletes the merger might otherwise not be able to delete one-block files fast enough).
* Added `--substreams-tier2-max-concurrent-requests` to limit the number of concurrent requests to the tier2 substreams service.

* If relayer is started with a single source, it will have reduced tolerance for missing blocks. This is to prevent the relayer from falling behind when the source is not producing blocks.

## v1.2.5

* Fixed `tools check merged-blocks` default range when `-r <range>` is not provided to now be `[0, +∞]` (was previously `[HEAD, +∞]`).
Expand Down
6 changes: 5 additions & 1 deletion cmd/apps/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ func RegisterRelayerApp(rootLog *zap.Logger) {
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
sfDataDir := runtime.AbsDataDir

sourcesAddr := viper.GetStringSlice("relayer-source")
singleReaderMode := len(sourcesAddr) <= 1

return relayer.New(&relayer.Config{
SourcesAddr: viper.GetStringSlice("relayer-source"),
SourcesAddr: sourcesAddr,
OneBlocksURL: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")),
GRPCListenAddr: viper.GetString("relayer-grpc-listen-addr"),
MaxSourceLatency: viper.GetDuration("relayer-max-source-latency"),
SingleReaderMode: singleReaderMode,
}), nil
},
})
Expand Down
2 changes: 2 additions & 0 deletions relayer/app/relayer/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
SourceRequestBurst int
MaxSourceLatency time.Duration
OneBlocksURL string
SingleReaderMode bool
}

func (c *Config) ZapFields() []zap.Field {
Expand Down Expand Up @@ -92,6 +93,7 @@ func (a *App) Run() error {
liveSourceFactory,
oneBlocksSourceFactory,
a.config.GRPCListenAddr,
a.config.SingleReaderMode,
)

a.OnTerminating(a.relayer.Shutdown)
Expand Down
17 changes: 14 additions & 3 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewRelayer(
liveSourceFactory bstream.SourceFactory,
oneBlocksSourceFactory bstream.SourceFromNumFactoryWithSkipFunc,
grpcListenAddr string,
singleReaderMode bool,
) *Relayer {
r := &Relayer{
Shutter: shutter.New(),
Expand All @@ -64,14 +65,24 @@ func NewRelayer(
gs := dgrpcfactory.ServerFromOptions()
pbhealth.RegisterHealthServer(gs.ServiceRegistrar(), r)

options := []forkable.Option{
forkable.EnsureAllBlocksTriggerLongestChain(), // send every forked block too
forkable.WithFilters(bstream.StepNew),
}

if singleReaderMode {
options = append(options, forkable.WithFailOnUnlinkableBlocks(1, 10*time.Second))
} else {
options = append(options, forkable.WithFailOnUnlinkableBlocks(20, time.Minute))
}

forkableHub := hub.NewForkableHub(
r.liveSourceFactory,
r.oneBlocksSourceFactory,
10,
forkable.EnsureAllBlocksTriggerLongestChain(), // send every forked block too
forkable.WithFilters(bstream.StepNew),
forkable.WithFailOnUnlinkableBlocks(20, time.Minute),
options...,
)

r.hub = forkableHub
gs.OnTerminated(r.Shutdown)
r.blockStreamServer = r.hub.NewBlockstreamServer(gs)
Expand Down

0 comments on commit 5d9af0b

Please sign in to comment.