Skip to content

Commit

Permalink
fix tier1 storage read-bytes metering, fix 'gui' header
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Mar 20, 2024
1 parent e377e61 commit 6e51a3c
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 10 deletions.
4 changes: 3 additions & 1 deletion cmd/substreams/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"fmt"
"io"

"github.com/spf13/cobra"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
Expand All @@ -14,7 +16,6 @@ import (
"github.com/streamingfast/substreams/tui"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
"io"
)

func init() {
Expand Down Expand Up @@ -229,6 +230,7 @@ func runRun(cmd *cobra.Command, args []string) error {
if err != nil {
if err == io.EOF {
ui.Cancel()
fmt.Println("Total Read Bytes (server-side consumption):", ui.TotalReadBytes)
fmt.Println("all done")
if testRunner != nil {
testRunner.LogResults()
Expand Down
25 changes: 18 additions & 7 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Client

* Implement a `use` feature, enabling a module to use an existing module by overriding its inputs or initial block. (Inputs should have the same output type than override module's inputs).
Check a usage of this new feature on the [substreams-db-graph-converter](https://github.com/streamingfast/substreams-db-graph-converter/) repository.

* Pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty
* Fix panic when using '--header (-H)' flag on `gui` command

* Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited).
* When packing substreams, pick up docs from the README.md or README in the same directory as the manifest, when top-level package.doc is empty

* Improved file listing performance for Google Storage backends by 25%
* Added "Total read bytes" summary at the end of 'substreams run' command

### Server performance

Some redundant reprocessing has been removed, along with a better usage of caches to reduce reading the blocks multiple times when it can be avoided.

* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.

Expand All @@ -30,13 +36,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* Scheduler modification: a stage now waits for the previous stage to have completed the same segment before running, to take advantage of the cached intermediate layers.

* [Operator] Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).
* Improved file listing performance for Google Storage backends by 25%

### Operator concerns
* Tier2 service now supports a maximum concurrent requests limit. Default set to 0 (unlimited).

* Readiness metric for Substreams tier1 app is now named `substreams_tier1` (was mistakenly called `firehose` before).

* [Operator] Added back deadiness metric for Substreams tiere app (named `substreams_tier2`).
* Added back deadiness metric for Substreams tiere app (named `substreams_tier2`).

* [Operator] Added metric `substreams_tier1_active_worker_requests` which gives the number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes.
* Added metric `substreams_tier1_active_worker_requests` which gives the number of active Substreams worker requests a tier1 app is currently doing against tier2 nodes.

* [Operator] Added metric `substreams_tier1_worker_request_counter` which gives the total Substreams worker requests a tier1 app made against tier2 nodes.
* Added metric `substreams_tier1_worker_request_counter` which gives the total Substreams worker requests a tier1 app made against tier2 nodes.

## v1.3.7

Expand Down
9 changes: 9 additions & 0 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,15 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
return fmt.Errorf("internal error setting store: %w", err)
}

if clonableStore, ok := cacheStore.(dstore.Clonable); ok {
cloned, err := clonableStore.Clone(ctx)
if err != nil {
return fmt.Errorf("cloning store: %w", err)
}
cloned.SetMeter(dmetering.GetBytesMeter(ctx))
cacheStore = cloned
}

execOutputConfigs, err := execout.NewConfigs(cacheStore, outputGraph.UsedModules(), outputGraph.ModuleHashes(), s.runtimeConfig.StateBundleSize, logger)
if err != nil {
return fmt.Errorf("new config map: %w", err)
Expand Down
9 changes: 7 additions & 2 deletions tui/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type TUI struct {
outputMode OutputMode
prettyPrintOutput bool

prog *tea.Program
seenFirstData bool
prog *tea.Program
seenFirstData bool
TotalReadBytes uint64

msgDescs map[string]*desc.MessageDescriptor
decodeMsgTypes map[string]func(in []byte) string
Expand Down Expand Up @@ -171,6 +172,10 @@ func (ui *TUI) IncomingMessage(ctx context.Context, resp *pbsubstreamsrpc.Respon
return ui.jsonBlockScopedData(m.BlockScopedData.Output, m.BlockScopedData.DebugMapOutputs, m.BlockScopedData.DebugStoreOutputs, m.BlockScopedData.Clock)
}
case *pbsubstreamsrpc.Response_Progress:
if m.Progress.ProcessedBytes != nil {
ui.TotalReadBytes = m.Progress.ProcessedBytes.TotalBytesRead
}

if !ui.seenFirstData {
if ui.outputMode == OutputModeTUI {
ui.ensureTerminalLocked()
Expand Down
3 changes: 3 additions & 0 deletions tui2/pages/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ func (c *Config) NewInstance() (*Instance, error) {
if err != nil {
return nil, fmt.Errorf("substreams client setup: %w", err)
}
if headers == nil {
headers = make(map[string]string)
}

req := &pbsubstreamsrpc.Request{
StartBlockNum: c.StartBlock,
Expand Down

0 comments on commit 6e51a3c

Please sign in to comment.