-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/bm powered sink #2
Changes from 2 commits
0ac5353
889d178
f4953d3
46aa291
3a6aa3b
dd76af1
1914572
d18b708
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,18 @@ | ||
package main | ||
|
||
import ( | ||
"connectrpc.com/connect" | ||
"context" | ||
"crypto/sha256" | ||
"fmt" | ||
pbbmsrv "github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2" | ||
"github.com/streamingfast/blockmeta-service/server/pb/sf/blockmeta/v2/pbbmsrvconnect" | ||
"gopkg.in/yaml.v3" | ||
"hash" | ||
"net/http" | ||
"os" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/spf13/cobra" | ||
|
@@ -54,6 +62,9 @@ func main() { | |
flags.String("state-store", "./state.yaml", "Output path where to store latest received cursor, if empty, cursor will not be persisted") | ||
flags.String("api-listen-addr", ":8080", "Rest API to manage deployment") | ||
flags.Uint64("print-output-data-hash-interval", 0, "If non-zero, will hash the output for quickly comparing for differences") | ||
flags.Uint64("follow-head-substreams-segment", 1000, "") | ||
flags.String("follow-head-blockmeta-url", "", "Block meta URL to follow head block, when provided, the sink enable the follow head mode (if block range not provided)") | ||
flags.Uint64("follow-head-reversible-segment", 100, "Segment size for reversible block") | ||
}), | ||
PersistentFlags(func(flags *pflag.FlagSet) { | ||
flags.String("metrics-listen-addr", ":9102", "If non-empty, the process will listen on this address to server Prometheus metrics") | ||
|
@@ -65,13 +76,10 @@ func main() { | |
) | ||
} | ||
|
||
func run(cmd *cobra.Command, args []string) error { | ||
app := shutter.New() | ||
const ApiKeyHeader = "x-api-key" | ||
|
||
ctx, cancelApp := context.WithCancel(cmd.Context()) | ||
app.OnTerminating(func(_ error) { | ||
cancelApp() | ||
}) | ||
func run(cmd *cobra.Command, args []string) error { | ||
ctx := cmd.Context() | ||
|
||
endpoint := args[0] | ||
manifestPath := args[1] | ||
|
@@ -81,6 +89,108 @@ func run(cmd *cobra.Command, args []string) error { | |
blockRangeArg = args[3] | ||
} | ||
|
||
var err error | ||
blockmetaUrl := sflags.MustGetString(cmd, "follow-head-blockmeta-url") | ||
substreamsSegmentSize := sflags.MustGetUint64(cmd, "follow-head-substreams-segment") | ||
reversibleSegmentSize := sflags.MustGetUint64(cmd, "follow-head-reversible-segment") | ||
var blockmetaClient pbbmsrvconnect.BlockClient | ||
if blockmetaUrl != "" { | ||
blockmetaClient = pbbmsrvconnect.NewBlockClient(http.DefaultClient, blockmetaUrl) | ||
} | ||
|
||
signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog) | ||
sessionCounter := uint64(0) | ||
stateStorePath := sflags.MustGetString(cmd, "state-store") | ||
var sleepingDuration time.Duration | ||
for { | ||
if blockmetaClient != nil { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return nil | ||
case <-signalHandler: | ||
return nil | ||
case <-time.After(sleepingDuration): | ||
// continue | ||
} | ||
sleepingDuration = 5 * time.Second | ||
|
||
blockRangeArg, err = computeBlockRangeFromHead(ctx, blockmetaClient, reversibleSegmentSize, substreamsSegmentSize, blockRangeArg) | ||
if err != nil { | ||
return fmt.Errorf("computing block range from head: %w", err) | ||
} | ||
|
||
startBlockString := strings.Split(blockRangeArg, ":")[0] | ||
startBlock, err := strconv.Atoi(startBlockString) | ||
if err != nil { | ||
return fmt.Errorf("converting start block to integer: %w", err) | ||
} | ||
|
||
computedEndBlock := strings.Split(blockRangeArg, ":")[1] | ||
endBlock, err := strconv.Atoi(computedEndBlock) | ||
if err != nil { | ||
return fmt.Errorf("converting start block to integer: %w", err) | ||
} | ||
|
||
cursorExisting, extractedBlockNumber, err := readBlockNumFromCursor(stateStorePath) | ||
if err != nil { | ||
return fmt.Errorf("reading start block from state path: %w", err) | ||
} | ||
|
||
if cursorExisting { | ||
startBlock = int(extractedBlockNumber) | ||
} | ||
|
||
if startBlock < endBlock-1 { | ||
break | ||
} | ||
|
||
zlog.Info("retrying block range computation", zap.Uint64("session_counter", sessionCounter), zap.Int("start_block_computed", startBlock), zap.Int("end_block_computed", endBlock)) | ||
} | ||
} | ||
|
||
zlog.Info("starting sink session", zap.Uint64("session_counter", sessionCounter)) | ||
err = runSink(cmd, blockRangeArg, endpoint, manifestPath, moduleName, zlog, tracer, signalHandler, stateStorePath) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if blockmetaClient == nil { | ||
return nil | ||
} | ||
|
||
if isSignaled.Load() { | ||
return nil | ||
} | ||
|
||
sessionCounter += 1 | ||
zlog.Info("sleeping until next session", zap.Uint64("session_counter", sessionCounter)) | ||
} | ||
} | ||
|
||
func readBlockNumFromCursor(stateStorePath string) (cursorExisting bool, startBlock uint64, err error) { | ||
content, err := os.ReadFile(stateStorePath) | ||
if err != nil { | ||
if os.IsNotExist(err) { | ||
return false, 0, nil | ||
} | ||
return false, 0, fmt.Errorf("reading cursor state file: %w", err) | ||
} | ||
|
||
state := syncState{} | ||
if err = yaml.Unmarshal(content, &state); err != nil { | ||
return false, 0, fmt.Errorf("unmarshal state file %q: %w", stateStorePath, err) | ||
} | ||
|
||
return true, state.Block.Number, nil | ||
} | ||
func runSink(cmd *cobra.Command, blockRangeArg string, endpoint string, manifestPath string, moduleName string, zlog *zap.Logger, tracer logging.Tracer, signalHandler <-chan os.Signal, stateStorePath string) error { | ||
app := shutter.New() | ||
ctx, cancelApp := context.WithCancel(cmd.Context()) | ||
app.OnTerminating(func(_ error) { | ||
cancelApp() | ||
}) | ||
|
||
baseSinker, err := sink.NewFromViper(cmd, sink.IgnoreOutputModuleType, endpoint, manifestPath, moduleName, blockRangeArg, zlog, tracer, | ||
sink.WithBlockDataBuffer(0), | ||
) | ||
|
@@ -94,10 +204,11 @@ func run(cmd *cobra.Command, args []string) error { | |
|
||
apiListenAddr := sflags.MustGetString(cmd, "api-listen-addr") | ||
cleanState := sflags.MustGetBool(cmd, "clean") | ||
stateStorePath := sflags.MustGetString(cmd, "state-store") | ||
blockRange := sinker.BlockRange() | ||
|
||
zlog.Info("consuming substreams", | ||
managementApi := NewManager(apiListenAddr) | ||
|
||
zlog.Info("start new substreams consumption session", | ||
zap.String("substreams_endpoint", endpoint), | ||
zap.String("manifest_path", manifestPath), | ||
zap.String("module_name", moduleName), | ||
|
@@ -112,7 +223,9 @@ func run(cmd *cobra.Command, args []string) error { | |
|
||
headFetcher := NewHeadTracker(headTrackerClient, headTrackerCallOpts, headTrackerHeaders) | ||
app.OnTerminating(func(_ error) { headFetcher.Close() }) | ||
headFetcher.OnTerminated(func(err error) { app.Shutdown(err) }) | ||
headFetcher.OnTerminated(func(err error) { | ||
app.Shutdown(err) | ||
}) | ||
|
||
sinker.headFetcher = headFetcher | ||
|
||
|
@@ -123,16 +236,21 @@ func run(cmd *cobra.Command, args []string) error { | |
|
||
stats := NewStats(stopBlock, headFetcher) | ||
app.OnTerminating(func(_ error) { stats.Close() }) | ||
stats.OnTerminated(func(err error) { app.Shutdown(err) }) | ||
stats.OnTerminated(func(err error) { | ||
app.Shutdown(err) | ||
}) | ||
|
||
stateStore := NewStateStore(stateStorePath, func() (*sink.Cursor, bool, bool) { | ||
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReached | ||
return sinker.activeCursor, sinker.backprocessingCompleted, sinker.headBlockReachedMetric | ||
}) | ||
app.OnTerminating(func(_ error) { stateStore.Close() }) | ||
stateStore.OnTerminated(func(err error) { app.Shutdown(err) }) | ||
stateStore.OnTerminated(func(err error) { | ||
app.Shutdown(err) | ||
}) | ||
|
||
managementApi := NewManager(apiListenAddr) | ||
managementApi.OnTerminated(func(err error) { app.Shutdown(err) }) | ||
managementApi.OnTerminated(func(err error) { | ||
app.Shutdown(err) | ||
}) | ||
app.OnTerminating(func(_ error) { | ||
if managementApi.shouldResetState { | ||
if err := stateStore.Delete(); err != nil { | ||
|
@@ -145,7 +263,6 @@ func run(cmd *cobra.Command, args []string) error { | |
if !cleanState { | ||
cursor, _, err := stateStore.Read() | ||
cli.NoError(err, "Unable to read state store") | ||
|
||
sinker.activeCursor = sink.MustNewCursor(cursor) | ||
} | ||
|
||
|
@@ -166,31 +283,45 @@ func run(cmd *cobra.Command, args []string) error { | |
|
||
go sinker.Run(ctx) | ||
|
||
zlog.Info("ready, waiting for signal to quit") | ||
|
||
signalHandler, isSignaled, _ := cli.SetupSignalHandler(0*time.Second, zlog) | ||
select { | ||
case <-signalHandler: | ||
go app.Shutdown(nil) | ||
break | ||
case <-app.Terminating(): | ||
zlog.Info("run terminating", zap.Bool("from_signal", isSignaled.Load()), zap.Bool("with_error", app.Err() != nil)) | ||
break | ||
zlog.Info("run terminating", zap.Bool("with_error", app.Err() != nil)) | ||
} | ||
|
||
zlog.Info("waiting for run termination") | ||
select { | ||
case <-app.Terminated(): | ||
return app.Err() | ||
case <-time.After(30 * time.Second): | ||
zlog.Warn("application did not terminate within 30s") | ||
return app.Err() | ||
} | ||
} | ||
|
||
func computeBlockRangeFromHead(ctx context.Context, blockmetaClient pbbmsrvconnect.BlockClient, reversibleSegmentSize uint64, substreamsSegmentSize uint64, blockRangeArg string) (string, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you should get the blockHead in the run() function and keep the 'computeBlockRangeFromHead' function's purpose to actually compute it (not fetch it.) |
||
request := connect.NewRequest(&pbbmsrv.Empty{}) | ||
|
||
if err := app.Err(); err != nil { | ||
return err | ||
apiKey := os.Getenv("SUBSTREAMS_API_KEY") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once you move this to run() function, you can check this environment variable just once, and maybe return an error that specifies that this environment variable is required especially for Blockmeta. It wouldn't be clear to a user using the substreams_api_token (JWT) why he is required to use the api_key too ! |
||
if apiKey == "" { | ||
return "", fmt.Errorf("missing SUBSTREAMS_API_KEY environment variable") | ||
} | ||
request.Header().Set(ApiKeyHeader, apiKey) | ||
|
||
zlog.Info("run terminated gracefully") | ||
return nil | ||
headBlock, err := blockmetaClient.Head(ctx, request) | ||
if err != nil { | ||
return "", fmt.Errorf("requesting head block to blockmeta service: %w", err) | ||
} | ||
|
||
computedEndBlock := ((headBlock.Msg.Num - reversibleSegmentSize) / substreamsSegmentSize) * substreamsSegmentSize | ||
blockRangeArray := strings.Split(blockRangeArg, ":") | ||
if len(blockRangeArray) != 2 { | ||
return "", fmt.Errorf("invalid block range format") | ||
} | ||
|
||
//The computed block range replace the end block by a computed one | ||
return (blockRangeArray[0] + ":" + strconv.FormatUint(computedEndBlock, 10)), nil | ||
} | ||
|
||
type Sinker struct { | ||
|
@@ -199,7 +330,7 @@ type Sinker struct { | |
headFetcher *HeadTracker | ||
|
||
activeCursor *sink.Cursor | ||
headBlockReached bool | ||
headBlockReachedMetric bool | ||
outputDataHash *dataHasher | ||
backprocessingCompleted bool | ||
} | ||
|
@@ -220,7 +351,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp | |
|
||
chainHeadBlock, found := s.headFetcher.Current() | ||
if found && block.Num() >= chainHeadBlock.Num() { | ||
s.headBlockReached = true | ||
s.headBlockReachedMetric = true | ||
HeadBlockReached.SetUint64(1) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "retrying" message is unclear, it looks like something failed, but it didn't. Also, start_block and end_block will always be equal, no need to print them both...
Replace with something like:
"waiting for head block to reach next threshold", zap.Uint64("target", startBlock+substreamsSegmentSize + reversibleSegmentSize), zap.Uint64("current_head", ...
this would be a lot more useful to the guy trying to figure out what's going on!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, printed every 5 seconds may be too many. Use a counter and only zlog.Info one when
counter % 6 == 0
(so around 30 seconds)Another approach is to use zlog.Check() to specify debug level at runtime: always zap.DebugLevel until counter%6==0, then use zap.InfoLevel.