Skip to content

Commit

Permalink
Storage implementations enforce minimum checkpoint intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Dec 6, 2024
1 parent afd61ea commit e838fa9
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 5 deletions.
10 changes: 7 additions & 3 deletions storage/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ import (
)

const (
entryBundleSize = 256
logContType = "application/octet-stream"
ckptContType = "text/plain; charset=utf-8"
entryBundleSize = 256
logContType = "application/octet-stream"
ckptContType = "text/plain; charset=utf-8"
minCheckpointInterval = 1 * time.Second

DefaultPushbackMaxOutstanding = 4096
DefaultIntegrationSizeLimit = 5 * 4096
Expand Down Expand Up @@ -129,6 +130,9 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
if opt.PushbackMaxOutstanding == 0 {
opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding
}
if opt.CheckpointInterval < minCheckpointInterval {
return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval)
}

sdkConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ const (
entryBundleSize = 256
logContType = "application/octet-stream"
ckptContType = "text/plain; charset=utf-8"
// minCheckpointInterval is the shortest permitted interval between updating published checkpoints.
// GCS has a rate limit 1 update per second for individual objects, but we've observed that attempting
// to update at exactly that rate still results in the occasional refusal, so bake in a little wiggle
// room.
minCheckpointInterval = 1200 * time.Millisecond

DefaultPushbackMaxOutstanding = 4096
DefaultIntegrationSizeLimit = 5 * 4096
Expand Down Expand Up @@ -121,6 +126,9 @@ func New(ctx context.Context, cfg Config, opts ...func(*options.StorageOptions))
if opt.PushbackMaxOutstanding == 0 {
opt.PushbackMaxOutstanding = DefaultPushbackMaxOutstanding
}
if opt.CheckpointInterval < minCheckpointInterval {
return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval)
}

c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
checkpointID = 0
treeStateID = 0
entryBundleSize = 256

minCheckpointInterval = time.Second
)

// Storage is a MySQL-based storage implementation for Tessera.
Expand All @@ -66,6 +68,10 @@ type Storage struct {
// Note that `tessera.WithCheckpointSigner()` is mandatory in the `opts` argument.
func New(ctx context.Context, db *sql.DB, opts ...func(*options.StorageOptions)) (*Storage, error) {
opt := storage.ResolveStorageOptions(opts...)
if opt.CheckpointInterval < minCheckpointInterval {
return nil, fmt.Errorf("requested CheckpointInterval too low - %v < %v", opt.CheckpointInterval, minCheckpointInterval)
}

s := &Storage{
db: db,
newCheckpoint: opt.NewCP,
Expand Down
4 changes: 2 additions & 2 deletions storage/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,10 @@ func newTestMySQLStorage(t *testing.T, ctx context.Context) *mysql.Storage {

s, err := mysql.New(ctx, testDB,
tessera.WithCheckpointSigner(noteSigner),
tessera.WithCheckpointInterval(200*time.Millisecond),
tessera.WithCheckpointInterval(time.Second),
tessera.WithBatching(128, 100*time.Millisecond))
if err != nil {
t.Errorf("Failed to create mysql.Storage: %v", err)
t.Fatalf("Failed to create mysql.Storage: %v", err)
}

return s
Expand Down
5 changes: 5 additions & 0 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
dirPerm = 0o755
filePerm = 0o644
stateDir = ".state"

minCheckpointInterval = time.Second
)

// Storage implements storage functions for a POSIX filesystem.
Expand All @@ -64,6 +66,9 @@ type NewTreeFunc func(size uint64, root []byte) error
// - create must only be set when first creating the log, and will create the directory structure and an empty checkpoint
func New(ctx context.Context, path string, create bool, opts ...func(*options.StorageOptions)) (*Storage, error) {
opt := storage.ResolveStorageOptions(opts...)
if opt.CheckpointInterval < minCheckpointInterval {
return nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opt.CheckpointInterval, minCheckpointInterval)
}

r := &Storage{
path: path,
Expand Down

0 comments on commit e838fa9

Please sign in to comment.