Skip to content

Commit

Permalink
new relayer change
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed May 31, 2024
1 parent 57807d5 commit e220900
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 48 deletions.
2 changes: 0 additions & 2 deletions cmd/apps/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ func RegisterRelayerApp(rootLog *zap.Logger) {
sfDataDir := runtime.AbsDataDir

sourcesAddr := viper.GetStringSlice("relayer-source")
singleReaderMode := len(sourcesAddr) <= 1

return relayer.New(&relayer.Config{
SourcesAddr: sourcesAddr,
OneBlocksURL: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")),
GRPCListenAddr: viper.GetString("relayer-grpc-listen-addr"),
MaxSourceLatency: viper.GetDuration("relayer-max-source-latency"),
SingleReaderMode: singleReaderMode,
}), nil
},
})
Expand Down
10 changes: 1 addition & 9 deletions firehose/app/firehose/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,7 @@ func (a *App) Run() error {
)
})

oneBlocksSourceFactory := bstream.SourceFromNumFactoryWithSkipFunc(func(num uint64, h bstream.Handler, skipFunc func(string) bool) bstream.Source {
src, err := bstream.NewOneBlocksSource(num, oneBlocksStore, h, bstream.OneBlocksSourceWithSkipperFunc(skipFunc))
if err != nil {
return nil
}
return src
})

forkableHub = hub.NewForkableHub(liveSourceFactory, oneBlocksSourceFactory, 500)
forkableHub = hub.NewForkableHub(liveSourceFactory, 500, oneBlocksStore)
forkableHub.OnTerminated(a.Shutdown)

go forkableHub.Run()
Expand Down
12 changes: 8 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
Expand All @@ -228,3 +228,7 @@ replace (
github.com/bytecodealliance/wasmtime-go/v4 => github.com/streamingfast/wasmtime-go/v4 v4.0.0-freemem3
github.com/jhump/protoreflect => github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d
)

replace github.com/streamingfast/bstream => /Users/arnaudberger/StreamingFast/bstream

replace github.com/streamingfast/substreams => /Users/arnaudberger/StreamingFast/substreams
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93 h1:4KOtbEDDrso0I2S7ZwFGflNr93pUxSvg3U807TZ+8sQ=
github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E=
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y=
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU=
Expand Down Expand Up @@ -845,8 +843,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -912,13 +910,13 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -930,8 +928,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
11 changes: 1 addition & 10 deletions relayer/app/relayer/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type Config struct {
SourceRequestBurst int
MaxSourceLatency time.Duration
OneBlocksURL string
SingleReaderMode bool
}

func (c *Config) ZapFields() []zap.Field {
Expand Down Expand Up @@ -80,20 +79,12 @@ func (a *App) Run() error {
a.config.SourceRequestBurst,
)
})
oneBlocksSourceFactory := bstream.SourceFromNumFactoryWithSkipFunc(func(num uint64, h bstream.Handler, skipFunc func(idSuffix string) bool) bstream.Source {
src, err := bstream.NewOneBlocksSource(num, oneBlocksStore, h, bstream.OneBlocksSourceWithSkipperFunc(skipFunc))
if err != nil {
return nil
}
return src
})

zlog.Info("starting relayer", a.config.ZapFields()...)
a.relayer = relayer.NewRelayer(
liveSourceFactory,
oneBlocksSourceFactory,
a.config.GRPCListenAddr,
a.config.SingleReaderMode,
oneBlocksStore,
)

a.OnTerminating(a.relayer.Shutdown)
Expand Down
20 changes: 7 additions & 13 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strings"
"time"

"github.com/streamingfast/dstore"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/streamingfast/bstream"
Expand Down Expand Up @@ -51,15 +53,13 @@ type Relayer struct {

func NewRelayer(
liveSourceFactory bstream.SourceFactory,
oneBlocksSourceFactory bstream.SourceFromNumFactoryWithSkipFunc,
grpcListenAddr string,
singleReaderMode bool,
oneBlocksStore dstore.Store,
) *Relayer {
r := &Relayer{
Shutter: shutter.New(),
grpcListenAddr: grpcListenAddr,
liveSourceFactory: liveSourceFactory,
oneBlocksSourceFactory: oneBlocksSourceFactory,
Shutter: shutter.New(),
grpcListenAddr: grpcListenAddr,
liveSourceFactory: liveSourceFactory,
}

gs := dgrpcfactory.ServerFromOptions()
Expand All @@ -70,16 +70,10 @@ func NewRelayer(
forkable.WithFilters(bstream.StepNew),
}

if singleReaderMode {
options = append(options, forkable.WithFailOnUnlinkableBlocks(1, 10*time.Second))
} else {
options = append(options, forkable.WithFailOnUnlinkableBlocks(20, time.Minute))
}

forkableHub := hub.NewForkableHub(
r.liveSourceFactory,
r.oneBlocksSourceFactory,
10,
oneBlocksStore,
options...,
)

Expand Down

0 comments on commit e220900

Please sign in to comment.