Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag to disable DAS chunked stores #2796

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/datool/datool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type ClientStoreConfig struct {
SigningWallet string `koanf:"signing-wallet"`
SigningWalletPassword string `koanf:"signing-wallet-password"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
UseLegacyStore bool `koanf:"use-legacy-store"`
}

func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
Expand All @@ -104,6 +105,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")
f.Bool("use-legacy-store", false, "enabling this forces the das rpc clients to use das_store. Disabled by default")
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved

k, err := confighelpers.BeginCommonParse(f, args)
if err != nil {
Expand Down Expand Up @@ -152,7 +154,7 @@ func startClientStore(args []string) error {
}
}

client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize, config.UseLegacyStore)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions das/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ type AggregatorConfig struct {
AssumedHonest int `koanf:"assumed-honest"`
Backends BackendConfigList `koanf:"backends"`
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
UseLegacyStore bool `koanf:"use-legacy-store"`
}

var DefaultAggregatorConfig = AggregatorConfig{
AssumedHonest: 0,
Backends: nil,
MaxStoreChunkBodySize: 512 * 1024,
UseLegacyStore: false,
}

var parsedBackendsConf BackendConfigList
Expand All @@ -56,6 +58,7 @@ func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
f.Var(&parsedBackendsConf, prefix+".backends", "JSON RPC backend configuration. This can be specified on the command line as a JSON array, eg: [{\"url\": \"...\", \"pubkey\": \"...\"},...], or as a JSON array in the config file.")
f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
f.Bool(prefix+".use-legacy-store", DefaultAggregatorConfig.UseLegacyStore, "enabling this forces the das rpc clients to use das_store. Disabled by default")
}

type Aggregator struct {
Expand Down
39 changes: 25 additions & 14 deletions das/dasRpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ var (
)

type DASRPCClient struct { // implements DataAvailabilityService
clnt *rpc.Client
url string
signer signature.DataSignerFunc
chunkSize uint64
clnt *rpc.Client
url string
signer signature.DataSignerFunc
chunkSize uint64
useLegacyStore bool
}

func nilSigner(_ []byte) ([]byte, error) {
Expand All @@ -47,7 +48,7 @@ func nilSigner(_ []byte) ([]byte, error) {

const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"

func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int, useLegacyStore bool) (*DASRPCClient, error) {
clnt, err := rpc.Dial(target)
if err != nil {
return nil, err
Expand All @@ -56,18 +57,23 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu
signer = nilSigner
}

client := &DASRPCClient{
clnt: clnt,
url: target,
signer: signer,
useLegacyStore: useLegacyStore,
}

// Byte arrays are encoded in base64
chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
if chunkSize <= 0 {
return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
if !useLegacyStore {
chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
if chunkSize <= 0 {
return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
}
client.chunkSize = uint64(chunkSize)
}

return &DASRPCClient{
clnt: clnt,
url: target,
signer: signer,
chunkSize: uint64(chunkSize),
}, nil
return client, nil
}

func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
Expand All @@ -83,6 +89,11 @@ func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64
rpcClientStoreDurationHistogram.Update(time.Since(start).Nanoseconds())
}()

if c.useLegacyStore {
log.Info("Legacy store is being force-used by the DAS client", "url", c.url)
return c.legacyStore(ctx, message, timeout)
}

// #nosec G115
timestamp := uint64(start.Unix())
nChunks := uint64(len(message)) / c.chunkSize
Expand Down
2 changes: 1 addition & 1 deletion das/rpc_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]
}
metricName := metricsutil.CanonicalizeMetricName(url.Hostname())

service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize)
service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize, config.UseLegacyStore)
if err != nil {
return nil, err
}
Expand Down
Loading