Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bm powered sink #2

Merged
merged 8 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 158 additions & 27 deletions cmd/substreams-sink-noop/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package main

import (
"connectrpc.com/connect"
"context"
"crypto/sha256"
"fmt"
pbbmsrv "github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2"
"github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2/pbbmsrvconnect"
"gopkg.in/yaml.v3"
"hash"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -54,6 +62,9 @@ func main() {
flags.String("state-store", "./state.yaml", "Output path where to store latest received cursor, if empty, cursor will not be persisted")
flags.String("api-listen-addr", ":8080", "Rest API to manage deployment")
flags.Uint64("print-output-data-hash-interval", 0, "If non-zero, will hash the output for quickly comparing for differences")
flags.Uint64("follow-head-substreams-segment", 1000, "")
flags.String("follow-head-blockmeta-url", "", "Block meta URL to follow head block, when provided, the sink enable the follow head mode (if block range not provided)")
flags.Uint64("follow-head-reversible-segment", 100, "Segment size for reversible block")
}),
PersistentFlags(func(flags *pflag.FlagSet) {
flags.String("metrics-listen-addr", ":9102", "If non-empty, the process will listen on this address to server Prometheus metrics")
Expand All @@ -65,13 +76,10 @@ func main() {
)
}

func run(cmd *cobra.Command, args []string) error {
app := shutter.New()
const ApiKeyHeader = "x-api-key"

ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})
func run(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

endpoint := args[0]
manifestPath := args[1]
Expand All @@ -81,6 +89,108 @@ func run(cmd *cobra.Command, args []string) error {
blockRangeArg = args[3]
}

var err error
blockmetaUrl := sflags.MustGetString(cmd, "follow-head-blockmeta-url")
substreamsSegmentSize := sflags.MustGetUint64(cmd, "follow-head-substreams-segment")
reversibleSegmentSize := sflags.MustGetUint64(cmd, "follow-head-reversible-segment")
var blockmetaClient pbbmsrvconnect.BlockClient
if blockmetaUrl != "" {
blockmetaClient = pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl)
}

signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
sessionCounter := uint64(0)
stateStorePath := sflags.MustGetString(cmd, "state-store")
var sleepingDuration time.Duration
for {
if blockmetaClient != nil {
for {
select {
case <-ctx.Done():
return nil
case <-signalHandler:
return nil
case <-time.After(sleepingDuration):
// continue
}
sleepingDuration = 5 * time.Second

blockRangeArg, err = computeBlockRangeFromHead(ctx, blockmetaClient, reversibleSegmentSize, substreamsSegmentSize, blockRangeArg)
if err != nil {
return fmt.Errorf("computing block range from head: %w", err)
}

startBlockString := strings.Split(blockRangeArg, ":")[0]
startBlock, err := strconv.Atoi(startBlockString)
if err != nil {
return fmt.Errorf("converting start block to integer: %w", err)
}

computedEndBlock := strings.Split(blockRangeArg, ":")[1]
endBlock, err := strconv.Atoi(computedEndBlock)
if err != nil {
return fmt.Errorf("converting start block to integer: %w", err)
}

cursorExisting, extractedBlockNumber, err := readBlockNumFromCursor(stateStorePath)
if err != nil {
return fmt.Errorf("reading start block from state path: %w", err)
}

if cursorExisting {
startBlock = int(extractedBlockNumber)
}

if startBlock < endBlock-1 {
break
}

zlog.Info("retrying block range computation", zap.Uint64("session_counter", sessionCounter), zap.Int("start_block_computed", startBlock), zap.Int("end_block_computed", endBlock))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "retrying" message is unclear, it looks like something failed, but it didn't. Also, start_block and end_block will always be equal, no need to print them both...
Replace with something like:
"waiting for head block to reach next threshold", zap.Uint64("target", startBlock+substreamsSegmentSize + reversibleSegmentSize), zap.Uint64("current_head", ... this would be a lot more useful to the guy trying to figure out what's going on!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, printed every 5 seconds may be too many. Use a counter and only zlog.Info one when counter % 6 == 0 (so around 30 seconds)
Another approach is to use zlog.Check() to specify debug level at runtime: always zap.DebugLevel until counter%6==0, then use zap.InfoLevel.

}
}

zlog.Info("starting sink session", zap.Uint64("session_counter", sessionCounter))
err = runSink(cmd, blockRangeArg, endpoint, manifestPath, moduleName, zlog, tracer, signalHandler, stateStorePath)
if err != nil {
return err
}

if blockmetaClient == nil {
return nil
}

if isSignaled.Load() {
return nil
}

sessionCounter += 1
zlog.Info("sleeping until next session", zap.Uint64("session_counter", sessionCounter))
}
}

func readBlockNumFromCursor(stateStorePath string) (cursorExisting bool, startBlock uint64, err error) {
content, err := os.ReadFile(stateStorePath)
if err != nil {
if os.IsNotExist(err) {
return false, 0, nil
}
return false, 0, fmt.Errorf("reading cursor state file: %w", err)
}

state := syncState{}
if err = yaml.Unmarshal(content, &state); err != nil {
return false, 0, fmt.Errorf("unmarshal state file %q: %w", stateStorePath, err)
}

return true, state.Block.Number, nil
}
func runSink(cmd *cobra.Command, blockRangeArg string, endpoint string, manifestPath string, moduleName string, zlog *zap.Logger, tracer logging.Tracer, signalHandler <-chan os.Signal, stateStorePath string) error {
app := shutter.New()
ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})

baseSinker, err := sink.NewFromViper(cmd, sink.IgnoreOutputModuleType, endpoint, manifestPath, moduleName, blockRangeArg, zlog, tracer,
sink.WithBlockDataBuffer(0),
)
Expand All @@ -94,10 +204,11 @@ func run(cmd *cobra.Command, args []string) error {

apiListenAddr := sflags.MustGetString(cmd, "api-listen-addr")
cleanState := sflags.MustGetBool(cmd, "clean")
stateStorePath := sflags.MustGetString(cmd, "state-store")
blockRange := sinker.BlockRange()

zlog.Info("consuming substreams",
managementApi := NewManager(apiListenAddr)

zlog.Info("start new substreams consumption session",
zap.String("substreams_endpoint", endpoint),
zap.String("manifest_path", manifestPath),
zap.String("module_name", moduleName),
Expand All @@ -112,7 +223,9 @@ func run(cmd *cobra.Command, args []string) error {

headFetcher := NewHeadTracker(headTrackerClient, headTrackerCallOpts, headTrackerHeaders)
app.OnTerminating(func(_ error) { headFetcher.Close() })
headFetcher.OnTerminated(func(err error) { app.Shutdown(err) })
headFetcher.OnTerminated(func(err error) {
app.Shutdown(err)
})

sinker.headFetcher = headFetcher

Expand All @@ -123,16 +236,21 @@ func run(cmd *cobra.Command, args []string) error {

stats := NewStats(stopBlock, headFetcher)
app.OnTerminating(func(_ error) { stats.Close() })
stats.OnTerminated(func(err error) { app.Shutdown(err) })
stats.OnTerminated(func(err error) {
app.Shutdown(err)
})

stateStore := NewStateStore(stateStorePath, func() (*sink.Cursor, bool, bool) {
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReached
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReachedMetric
})
app.OnTerminating(func(_ error) { stateStore.Close() })
stateStore.OnTerminated(func(err error) { app.Shutdown(err) })
stateStore.OnTerminated(func(err error) {
app.Shutdown(err)
})

managementApi := NewManager(apiListenAddr)
managementApi.OnTerminated(func(err error) { app.Shutdown(err) })
managementApi.OnTerminated(func(err error) {
app.Shutdown(err)
})
app.OnTerminating(func(_ error) {
if managementApi.shouldResetState {
if err := stateStore.Delete(); err != nil {
Expand All @@ -145,7 +263,6 @@ func run(cmd *cobra.Command, args []string) error {
if !cleanState {
cursor, _, err := stateStore.Read()
cli.NoError(err, "Unable to read state store")

sinker.activeCursor = sink.MustNewCursor(cursor)
}

Expand All @@ -166,31 +283,45 @@ func run(cmd *cobra.Command, args []string) error {

go sinker.Run(ctx)

zlog.Info("ready, waiting for signal to quit")

signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog)
select {
case <-signalHandler:
go app.Shutdown(nil)
break
case <-app.Terminating():
zlog.Info("run terminating", zap.Bool("from_signal", isSignaled.Load()), zap.Bool("with_error", app.Err() != nil))
break
zlog.Info("run terminating", zap.Bool("with_error", app.Err() != nil))
}

zlog.Info("waiting for run termination")
select {
case <-app.Terminated():
return app.Err()
case <-time.After(30 * time.Second):
zlog.Warn("application did not terminate within 30s")
return app.Err()
}
}

func computeBlockRangeFromHead(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should get the blockHead in the run() function and keep the 'computeBlockRangeFromHead' function's purpose to actually compute it (not fetch it.)
Easier to test, clearer separation of concerns, and will allow you to use the headBlockNum in the logs I asked in my last comment.

request := connect.NewRequest(&pbbmsrv.Empty{})

if err := app.Err(); err != nil {
return err
apiKey := os.Getenv("SUBSTREAMS_API_KEY")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once you move this to run() function, you can check this environment variable just once, and maybe return an error that specifies that this environment variable is required especially for Blockmeta. It wouldn't be clear to a user using the substreams_api_token (JWT) why he is required to use the api_key too !

if apiKey == "" {
return "", fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable")
}
request.Header().Set(ApiKeyHeader, apiKey)

zlog.Info("run terminated gracefully")
return nil
headBlock, err := blockmetaClient.Head(ctx, request)
if err != nil {
return "", fmt.Errorf("requesting head block to blockmeta service: %w", err)
}

computedEndBlock := ((headBlock.Msg.Num - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize
blockRangeArray := strings.Split(blockRangeArg, ":")
if len(blockRangeArray) != 2 {
return "", fmt.Errorf("invalid block range format")
}

//The computed block range replace the end block by a computed one
return (blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10)), nil
}

type Sinker struct {
Expand All @@ -199,7 +330,7 @@ type Sinker struct {
headFetcher *HeadTracker

activeCursor *sink.Cursor
headBlockReached bool
headBlockReachedMetric bool
outputDataHash *dataHasher
backprocessingCompleted bool
}
Expand All @@ -220,7 +351,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp

chainHeadBlock, found := s.headFetcher.Current()
if found && block.Num() >= chainHeadBlock.Num() {
s.headBlockReached = true
s.headBlockReachedMetric = true
HeadBlockReached.SetUint64(1)
}

Expand Down
1 change: 0 additions & 1 deletion cmd/substreams-sink-noop/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (s *StateStore) Read() (cursor string, block bstream.BlockRef, err error) {
s.state.LastSyncedAt = s.state.LastSyncedAt.Local()
s.state.BackprocessingCompletedAt = s.state.BackprocessingCompletedAt.Local()
s.state.HeadBlockReachedAt = s.state.HeadBlockReachedAt.Local()

return s.state.Cursor, bstream.NewBlockRef(s.state.Block.ID, s.state.Block.Number), nil
}

Expand Down
32 changes: 19 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
module github.com/streamingfast/substreams-sink-noop

go 1.21
go 1.22

toolchain go1.22.0

require (
connectrpc.com/connect v1.15.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/gorilla/mux v1.8.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/blockmeta-service/server v0.0.0-20240305210209-2f971e09cdef
github.com/streamingfast/bstream v0.0.2-0.20231121211820-e45c1b42f472
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
Expand All @@ -18,20 +22,20 @@ require (
github.com/streamingfast/shutter v1.5.0
github.com/streamingfast/substreams v1.3.7
github.com/streamingfast/substreams-sink v0.3.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.44.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.uber.org/zap v1.26.0
golang.org/x/oauth2 v0.15.0
golang.org/x/oauth2 v0.16.0
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.32.0
gopkg.in/yaml.v3 v3.0.1
)

require (
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
cloud.google.com/go/iam v1.1.6 // indirect
cloud.google.com/go/storage v1.38.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
github.com/aws/aws-sdk-go v1.44.325 // indirect
Expand All @@ -49,13 +53,14 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/envoyproxy/go-control-plane v0.11.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -103,14 +108,15 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/dstore v0.1.1-0.20240215171730-493ad5a0f537 // indirect
github.com/streamingfast/dstore v0.1.1-0.20240311181234-470a7a84936f // indirect
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect
github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
go.opentelemetry.io/otel/trace v1.23.1 // indirect
Expand All @@ -120,16 +126,16 @@ require (
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/api v0.152.0 // indirect
google.golang.org/api v0.162.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
Loading
Loading