Skip to content

Commit

Permalink
fix detection of connect-based gzip encoding, add EnforceCompression …
Browse files Browse the repository at this point in the history
…config
  • Loading branch information
sduchesneau committed Dec 16, 2024
1 parent 02a95c4 commit fd30ba4
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
2 changes: 2 additions & 0 deletions app/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Tier1Config struct {
StateStoreDefaultTag string
BlockType string
StateBundleSize uint64
EnforceCompression bool // refuse incoming requests that do not accept gzip compression (ConnectRPC or GRPC)

MaxSubrequests uint64
SubrequestsEndpoint string
Expand Down Expand Up @@ -199,6 +200,7 @@ func (a *Tier1App) Run() error {
a.config.BlockType,
subrequestsClientConfig,
tier2RequestParameters,
a.config.EnforceCompression,
opts...,
)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v1.11.3

### Server-side

* Fixed: detection of gzip compression on 'connect' protocol (js/ts clients)
* Added: tier1.Config `EnforceCompression` to refuse incoming connections that do not support GZIP compression (default: false)

## v1.11.2

### Server-side
Expand Down
42 changes: 32 additions & 10 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Tier1Service struct {
resolveCursor pipeline.CursorResolver
getHeadBlock func() (uint64, error)

enforceCompression bool
tier2RequestParameters reqctx.Tier2RequestParameters
}

Expand Down Expand Up @@ -129,6 +130,7 @@ func NewTier1(

substreamsClientConfig *client.SubstreamsClientConfig,
tier2RequestParameters reqctx.Tier2RequestParameters,
enforceCompression bool,
opts ...Option,
) (*Tier1Service, error) {

Expand Down Expand Up @@ -174,6 +176,7 @@ func NewTier1(
logger: logger,
tier2RequestParameters: tier2RequestParameters,
blockExecutionTimeout: 3 * time.Minute,
enforceCompression: enforceCompression,
}

s.streamFactoryFunc = sf.New
Expand Down Expand Up @@ -209,6 +212,14 @@ func (s *Tier1Service) Blocks(
ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request")
defer span.EndWithErr(&err)

var compressed bool
if matchGZIPHeader(req) {
compressed = true
}
if s.enforceCompression && !compressed {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Your client does not accept gzip-compressed streams. Check how to enable it on your GRPC or ConnectRPC client"))
}

request := req.Msg
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
Expand Down Expand Up @@ -243,16 +254,6 @@ func (s *Tier1Service) Blocks(
for i := 0; i < len(moduleNames); i++ {
moduleNames[i] = request.Modules.Modules[i].Name
}
var compressed bool
for _, vv := range req.Header().Values("grpc-Accept-Encoding") {
for _, v := range strings.Split(vv, ",") {
if v == "gzip" {
compressed = true
break
}
}
}

fields := []zap.Field{
zap.Int64("start_block", request.StartBlockNum),
zap.Uint64("stop_block", request.StopBlockNum),
Expand Down Expand Up @@ -713,3 +714,24 @@ func toConnectError(ctx context.Context, err error) error {
// data?
return connect.NewError(connect.CodeInternal, err)
}

// must be lowercase
var compressionHeader = map[string]bool{"grpc-accept-encoding": true, "connect-accept-encoding": true}

const compressionValue = "gzip"

func matchGZIPHeader(req *connect.Request[pbsubstreamsrpc.Request]) bool {

for k, v := range req.Header() {
if compressionHeader[strings.ToLower(k)] {
for _, vv := range v {
for _, vvv := range strings.Split(vv, ",") {
if strings.ToLower(vvv) == compressionValue {
return true
}
}
}
}
}
return false
}

0 comments on commit fd30ba4

Please sign in to comment.