Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag to disable DAS chunked stores #2796

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/datool/datool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type ClientStoreConfig struct {
SigningWallet string `koanf:"signing-wallet"`
SigningWalletPassword string `koanf:"signing-wallet-password"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
DisableChunkedStore bool `koanf:"disable-chunked-store"`
}

func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
Expand All @@ -104,6 +105,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")
f.Bool("disable-chunked-store", false, "force data to always be sent to DAS all at once instead of splitting into chunks")

k, err := confighelpers.BeginCommonParse(f, args)
if err != nil {
Expand Down Expand Up @@ -152,7 +154,7 @@ func startClientStore(args []string) error {
}
}

client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize, config.DisableChunkedStore)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions das/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ type AggregatorConfig struct {
AssumedHonest int `koanf:"assumed-honest"`
Backends BackendConfigList `koanf:"backends"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
DisableChunkedStore bool `koanf:"disable-chunked-store"`
}

var DefaultAggregatorConfig = AggregatorConfig{
AssumedHonest: 0,
Backends: nil,
MaxStoreChunkBodySize: 512 * 1024,
DisableChunkedStore: false,
}

var parsedBackendsConf BackendConfigList
Expand All @@ -56,6 +58,7 @@ func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
f.Var(&parsedBackendsConf, prefix+".backends", "JSON RPC backend configuration. This can be specified on the command line as a JSON array, eg: [{\"url\": \"...\", \"pubkey\": \"...\"},...], or as a JSON array in the config file.")
f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
f.Bool(prefix+".disable-chunked-store", DefaultAggregatorConfig.DisableChunkedStore, "force data to always be sent to DAS all at once instead of splitting into chunks")
}

type Aggregator struct {
Expand Down
39 changes: 25 additions & 14 deletions das/dasRpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ var (
)

type DASRPCClient struct { // implements DataAvailabilityService
clnt *rpc.Client
url string
signer signature.DataSignerFunc
chunkSize uint64
clnt *rpc.Client
url string
signer signature.DataSignerFunc
chunkSize uint64
disableChunkedStore bool
}

func nilSigner(_ []byte) ([]byte, error) {
Expand All @@ -47,7 +48,7 @@ func nilSigner(_ []byte) ([]byte, error) {

const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"

func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int, disableChunkedStore bool) (*DASRPCClient, error) {
clnt, err := rpc.Dial(target)
if err != nil {
return nil, err
Expand All @@ -56,18 +57,23 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu
signer = nilSigner
}

client := &DASRPCClient{
clnt: clnt,
url: target,
signer: signer,
disableChunkedStore: disableChunkedStore,
}

// Byte arrays are encoded in base64
chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
if chunkSize <= 0 {
return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
if !disableChunkedStore {
chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
if chunkSize <= 0 {
return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
}
client.chunkSize = uint64(chunkSize)
}

return &DASRPCClient{
clnt: clnt,
url: target,
signer: signer,
chunkSize: uint64(chunkSize),
}, nil
return client, nil
}

func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
Expand All @@ -83,6 +89,11 @@ func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64
rpcClientStoreDurationHistogram.Update(time.Since(start).Nanoseconds())
}()

if c.disableChunkedStore {
log.Info("Legacy store is being force-used by the DAS client", "url", c.url)
return c.legacyStore(ctx, message, timeout)
}

// #nosec G115
timestamp := uint64(start.Unix())
nChunks := uint64(len(message)) / c.chunkSize
Expand Down
2 changes: 1 addition & 1 deletion das/rpc_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]
}
metricName := metricsutil.CanonicalizeMetricName(url.Hostname())

service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize)
service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize, config.DisableChunkedStore)
if err != nil {
return nil, err
}
Expand Down
Loading