From aab4e2066c5653e00b6606986e676580ab57b830 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Tue, 4 Jun 2024 17:19:48 -0700 Subject: [PATCH 01/21] flat to trie layout migration --- das/local_file_storage_service.go | 364 ++++++++++++++++++++++++++++-- 1 file changed, 351 insertions(+), 13 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 8be03bcb30..61545b9d8d 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -6,10 +6,13 @@ package das import ( "bytes" "context" - "encoding/base32" "errors" "fmt" + "io" "os" + "path" + "path/filepath" + "regexp" "time" "github.com/ethereum/go-ethereum/common" @@ -37,30 +40,31 @@ func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) { type LocalFileStorageService struct { dataDir string + + legacyLayout flatLayout + layout trieLayout } func NewLocalFileStorageService(dataDir string) (StorageService, error) { if unix.Access(dataDir, unix.W_OK|unix.R_OK) != nil { return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", dataDir) } - return &LocalFileStorageService{dataDir: dataDir}, nil + return &LocalFileStorageService{ + dataDir: dataDir, + legacyLayout: flatLayout{root: dataDir}, + layout: trieLayout{root: dataDir}, + }, nil } func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) { log.Trace("das.LocalFileStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s) - pathname := s.dataDir + "/" + EncodeStorageServiceKey(key) - data, err := os.ReadFile(pathname) + batchPath := s.legacyLayout.batchPath(key) + data, err := os.ReadFile(batchPath) if err != nil { - // Just for backward compatability. - pathname = s.dataDir + "/" + base32.StdEncoding.EncodeToString(key.Bytes()) - data, err = os.ReadFile(pathname) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil, ErrNotFound - } - return nil, err + if errors.Is(err, os.ErrNotExist) { + return nil, ErrNotFound } - return data, nil + return nil, err } return data, nil } @@ -123,3 +127,337 @@ func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { } return nil } + +/* +New layout + Access data by hash -> by-data-hash/1st octet/2nd octet/hash + Store data with hash and expiry + Iterate Unordered + Iterate by Time + Prune before + + +Old layout + Access data by hash -> hash + Iterate unordered +*/ + +func listDir(dir string) ([]string, error) { + d, err := os.Open(dir) + if err != nil { + return nil, err + } + defer d.Close() + + // Read all the directory entries + files, err := d.Readdir(-1) + if err != nil { + return nil, err + } + + var fileNames []string + for _, file := range files { + fileNames = append(fileNames, file.Name()) + } + return fileNames, nil +} + +var hex64Regex = regexp.MustCompile(fmt.Sprintf("^[a-fA-F0-9]{%d}$", common.HashLength*2)) + +func isStorageServiceKey(key string) bool { + return hex64Regex.MatchString(key) +} + +// Copies a file by its contents to a new file, making any directories needed +// in the new file's path. 
+func copyFile(new, orig string) error { + err := os.MkdirAll(path.Dir(new), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(new), err) + } + + origFile, err := os.Open(orig) + if err != nil { + return fmt.Errorf("failed to open source file: %w", err) + } + defer origFile.Close() + + newFile, err := os.Create(new) + if err != nil { + return fmt.Errorf("failed to create destination file: %w", err) + } + defer newFile.Close() + + _, err = io.Copy(newFile, origFile) + if err != nil { + return fmt.Errorf("failed to copy contents: %w", err) + } + + return nil +} + +// Creates an empty file, making any directories needed in the new file's path. +func createEmptyFile(new string) error { + err := os.MkdirAll(path.Dir(new), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(new), err) + } + + file, err := os.OpenFile(new, os.O_CREATE|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", new, err) + } + file.Close() + return nil +} + +func migrate(fl flatLayout, tl trieLayout) error { + flIt, err := fl.iterateBatches() + if err != nil { + return err + } + + if !tl.migrating { + return errors.New("LocalFileStorage already migrated to trieLayout") + } + + migrationStart := time.Now() + + err = func() error { + for batch, found, err := flIt.next(); found; batch, found, err = flIt.next() { + if err != nil { + return err + } + + if tl.expiryEnabled && batch.expiry.Before(migrationStart) { + continue // don't migrate expired batches + } + + origPath := fl.batchPath(batch.key) + newPath := tl.batchPath(batch.key) + if err = copyFile(newPath, origPath); err != nil { + return err + } + + if tl.expiryEnabled { + expiryPath := tl.expiryPath(batch.key, uint64(batch.expiry.Unix())) + createEmptyFile(expiryPath) + } + } + + return tl.commitMigration() + }() + if err != nil { + return fmt.Errorf("error migrating local file store layout, retaining old layout: %w", err) + } + + func() { + for batch, found, err := flIt.next(); found; batch, found, err = flIt.next() { + if err != nil { + log.Warn("local file store migration completed, but error cleaning up old layout, files from that layout are now orphaned", "error", err) + return + } + toRemove := fl.batchPath(batch.key) + err = os.Remove(toRemove) + if err != nil { + log.Warn("local file store migration completed, but error cleaning up file from old layout, file is now orphaned", "file", toRemove, "error", err) + } + } + }() + + return nil +} + +type batchIterator interface { + next() (batchIdentifier, bool, error) +} + +type batchIdentifier struct { + key common.Hash + expiry time.Time +} + +const ( + defaultRetention = time.Hour * 24 * 7 * 3 +) + +type flatLayout struct { + root string + + retention time.Duration +} + +type flatLayoutIterator struct { + files []string + + layout *flatLayout +} + +func (l *flatLayout) batchPath(key common.Hash) string { + return filepath.Join(l.root, EncodeStorageServiceKey(key)) +} + +func (l *flatLayout) iterateBatches() (*flatLayoutIterator, error) { + files, err := listDir(l.root) + if err != nil { + return nil, err + } + return &flatLayoutIterator{ + files: files, + layout: l, + }, nil +} + +func (i *flatLayoutIterator) next() (batchIdentifier, bool, error) { + for len(i.files) > 0 { + var f string + f, i.files = i.files[0], i.files[1:] + path := filepath.Join(i.layout.root, f) + if !isStorageServiceKey(f) { + log.Warn("Incorrectly named batch file found, ignoring", "file", path) + continue + } + key, err := 
DecodeStorageServiceKey(f) + + stat, err := os.Stat(f) + if err != nil { + return batchIdentifier{}, false, err + } + + return batchIdentifier{ + key: key, + expiry: stat.ModTime().Add(i.layout.retention), + }, true, nil + } + return batchIdentifier{}, false, nil +} + +const ( + byDataHash = "by-data-hash" + byExpiryTimestamp = "by-expiry-timestamp" + migratingSuffix = "-migrating" + expiryDivisor = 10_000 +) + +type trieLayout struct { + root string + expiryEnabled bool + + migrating bool // Is the trieLayout currently being migrated to +} + +type trieLayoutIterator struct { + firstLevel []string + secondLevel []string + files []string + + layout *trieLayout +} + +func (l *trieLayout) batchPath(key common.Hash) string { + encodedKey := EncodeStorageServiceKey(key) + firstDir := encodedKey[:2] + secondDir := encodedKey[2:4] + + topDir := byDataHash + if l.migrating { + topDir = topDir + migratingSuffix + } + + return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) +} + +func (l *trieLayout) expiryPath(key common.Hash, expiry uint64) pathParts { + encodedKey := EncodeStorageServiceKey(key) + firstDir := fmt.Sprintf("%d", expiry/expiryDivisor) + secondDir := fmt.Sprintf("%d", expiry%expiryDivisor) + + topDir := byExpiryTimestamp + if l.migrating { + topDir = topDir + migratingSuffix + } + + return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) +} + +func (l *trieLayout) iterateBatches() (*trieLayoutIterator, error) { + var firstLevel, secondLevel, files []string + var err error + + firstLevel, err = listDir(filepath.Join(l.root, byDataHash)) + if err != nil { + return nil, err + } + + if len(firstLevel) > 0 { + secondLevel, err = listDir(filepath.Join(l.root, byDataHash, firstLevel[0])) + if err != nil { + return nil, err + } + } + + if len(secondLevel) > 0 { + files, err = listDir(filepath.Join(l.root, byDataHash, firstLevel[0], secondLevel[0])) + if err != nil { + return nil, err + } + } + + return &trieLayoutIterator{ + firstLevel: firstLevel, + secondLevel: secondLevel, + files: files, + layout: l, + }, nil +} + +func (i *trieLayout) commitMigration() error { + if !i.migrating { + return errors.New("already finished migration") + } + + oldDir := filepath.Join(i.root, byDataHash+migratingSuffix) + newDir := filepath.Join(i.root, byDataHash) + + if err := os.Rename(oldDir, newDir); err != nil { + return fmt.Errorf("couldn't rename \"%s\" to \"%s\": %w", oldDir, newDir, err) + } + + syscall.Sync() + return nil +} + +func (i *trieLayoutIterator) next() (string, bool, error) { + for len(i.firstLevel) > 0 { + for len(i.secondLevel) > 0 { + if len(i.files) > 0 { + var f string + f, i.files = i.files[0], i.files[1:] + return filepath.Join(i.layout.root, byDataHash, i.firstLevel[0], i.secondLevel[0], f), true, nil + } + + if len(i.secondLevel) <= 1 { + return "", false, nil + } + i.secondLevel = i.secondLevel[1:] + + files, err := listDir(filepath.Join(i.layout.root, byDataHash, i.firstLevel[0], i.secondLevel[0])) + if err != nil { + return "", false, err + } + i.files = files + } + + if len(i.firstLevel) <= 1 { + return "", false, nil + } + i.firstLevel = i.firstLevel[1:] + secondLevel, err := listDir(filepath.Join(i.layout.root, byDataHash, i.firstLevel[0])) + if err != nil { + return "", false, err + } + i.secondLevel = secondLevel + } + + return "", false, nil +} From e3a9f9ebd68d5082e7c53fbaa7327b045162cc94 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Fri, 7 Jun 2024 17:00:09 -0700 Subject: [PATCH 02/21] Expiry implementation and tests --- 
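Notes:

The by-expiry-timestamp index wired up in this patch splits each expiry
timestamp across two directory levels (expiry/expiryDivisor, then
expiry%expiryDivisor), so iteration and pruning can skip whole buckets whose
entries only expire later. A rough standalone sketch of the path scheme, for
illustration only (the helper name and example values are made up; the real
logic is trieLayout.expiryPath in the diff below):

    package main

    import (
    	"fmt"
    	"path/filepath"
    )

    // Mirrors the expiryDivisor constant used by the trie layout.
    const expiryDivisor = 10_000

    // expiryIndexPath shows how an expiry timestamp and an already hex-encoded
    // batch key map into the two-level by-expiry-timestamp directory tree.
    func expiryIndexPath(root, encodedKey string, expiry uint64) string {
    	firstDir := fmt.Sprintf("%d", expiry/expiryDivisor)  // coarse bucket (~2.8h of timestamps)
    	secondDir := fmt.Sprintf("%d", expiry%expiryDivisor) // offset within the bucket
    	return filepath.Join(root, "by-expiry-timestamp", firstDir, secondDir, encodedKey)
    }

    func main() {
    	// Hypothetical key and timestamp, just to show the resulting layout.
    	fmt.Println(expiryIndexPath("/data/das", "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff", 1717800000))
    	// -> /data/das/by-expiry-timestamp/171780/0/001122...eeff
    }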
das/local_file_storage_service.go | 378 +++++++++++++++++++++++------- 1 file changed, 297 insertions(+), 81 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 61545b9d8d..7a39a9ec37 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -13,6 +13,9 @@ import ( "path" "path/filepath" "regexp" + "strconv" + "strings" + "syscall" "time" "github.com/ethereum/go-ethereum/common" @@ -43,9 +46,12 @@ type LocalFileStorageService struct { legacyLayout flatLayout layout trieLayout + + // for testing only + enableLegacyLayout bool } -func NewLocalFileStorageService(dataDir string) (StorageService, error) { +func NewLocalFileStorageService(dataDir string) (*LocalFileStorageService, error) { if unix.Access(dataDir, unix.W_OK|unix.R_OK) != nil { return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", dataDir) } @@ -69,16 +75,29 @@ func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash return data, nil } -func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - logPut("das.LocalFileStorageService.Store", data, timeout, s) - fileName := EncodeStorageServiceKey(dastree.Hash(data)) - finalPath := s.dataDir + "/" + fileName +func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry uint64) error { + // TODO input validation on expiry + logPut("das.LocalFileStorageService.Store", data, expiry, s) + key := dastree.Hash(data) + var batchPath string + if !s.enableLegacyLayout { + batchPath = s.layout.batchPath(key) + + if s.layout.expiryEnabled { + if err := createEmptyFile(s.layout.expiryPath(key, expiry)); err != nil { + return fmt.Errorf("Couldn't create by-expiry-path index entry: %w", err) + } + } + } else { + batchPath = s.legacyLayout.batchPath(key) + } // Use a temp file and rename to achieve atomic writes. 
- f, err := os.CreateTemp(s.dataDir, fileName) + f, err := os.CreateTemp(path.Dir(batchPath), path.Base(batchPath)) if err != nil { return err } + defer f.Close() err = f.Chmod(0o600) if err != nil { return err @@ -87,13 +106,19 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout if err != nil { return err } - err = f.Close() - if err != nil { - return err - } - return os.Rename(f.Name(), finalPath) + if s.enableLegacyLayout { + tv := syscall.Timeval{ + Sec: int64(expiry - uint64(s.legacyLayout.retention.Seconds())), + Usec: 0, + } + times := []syscall.Timeval{tv, tv} + if err = syscall.Utimes(f.Name(), times); err != nil { + return err + } + } + return os.Rename(f.Name(), batchPath) } func (s *LocalFileStorageService) Sync(ctx context.Context) error { @@ -150,16 +175,12 @@ func listDir(dir string) ([]string, error) { defer d.Close() // Read all the directory entries - files, err := d.Readdir(-1) + files, err := d.Readdirnames(-1) if err != nil { return nil, err } - var fileNames []string - for _, file := range files { - fileNames = append(fileNames, file.Name()) - } - return fileNames, nil + return files, nil } var hex64Regex = regexp.MustCompile(fmt.Sprintf("^[a-fA-F0-9]{%d}$", common.HashLength*2)) @@ -211,25 +232,26 @@ func createEmptyFile(new string) error { return nil } -func migrate(fl flatLayout, tl trieLayout) error { +func migrate(fl *flatLayout, tl *trieLayout) error { flIt, err := fl.iterateBatches() if err != nil { return err } - if !tl.migrating { - return errors.New("LocalFileStorage already migrated to trieLayout") + if err = tl.startMigration(); err != nil { + return err } migrationStart := time.Now() - + var migrated, skipped, removed int err = func() error { - for batch, found, err := flIt.next(); found; batch, found, err = flIt.next() { + for batch, err := flIt.next(); err != io.EOF; batch, err = flIt.next() { if err != nil { return err } if tl.expiryEnabled && batch.expiry.Before(migrationStart) { + skipped++ continue // don't migrate expired batches } @@ -241,8 +263,11 @@ func migrate(fl flatLayout, tl trieLayout) error { if tl.expiryEnabled { expiryPath := tl.expiryPath(batch.key, uint64(batch.expiry.Unix())) - createEmptyFile(expiryPath) + if err = createEmptyFile(expiryPath); err != nil { + return err + } } + migrated++ } return tl.commitMigration() @@ -251,20 +276,80 @@ func migrate(fl flatLayout, tl trieLayout) error { return fmt.Errorf("error migrating local file store layout, retaining old layout: %w", err) } - func() { - for batch, found, err := flIt.next(); found; batch, found, err = flIt.next() { - if err != nil { - log.Warn("local file store migration completed, but error cleaning up old layout, files from that layout are now orphaned", "error", err) - return - } - toRemove := fl.batchPath(batch.key) - err = os.Remove(toRemove) - if err != nil { - log.Warn("local file store migration completed, but error cleaning up file from old layout, file is now orphaned", "file", toRemove, "error", err) + flIt, err = fl.iterateBatches() + if err != nil { + return err + } + for batch, err := flIt.next(); err != io.EOF; batch, err = flIt.next() { + if err != nil { + log.Warn("local file store migration completed, but error cleaning up old layout, files from that layout are now orphaned", "error", err) + break + } + toRemove := fl.batchPath(batch.key) + err = os.Remove(toRemove) + if err != nil { + log.Warn("local file store migration completed, but error cleaning up file from old layout, file is now orphaned", "file", toRemove, "error", 
err) + } + removed++ + } + + log.Info("Local file store legacy layout migration complete", "migratedFiles", migrated, "skippedExpiredFiles", skipped, "removedFiles", removed) + + return nil +} + +func prune(tl trieLayout, pruneTil time.Time) error { + it, err := tl.iterateBatchesByTimestamp(pruneTil) + if err != nil { + return err + } + pruned := 0 + for file, err := it.next(); err != io.EOF; file, err = it.next() { + if err != nil { + return err + } + pathByTimestamp := path.Base(file) + key, err := DecodeStorageServiceKey(path.Base(pathByTimestamp)) + if err != nil { + return err + } + pathByHash := tl.batchPath(key) + err = recursivelyDeleteUntil(pathByHash, byDataHash) + if err != nil { + if os.IsNotExist(err) { + log.Warn("Couldn't find batch to expire, it may have been previously deleted but its by-expiry-timestamp index entry still exists, trying to clean up the index next", "path", pathByHash, "indexPath", pathByTimestamp, "err", err) + + } else { + log.Error("Couldn't prune expired batch, continuing trying to prune others", "path", pathByHash, "err", err) + continue } + } - }() + err = recursivelyDeleteUntil(pathByTimestamp, byExpiryTimestamp) + if err != nil { + log.Error("Couldn't prune expired batch expiry index entry, continuing trying to prune others", "path", pathByHash, "err", err) + } + pruned++ + } + log.Info("local file store pruned expired batches", "count", pruned) + return nil +} + +func recursivelyDeleteUntil(filePath, until string) error { + err := os.Remove(filePath) + if err != nil { + return err + } + for filePath = path.Dir(filePath); path.Base(filePath) != until; filePath = path.Dir(filePath) { + err = os.Remove(filePath) + if err != nil { + if !strings.Contains(err.Error(), "directory not empty") { + log.Warn("error cleaning up empty directory when pruning expired batches", "path", filePath, "err", err) + } + break + } + } return nil } @@ -297,6 +382,10 @@ func (l *flatLayout) batchPath(key common.Hash) string { return filepath.Join(l.root, EncodeStorageServiceKey(key)) } +type layerFilter func(*[][]string, int) bool + +func noopFilter(*[][]string, int) bool { return true } + func (l *flatLayout) iterateBatches() (*flatLayoutIterator, error) { files, err := listDir(l.root) if err != nil { @@ -308,28 +397,30 @@ func (l *flatLayout) iterateBatches() (*flatLayoutIterator, error) { }, nil } -func (i *flatLayoutIterator) next() (batchIdentifier, bool, error) { +func (i *flatLayoutIterator) next() (batchIdentifier, error) { for len(i.files) > 0 { var f string f, i.files = i.files[0], i.files[1:] - path := filepath.Join(i.layout.root, f) if !isStorageServiceKey(f) { - log.Warn("Incorrectly named batch file found, ignoring", "file", path) continue } key, err := DecodeStorageServiceKey(f) + if err != nil { + return batchIdentifier{}, err + } - stat, err := os.Stat(f) + fullPath := i.layout.batchPath(key) + stat, err := os.Stat(fullPath) if err != nil { - return batchIdentifier{}, false, err + return batchIdentifier{}, err } return batchIdentifier{ key: key, expiry: stat.ModTime().Add(i.layout.retention), - }, true, nil + }, nil } - return batchIdentifier{}, false, nil + return batchIdentifier{}, io.EOF } const ( @@ -347,11 +438,10 @@ type trieLayout struct { } type trieLayoutIterator struct { - firstLevel []string - secondLevel []string - files []string - - layout *trieLayout + levels [][]string + filters []layerFilter + topDir string + layout *trieLayout } func (l *trieLayout) batchPath(key common.Hash) string { @@ -367,7 +457,7 @@ func (l *trieLayout) batchPath(key 
common.Hash) string { return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) } -func (l *trieLayout) expiryPath(key common.Hash, expiry uint64) pathParts { +func (l *trieLayout) expiryPath(key common.Hash, expiry uint64) string { encodedKey := EncodeStorageServiceKey(key) firstDir := fmt.Sprintf("%d", expiry/expiryDivisor) secondDir := fmt.Sprintf("%d", expiry%expiryDivisor) @@ -384,6 +474,8 @@ func (l *trieLayout) iterateBatches() (*trieLayoutIterator, error) { var firstLevel, secondLevel, files []string var err error + // TODO handle stray files that aren't dirs + firstLevel, err = listDir(filepath.Join(l.root, byDataHash)) if err != nil { return nil, err @@ -403,61 +495,185 @@ func (l *trieLayout) iterateBatches() (*trieLayoutIterator, error) { } } + storageKeyFilter := func(layers *[][]string, idx int) bool { + return isStorageServiceKey((*layers)[idx][0]) + } + return &trieLayoutIterator{ - firstLevel: firstLevel, - secondLevel: secondLevel, - files: files, - layout: l, + levels: [][]string{firstLevel, secondLevel, files}, + filters: []layerFilter{noopFilter, noopFilter, storageKeyFilter}, + topDir: byDataHash, + layout: l, }, nil } -func (i *trieLayout) commitMigration() error { - if !i.migrating { +func (l *trieLayout) iterateBatchesByTimestamp(maxTimestamp time.Time) (*trieLayoutIterator, error) { + var firstLevel, secondLevel, files []string + var err error + + firstLevel, err = listDir(filepath.Join(l.root, byExpiryTimestamp)) + if err != nil { + return nil, err + } + + if len(firstLevel) > 0 { + secondLevel, err = listDir(filepath.Join(l.root, byExpiryTimestamp, firstLevel[0])) + if err != nil { + return nil, err + } + } + + if len(secondLevel) > 0 { + files, err = listDir(filepath.Join(l.root, byExpiryTimestamp, firstLevel[0], secondLevel[0])) + if err != nil { + return nil, err + } + } + + beforeUpper := func(layers *[][]string, idx int) bool { + num, err := strconv.Atoi((*layers)[idx][0]) + if err != nil { + return false + } + return int64(num) <= maxTimestamp.Unix()/expiryDivisor + } + beforeLower := func(layers *[][]string, idx int) bool { + num, err := strconv.Atoi((*layers)[idx-1][0] + (*layers)[idx][0]) + if err != nil { + return false + } + return int64(num) <= maxTimestamp.Unix() + } + storageKeyFilter := func(layers *[][]string, idx int) bool { + return isStorageServiceKey((*layers)[idx][0]) + } + + return &trieLayoutIterator{ + levels: [][]string{firstLevel, secondLevel, files}, + filters: []layerFilter{beforeUpper, beforeLower, storageKeyFilter}, + topDir: byExpiryTimestamp, + layout: l, + }, nil +} + +func (l *trieLayout) startMigration() error { + // TODO check for existing dirs + if !l.migrating { + return errors.New("Local file storage already migrated to trieLayout") + } + + if err := os.MkdirAll(filepath.Join(l.root, byDataHash+migratingSuffix), 0o700); err != nil { + return err + } + + if l.expiryEnabled { + if err := os.MkdirAll(filepath.Join(l.root, byExpiryTimestamp+migratingSuffix), 0o700); err != nil { + return err + } + } + return nil + +} + +func (l *trieLayout) commitMigration() error { + if !l.migrating { return errors.New("already finished migration") } - oldDir := filepath.Join(i.root, byDataHash+migratingSuffix) - newDir := filepath.Join(i.root, byDataHash) + removeSuffix := func(prefix string) error { + oldDir := filepath.Join(l.root, prefix+migratingSuffix) + newDir := filepath.Join(l.root, prefix) - if err := os.Rename(oldDir, newDir); err != nil { - return fmt.Errorf("couldn't rename \"%s\" to \"%s\": %w", oldDir, newDir, err) + 
if err := os.Rename(oldDir, newDir); err != nil { + return err // rename error already includes src and dst, no need to wrap + } + return nil + } + + if err := removeSuffix(byDataHash); err != nil { + return err + } + + if l.expiryEnabled { + if err := removeSuffix(byExpiryTimestamp); err != nil { + return err + } } syscall.Sync() return nil } -func (i *trieLayoutIterator) next() (string, bool, error) { - for len(i.firstLevel) > 0 { - for len(i.secondLevel) > 0 { - if len(i.files) > 0 { - var f string - f, i.files = i.files[0], i.files[1:] - return filepath.Join(i.layout.root, byDataHash, i.firstLevel[0], i.secondLevel[0], f), true, nil - } +func (it *trieLayoutIterator) next() (string, error) { + isLeaf := func(idx int) bool { + return idx == len(it.levels)-1 + } + + makePathAtLevel := func(idx int) string { + pathComponents := make([]string, idx+3) + pathComponents[0] = it.layout.root + pathComponents[1] = it.topDir + for i := 0; i <= idx; i++ { + pathComponents[i+2] = it.levels[i][0] + } + return filepath.Join(pathComponents...) + } - if len(i.secondLevel) <= 1 { - return "", false, nil + var populateNextLevel func(idx int) error + populateNextLevel = func(idx int) error { + if isLeaf(idx) || len(it.levels[idx]) == 0 { + return nil + } + nextLevelEntries, err := listDir(makePathAtLevel(idx)) + if err != nil { + return err + } + it.levels[idx+1] = nextLevelEntries + if len(nextLevelEntries) > 0 { + return populateNextLevel(idx + 1) + } + return nil + } + + advanceWithinLevel := func(idx int) error { + if len(it.levels[idx]) > 1 { + it.levels[idx] = it.levels[idx][1:] + } else { + it.levels[idx] = nil + } + + return populateNextLevel(idx) + } + + for idx := 0; idx >= 0; { + if len(it.levels[idx]) == 0 { + idx-- + continue + } + + if !it.filters[idx](&it.levels, idx) { + if err := advanceWithinLevel(idx); err != nil { + return "", err } - i.secondLevel = i.secondLevel[1:] + continue + } - files, err := listDir(filepath.Join(i.layout.root, byDataHash, i.firstLevel[0], i.secondLevel[0])) - if err != nil { - return "", false, err + if isLeaf(idx) { + path := makePathAtLevel(idx) + if err := advanceWithinLevel(idx); err != nil { + return "", err } - i.files = files + return path, nil } - if len(i.firstLevel) <= 1 { - return "", false, nil + if len(it.levels[idx+1]) > 0 { + idx++ + continue } - i.firstLevel = i.firstLevel[1:] - secondLevel, err := listDir(filepath.Join(i.layout.root, byDataHash, i.firstLevel[0])) - if err != nil { - return "", false, err + + if err := advanceWithinLevel(idx); err != nil { + return "", err } - i.secondLevel = secondLevel } - - return "", false, nil + return "", io.EOF } From 45ab8a279a9baa3b1a834e8317878c8cd75d3a2f Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Mon, 10 Jun 2024 18:12:49 -0700 Subject: [PATCH 03/21] Logic for starting expiry, tests --- das/das_test.go | 10 +- das/factory.go | 6 +- das/local_file_storage_service.go | 162 +++++++++++++++++-------- das/local_file_storage_service_test.go | 120 ++++++++++++++++++ 4 files changed, 243 insertions(+), 55 deletions(-) create mode 100644 das/local_file_storage_service_test.go diff --git a/das/das_test.go b/das/das_test.go index 950b63d9d9..0858a217da 100644 --- a/das/das_test.go +++ b/das/das_test.go @@ -40,8 +40,9 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { KeyDir: dbPath, }, LocalFileStorage: LocalFileStorageConfig{ - Enable: enableFileStorage, - DataDir: dbPath, + Enable: enableFileStorage, + DataDir: dbPath, + MaxRetention: 
DefaultLocalFileStorageConfig.MaxRetention, }, LocalDBStorage: dbConfig, ParentChainNodeURL: "none", @@ -129,8 +130,9 @@ func testDASMissingMessage(t *testing.T, storageType string) { KeyDir: dbPath, }, LocalFileStorage: LocalFileStorageConfig{ - Enable: enableFileStorage, - DataDir: dbPath, + Enable: enableFileStorage, + DataDir: dbPath, + MaxRetention: DefaultLocalFileStorageConfig.MaxRetention, }, LocalDBStorage: dbConfig, ParentChainNodeURL: "none", diff --git a/das/factory.go b/das/factory.go index d9eacd0ada..3ab7cc2401 100644 --- a/das/factory.go +++ b/das/factory.go @@ -35,7 +35,11 @@ func CreatePersistentStorageService( } if config.LocalFileStorage.Enable { - s, err := NewLocalFileStorageService(config.LocalFileStorage.DataDir) + s, err := NewLocalFileStorageService(config.LocalFileStorage) + if err != nil { + return nil, nil, err + } + s.start(ctx) if err != nil { return nil, nil, err } diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 7a39a9ec37..470cac88b6 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -23,48 +23,99 @@ import ( "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/das/dastree" "github.com/offchainlabs/nitro/util/pretty" + "github.com/offchainlabs/nitro/util/stopwaiter" flag "github.com/spf13/pflag" "golang.org/x/sys/unix" ) type LocalFileStorageConfig struct { - Enable bool `koanf:"enable"` - DataDir string `koanf:"data-dir"` + Enable bool `koanf:"enable"` + DataDir string `koanf:"data-dir"` + EnableExpiry bool `koanf:"enable-expiry"` + MaxRetention time.Duration `koanf:"max-retention"` } var DefaultLocalFileStorageConfig = LocalFileStorageConfig{ - DataDir: "", + DataDir: "", + MaxRetention: time.Hour * 24 * 21, // 6 days longer than the batch poster default } func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable", DefaultLocalFileStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a directory of files, one per batch") f.String(prefix+".data-dir", DefaultLocalFileStorageConfig.DataDir, "local data directory") + f.Bool(prefix+".enable-expiry", DefaultLocalFileStorageConfig.EnableExpiry, "enable expiry of batches") + f.Duration(prefix+".max-retention", DefaultLocalFileStorageConfig.MaxRetention, "store requests with expiry times farther in the future than max-retention will be rejected") } type LocalFileStorageService struct { - dataDir string + config LocalFileStorageConfig legacyLayout flatLayout layout trieLayout // for testing only enableLegacyLayout bool + + stopWaiter stopwaiter.StopWaiterSafe } -func NewLocalFileStorageService(dataDir string) (*LocalFileStorageService, error) { - if unix.Access(dataDir, unix.W_OK|unix.R_OK) != nil { - return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", dataDir) +func NewLocalFileStorageService(config LocalFileStorageConfig) (*LocalFileStorageService, error) { + if unix.Access(config.DataDir, unix.W_OK|unix.R_OK) != nil { + return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", config.DataDir) } - return &LocalFileStorageService{ - dataDir: dataDir, - legacyLayout: flatLayout{root: dataDir}, - layout: trieLayout{root: dataDir}, - }, nil + s := &LocalFileStorageService{ + config: config, + legacyLayout: flatLayout{root: config.DataDir}, + layout: trieLayout{root: config.DataDir, expiryEnabled: config.EnableExpiry}, + } + return s, nil 
+} + +// Separate start function +// Tests want to be able to avoid triggering the auto migration +func (s *LocalFileStorageService) start(ctx context.Context) error { + migrated, err := s.layout.migrated() + if err != nil { + return err + } + + if !migrated && !s.enableLegacyLayout { + if err = migrate(&s.legacyLayout, &s.layout); err != nil { + return err + } + } + + if err := s.stopWaiter.Start(ctx, s); err != nil { + return err + } + if s.config.EnableExpiry && !s.enableLegacyLayout { + err = s.stopWaiter.CallIterativelySafe(func(ctx context.Context) time.Duration { + err = s.layout.prune(time.Now()) + if err != nil { + log.Error("error pruning expired batches", "error", err) + } + return time.Minute * 5 + }) + if err != nil { + return err + } + } + return nil +} + +func (s *LocalFileStorageService) Close(ctx context.Context) error { + return s.stopWaiter.StopAndWait() } func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) { log.Trace("das.LocalFileStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s) - batchPath := s.legacyLayout.batchPath(key) + var batchPath string + if s.enableLegacyLayout { + batchPath = s.legacyLayout.batchPath(key) + } else { + batchPath = s.layout.batchPath(key) + } + data, err := os.ReadFile(batchPath) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -76,8 +127,11 @@ func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash } func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry uint64) error { - // TODO input validation on expiry logPut("das.LocalFileStorageService.Store", data, expiry, s) + if time.Unix(int64(expiry), 0).After(time.Now().Add(s.config.MaxRetention)) { + return errors.New("requested expiry time exceeds maximum allowed retention period") + } + key := dastree.Hash(data) var batchPath string if !s.enableLegacyLayout { @@ -92,6 +146,11 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry u batchPath = s.legacyLayout.batchPath(key) } + err := os.MkdirAll(path.Dir(batchPath), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(batchPath), err) + } + // Use a temp file and rename to achieve atomic writes. 
f, err := os.CreateTemp(path.Dir(batchPath), path.Base(batchPath)) if err != nil { @@ -125,16 +184,12 @@ func (s *LocalFileStorageService) Sync(ctx context.Context) error { return nil } -func (s *LocalFileStorageService) Close(ctx context.Context) error { - return nil -} - func (s *LocalFileStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { return daprovider.KeepForever, nil } func (s *LocalFileStorageService) String() string { - return "LocalFileStorageService(" + s.dataDir + ")" + return "LocalFileStorageService(" + s.config.DataDir + ")" } func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { @@ -153,20 +208,6 @@ func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { return nil } -/* -New layout - Access data by hash -> by-data-hash/1st octet/2nd octet/hash - Store data with hash and expiry - Iterate Unordered - Iterate by Time - Prune before - - -Old layout - Access data by hash -> hash - Iterate unordered -*/ - func listDir(dir string) ([]string, error) { d, err := os.Open(dir) if err != nil { @@ -238,14 +279,23 @@ func migrate(fl *flatLayout, tl *trieLayout) error { return err } - if err = tl.startMigration(); err != nil { + batch, err := flIt.next() + if errors.Is(err, io.EOF) { + log.Info("No batches in legacy layout detected, skipping migration.") + return nil + } + if err != nil { return err } + if startErr := tl.startMigration(); startErr != nil { + return startErr + } + migrationStart := time.Now() var migrated, skipped, removed int err = func() error { - for batch, err := flIt.next(); err != io.EOF; batch, err = flIt.next() { + for ; !errors.Is(err, io.EOF); batch, err = flIt.next() { if err != nil { return err } @@ -280,7 +330,7 @@ func migrate(fl *flatLayout, tl *trieLayout) error { if err != nil { return err } - for batch, err := flIt.next(); err != io.EOF; batch, err = flIt.next() { + for batch, err := flIt.next(); !errors.Is(err, io.EOF); batch, err = flIt.next() { if err != nil { log.Warn("local file store migration completed, but error cleaning up old layout, files from that layout are now orphaned", "error", err) break @@ -298,13 +348,13 @@ func migrate(fl *flatLayout, tl *trieLayout) error { return nil } -func prune(tl trieLayout, pruneTil time.Time) error { +func (tl *trieLayout) prune(pruneTil time.Time) error { it, err := tl.iterateBatchesByTimestamp(pruneTil) if err != nil { return err } pruned := 0 - for file, err := it.next(); err != io.EOF; file, err = it.next() { + for file, err := it.next(); !errors.Is(err, io.EOF); file, err = it.next() { if err != nil { return err } @@ -331,7 +381,9 @@ func prune(tl trieLayout, pruneTil time.Time) error { } pruned++ } - log.Info("local file store pruned expired batches", "count", pruned) + if pruned > 0 { + log.Info("local file store pruned expired batches", "count", pruned) + } return nil } @@ -353,19 +405,11 @@ func recursivelyDeleteUntil(filePath, until string) error { return nil } -type batchIterator interface { - next() (batchIdentifier, bool, error) -} - type batchIdentifier struct { key common.Hash expiry time.Time } -const ( - defaultRetention = time.Hour * 24 * 7 * 3 -) - type flatLayout struct { root string @@ -434,7 +478,9 @@ type trieLayout struct { root string expiryEnabled bool - migrating bool // Is the trieLayout currently being migrated to + // Is the trieLayout currently being migrated to? + // Controls whether paths include the migratingSuffix. 
+ migrating bool } type trieLayoutIterator struct { @@ -556,12 +602,28 @@ func (l *trieLayout) iterateBatchesByTimestamp(maxTimestamp time.Time) (*trieLay }, nil } +func (l *trieLayout) migrated() (bool, error) { + info, err := os.Stat(filepath.Join(l.root, byDataHash)) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, err + } + return info.IsDir(), nil +} + func (l *trieLayout) startMigration() error { - // TODO check for existing dirs - if !l.migrating { + migrated, err := l.migrated() + if err != nil { + return err + } + if migrated { return errors.New("Local file storage already migrated to trieLayout") } + l.migrating = true + if err := os.MkdirAll(filepath.Join(l.root, byDataHash+migratingSuffix), 0o700); err != nil { return err } diff --git a/das/local_file_storage_service_test.go b/das/local_file_storage_service_test.go new file mode 100644 index 0000000000..7eb6aea8f7 --- /dev/null +++ b/das/local_file_storage_service_test.go @@ -0,0 +1,120 @@ +// Copyright 2024, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package das + +import ( + "context" + "errors" + "io" + "testing" + "time" +) + +func TestMigrationNoExpiry(t *testing.T) { + dir := t.TempDir() + t.Logf("temp dir: %s", dir) + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: false, + MaxRetention: time.Hour * 24 * 30, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + s.enableLegacyLayout = true + + now := uint64(time.Now().Unix()) + + err = s.Put(ctx, []byte("a"), now+1) + Require(t, err) + err = s.Put(ctx, []byte("b"), now+1) + Require(t, err) + err = s.Put(ctx, []byte("c"), now+2) + Require(t, err) + err = s.Put(ctx, []byte("d"), now+10) + Require(t, err) + + err = migrate(&s.legacyLayout, &s.layout) + Require(t, err) + + migrated := 0 + trIt, err := s.layout.iterateBatches() + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + migrated++ + } + if migrated != 4 { + t.Fail() + } + + // byTimestampEntries := 0 + trIt, err = s.layout.iterateBatchesByTimestamp(time.Unix(int64(now+10), 0)) + if err == nil { + t.Fail() + } +} + +func TestMigrationExpiry(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: true, + MaxRetention: time.Hour * 24 * 30, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + s.enableLegacyLayout = true + + now := uint64(time.Now().Unix()) + + err = s.Put(ctx, []byte("a"), now-expiryDivisor*2) + Require(t, err) + err = s.Put(ctx, []byte("b"), now-expiryDivisor) + Require(t, err) + err = s.Put(ctx, []byte("c"), now+expiryDivisor) + Require(t, err) + err = s.Put(ctx, []byte("d"), now+expiryDivisor) + Require(t, err) + err = s.Put(ctx, []byte("e"), now+expiryDivisor*2) + Require(t, err) + + s.layout.expiryEnabled = true + err = migrate(&s.legacyLayout, &s.layout) + Require(t, err) + + migrated := 0 + trIt, err := s.layout.iterateBatches() + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + migrated++ + } + if migrated != 3 { + t.Fail() + } + + countTimestampEntries := func(cutoff, expected uint64) { + var byTimestampEntries uint64 + trIt, err = s.layout.iterateBatchesByTimestamp(time.Unix(int64(cutoff), 0)) + Require(t, err) + for batch, err := trIt.next(); !errors.Is(err, io.EOF); batch, err = 
trIt.next() { + Require(t, err) + t.Logf("indexCreated %s", batch) + byTimestampEntries++ + } + if byTimestampEntries != expected { + t.Fail() + } + } + + countTimestampEntries(now, 0) // They should have all been filtered out since they're after now + countTimestampEntries(now+expiryDivisor, 2) + countTimestampEntries(now+expiryDivisor*2, 3) +} From 088c380d1cabf7bec22c96c3866785fffe819d5b Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Tue, 11 Jun 2024 17:42:20 -0700 Subject: [PATCH 04/21] fix l1SyncService retention and expiry Make it so that the l1SyncService can sync expired data and set also set sensible retention. --- das/local_file_storage_service.go | 2 +- das/storage_service.go | 3 +++ das/syncing_fallback_storage.go | 10 ++++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 470cac88b6..bd907a3645 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -37,7 +37,7 @@ type LocalFileStorageConfig struct { var DefaultLocalFileStorageConfig = LocalFileStorageConfig{ DataDir: "", - MaxRetention: time.Hour * 24 * 21, // 6 days longer than the batch poster default + MaxRetention: defaultStorageRetention, } func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) { diff --git a/das/storage_service.go b/das/storage_service.go index 806e80dba5..b7526077e9 100644 --- a/das/storage_service.go +++ b/das/storage_service.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -25,6 +26,8 @@ type StorageService interface { HealthCheck(ctx context.Context) error } +const defaultStorageRetention = time.Hour * 24 * 21 // 6 days longer than the batch poster default + func EncodeStorageServiceKey(key common.Hash) string { return key.Hex()[2:] } diff --git a/das/syncing_fallback_storage.go b/das/syncing_fallback_storage.go index 411e7a1977..000c16ff09 100644 --- a/das/syncing_fallback_storage.go +++ b/das/syncing_fallback_storage.go @@ -8,7 +8,6 @@ import ( "encoding/binary" "errors" "fmt" - "math" "math/big" "os" "sync" @@ -65,17 +64,19 @@ type SyncToStorageConfig struct { IgnoreWriteErrors bool `koanf:"ignore-write-errors"` ParentChainBlocksPerRead uint64 `koanf:"parent-chain-blocks-per-read"` StateDir string `koanf:"state-dir"` + SyncExpiredData bool `koanf:"sync-expired-data"` } var DefaultSyncToStorageConfig = SyncToStorageConfig{ CheckAlreadyExists: true, Eager: false, EagerLowerBoundBlock: 0, - RetentionPeriod: time.Duration(math.MaxInt64), + RetentionPeriod: defaultStorageRetention, DelayOnError: time.Second, IgnoreWriteErrors: true, ParentChainBlocksPerRead: 100, StateDir: "", + SyncExpiredData: true, } func SyncToStorageConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -83,10 +84,11 @@ func SyncToStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".eager", DefaultSyncToStorageConfig.Eager, "eagerly sync batch data to this DAS's storage from the rest endpoints, using L1 as the index of batch data hashes; otherwise only sync lazily") f.Uint64(prefix+".eager-lower-bound-block", DefaultSyncToStorageConfig.EagerLowerBoundBlock, "when eagerly syncing, start indexing forward from this L1 block. 
Only used if there is no sync state") f.Uint64(prefix+".parent-chain-blocks-per-read", DefaultSyncToStorageConfig.ParentChainBlocksPerRead, "when eagerly syncing, max l1 blocks to read per poll") - f.Duration(prefix+".retention-period", DefaultSyncToStorageConfig.RetentionPeriod, "period to retain synced data (defaults to forever)") + f.Duration(prefix+".retention-period", DefaultSyncToStorageConfig.RetentionPeriod, "period to request storage to retain synced data") f.Duration(prefix+".delay-on-error", DefaultSyncToStorageConfig.DelayOnError, "time to wait if encountered an error before retrying") f.Bool(prefix+".ignore-write-errors", DefaultSyncToStorageConfig.IgnoreWriteErrors, "log only on failures to write when syncing; otherwise treat it as an error") f.String(prefix+".state-dir", DefaultSyncToStorageConfig.StateDir, "directory to store the sync state in, ie the block number currently synced up to, so that we don't sync from scratch each time") + f.Bool(prefix+".sync-expired-data", DefaultSyncToStorageConfig.SyncExpiredData, "sync even data that is expired; needed for mirror configuration") } type l1SyncService struct { @@ -192,7 +194,7 @@ func (s *l1SyncService) processBatchDelivered(ctx context.Context, batchDelivere } log.Info("BatchDelivered", "log", batchDeliveredLog, "event", deliveredEvent) storeUntil := arbmath.SaturatingUAdd(deliveredEvent.TimeBounds.MaxTimestamp, uint64(s.config.RetentionPeriod.Seconds())) - if storeUntil < uint64(time.Now().Unix()) { + if !s.config.SyncExpiredData && storeUntil < uint64(time.Now().Unix()) { // old batch - no need to store return nil } From 7596da61a4a7af76fce22ad52336e2324946da84 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 12 Jun 2024 11:00:18 -0700 Subject: [PATCH 05/21] Plumb retention period to legacy layout Also fix unsetting the migrating flag after migration is done. 
--- das/factory.go | 2 +- das/local_file_storage_service.go | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/das/factory.go b/das/factory.go index 3ab7cc2401..8f6432234d 100644 --- a/das/factory.go +++ b/das/factory.go @@ -39,7 +39,7 @@ func CreatePersistentStorageService( if err != nil { return nil, nil, err } - s.start(ctx) + err = s.start(ctx) if err != nil { return nil, nil, err } diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index bd907a3645..0b731bbdda 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -65,7 +65,7 @@ func NewLocalFileStorageService(config LocalFileStorageConfig) (*LocalFileStorag } s := &LocalFileStorageService{ config: config, - legacyLayout: flatLayout{root: config.DataDir}, + legacyLayout: flatLayout{root: config.DataDir, retention: config.MaxRetention}, layout: trieLayout{root: config.DataDir, expiryEnabled: config.EnableExpiry}, } return s, nil @@ -128,8 +128,10 @@ func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry uint64) error { logPut("das.LocalFileStorageService.Store", data, expiry, s) - if time.Unix(int64(expiry), 0).After(time.Now().Add(s.config.MaxRetention)) { - return errors.New("requested expiry time exceeds maximum allowed retention period") + expiryTime := time.Unix(int64(expiry), 0) + currentTimePlusRetention := time.Now().Add(s.config.MaxRetention) + if expiryTime.After(currentTimePlusRetention) { + return fmt.Errorf("requested expiry time (%v) exceeds current time plus maximum allowed retention period(%v)", expiryTime, currentTimePlusRetention) } key := dastree.Hash(data) @@ -343,7 +345,7 @@ func migrate(fl *flatLayout, tl *trieLayout) error { removed++ } - log.Info("Local file store legacy layout migration complete", "migratedFiles", migrated, "skippedExpiredFiles", skipped, "removedFiles", removed) + log.Info("Local file store legacy layout migration complete", "migratedFiles", migrated, "skippedExpiredFiles", skipped, "removedFiles", removed, "duration", time.Since(migrationStart)) return nil } @@ -354,6 +356,7 @@ func (tl *trieLayout) prune(pruneTil time.Time) error { return err } pruned := 0 + pruningStart := time.Now() for file, err := it.next(); !errors.Is(err, io.EOF); file, err = it.next() { if err != nil { return err @@ -382,7 +385,7 @@ func (tl *trieLayout) prune(pruneTil time.Time) error { pruned++ } if pruned > 0 { - log.Info("local file store pruned expired batches", "count", pruned) + log.Info("local file store pruned expired batches", "count", pruned, "pruneTil", pruneTil, "duration", time.Since(pruningStart)) } return nil } @@ -663,6 +666,10 @@ func (l *trieLayout) commitMigration() error { } syscall.Sync() + + // Done migrating + l.migrating = false + return nil } From 13f79586319e5b86943672084218ecf0215327ac Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 12 Jun 2024 13:12:23 -0700 Subject: [PATCH 06/21] Switch to ref counted expiry using hard links We can have batches with identical data, so the trieLayout needs to be able to handle being requested to store the same batch multiple times with different expiry times. The way it works now is that the leaf files in by-expiry-timestamp are hard links to the batch files in by-data-hash. The link count is used to know when there are no more links in by-expiry-timestamp pointing to the batch file and that it can be deleted. 
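A minimal standalone sketch of the link-count check this relies on (POSIX
specific; the helper name and example path are hypothetical, the real check
lives in trieLayout.prune using info.Sys()):

    package main

    import (
    	"fmt"
    	"os"
    	"syscall"
    )

    // linkCount reports how many hard links still point at the file's inode.
    // While by-expiry-timestamp entries for a batch remain, the count stays
    // above 1; once only the by-data-hash entry is left (count == 1), the
    // batch contents themselves can be deleted.
    func linkCount(path string) (uint64, error) {
    	info, err := os.Stat(path)
    	if err != nil {
    		return 0, err
    	}
    	stat, ok := info.Sys().(*syscall.Stat_t)
    	if !ok {
    		return 0, fmt.Errorf("no syscall.Stat_t available for %s", path)
    	}
    	return uint64(stat.Nlink), nil
    }

    func main() {
    	// Hypothetical batch file path under by-data-hash.
    	n, err := linkCount("/data/das/by-data-hash/00/11/0011...")
    	fmt.Println(n, err)
    }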
--- das/local_file_storage_service.go | 98 +++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 26 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 0b731bbdda..10298e6826 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -15,6 +15,7 @@ import ( "regexp" "strconv" "strings" + "sync" "syscall" "time" @@ -137,13 +138,9 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry u key := dastree.Hash(data) var batchPath string if !s.enableLegacyLayout { + s.layout.writeMutex.Lock() + defer s.layout.writeMutex.Unlock() batchPath = s.layout.batchPath(key) - - if s.layout.expiryEnabled { - if err := createEmptyFile(s.layout.expiryPath(key, expiry)); err != nil { - return fmt.Errorf("Couldn't create by-expiry-path index entry: %w", err) - } - } } else { batchPath = s.legacyLayout.batchPath(key) } @@ -158,7 +155,15 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry u if err != nil { return err } - defer f.Close() + renamed := false + defer func() { + _ = f.Close() + if !renamed { + if err := os.Remove(f.Name()); err != nil { + log.Error("Couldn't clean up temporary file", "file", f.Name()) + } + } + }() err = f.Chmod(0o600) if err != nil { return err @@ -179,7 +184,25 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry u } } - return os.Rename(f.Name(), batchPath) + _, err = os.Stat(batchPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.Rename(f.Name(), batchPath); err != nil { + return err + } + renamed = true + } else { + return err + } + } + + if !s.enableLegacyLayout && s.layout.expiryEnabled { + if err := createHardLink(batchPath, s.layout.expiryPath(key, expiry)); err != nil { + return fmt.Errorf("couldn't create by-expiry-path index entry: %w", err) + } + } + + return nil } func (s *LocalFileStorageService) Sync(ctx context.Context) error { @@ -261,20 +284,22 @@ func copyFile(new, orig string) error { } // Creates an empty file, making any directories needed in the new file's path. -func createEmptyFile(new string) error { +func createHardLink(orig, new string) error { err := os.MkdirAll(path.Dir(new), 0o700) if err != nil { - return fmt.Errorf("failed to create directory %s: %w", path.Dir(new), err) + return err } - file, err := os.OpenFile(new, os.O_CREATE|os.O_WRONLY, 0o600) + err = os.Link(orig, new) if err != nil { - return fmt.Errorf("failed to create file %s: %w", new, err) + return err } - file.Close() return nil } +// migrate converts a file store from flatLayout to trieLayout. +// It is not thread safe and must be run before Put requests are served. +// The expiry index is only created if expiry is enabled. 
 func migrate(fl *flatLayout, tl *trieLayout) error {
 	flIt, err := fl.iterateBatches()
 	if err != nil {
 		return err
 	}
@@ -304,6 +329,7 @@ func migrate(fl *flatLayout, tl *trieLayout) error {
 
 			if tl.expiryEnabled && batch.expiry.Before(migrationStart) {
 				skipped++
+				log.Debug("skipping expired batch during migration", "expiry", batch.expiry, "start", migrationStart)
 				continue // don't migrate expired batches
 			}
 
@@ -315,7 +341,7 @@ func migrate(fl *flatLayout, tl *trieLayout) error {
 
 			if tl.expiryEnabled {
 				expiryPath := tl.expiryPath(batch.key, uint64(batch.expiry.Unix()))
-				if err = createEmptyFile(expiryPath); err != nil {
+				if err = createHardLink(newPath, expiryPath); err != nil {
 					return err
 				}
 			}
@@ -351,37 +377,49 @@ func migrate(fl *flatLayout, tl *trieLayout) error {
 }
 
 func (tl *trieLayout) prune(pruneTil time.Time) error {
+	tl.writeMutex.Lock()
+	defer tl.writeMutex.Unlock()
 	it, err := tl.iterateBatchesByTimestamp(pruneTil)
 	if err != nil {
 		return err
 	}
 	pruned := 0
 	pruningStart := time.Now()
-	for file, err := it.next(); !errors.Is(err, io.EOF); file, err = it.next() {
+	for pathByTimestamp, err := it.next(); !errors.Is(err, io.EOF); pathByTimestamp, err = it.next() {
 		if err != nil {
 			return err
 		}
-		pathByTimestamp := path.Base(file)
 		key, err := DecodeStorageServiceKey(path.Base(pathByTimestamp))
 		if err != nil {
 			return err
 		}
+		err = recursivelyDeleteUntil(pathByTimestamp, byExpiryTimestamp)
+		if err != nil {
+			log.Error("Couldn't prune expired batch expiry index entry, continuing trying to prune others", "path", pathByTimestamp, "err", err)
+		}
+
 		pathByHash := tl.batchPath(key)
-		err = recursivelyDeleteUntil(pathByHash, byDataHash)
+		info, err := os.Stat(pathByHash)
 		if err != nil {
 			if os.IsNotExist(err) {
-				log.Warn("Couldn't find batch to expire, it may have been previously deleted but its by-expiry-timestamp index entry still exists, trying to clean up the index next", "path", pathByHash, "indexPath", pathByTimestamp, "err", err)
-
+				log.Warn("Couldn't find batch to expire, it may have been previously deleted but its by-expiry-timestamp index entry still existed, deleting its index entry and continuing", "path", pathByHash, "indexPath", pathByTimestamp, "err", err)
			} else {
 				log.Error("Couldn't prune expired batch, continuing trying to prune others", "path", pathByHash, "err", err)
-				continue
 			}
-
+			continue
 		}
-		err = recursivelyDeleteUntil(pathByTimestamp, byExpiryTimestamp)
-		if err != nil {
-			log.Error("Couldn't prune expired batch expiry index entry, continuing trying to prune others", "path", pathByHash, "err", err)
+		stat, ok := info.Sys().(*syscall.Stat_t)
+		if !ok {
+			log.Error("Couldn't convert file stats to Stat_t struct, possible OS or filesystem incompatibility, skipping pruning this batch", "file", pathByHash)
+			continue
 		}
+		if stat.Nlink == 1 {
+			err = recursivelyDeleteUntil(pathByHash, byDataHash)
+			if err != nil {
+				log.Error("Couldn't prune expired batch, continuing trying to prune others", "path", pathByHash, "err", err)
+			}
+		}
+
 		pruned++
 	}
 	if pruned > 0 {
@@ -477,6 +515,8 @@ const (
 	expiryDivisor     = 10_000
 )
 
+var expirySecondPartWidth = len(strconv.Itoa(expiryDivisor)) - 1
+
 type trieLayout struct {
 	root          string
 	expiryEnabled bool
@@ -484,6 +524,12 @@ type trieLayout struct {
 	// Is the trieLayout currently being migrated to?
 	// Controls whether paths include the migratingSuffix.
 	migrating bool
+
+	// Anything changing the layout (pruning, adding files) must go through
+	// this mutex.
+	// Pruning the entire history at startup of Arb Nova as of 2024-06-12 takes
+	// 5s on my laptop, so the overhead of pruning after startup should be negligible.
+ writeMutex sync.Mutex } type trieLayoutIterator struct { @@ -509,7 +555,7 @@ func (l *trieLayout) batchPath(key common.Hash) string { func (l *trieLayout) expiryPath(key common.Hash, expiry uint64) string { encodedKey := EncodeStorageServiceKey(key) firstDir := fmt.Sprintf("%d", expiry/expiryDivisor) - secondDir := fmt.Sprintf("%d", expiry%expiryDivisor) + secondDir := fmt.Sprintf("%0*d", expirySecondPartWidth, expiry%expiryDivisor) topDir := byExpiryTimestamp if l.migrating { @@ -622,7 +668,7 @@ func (l *trieLayout) startMigration() error { return err } if migrated { - return errors.New("Local file storage already migrated to trieLayout") + return errors.New("local file storage already migrated to trieLayout") } l.migrating = true From 929c9c08853f33e2e43828ad9a198faff2893110 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 12 Jun 2024 17:49:59 -0700 Subject: [PATCH 07/21] Unit test for handling of duplicates with expiry --- das/local_file_storage_service.go | 5 +- das/local_file_storage_service_test.go | 186 ++++++++++++++++++------- 2 files changed, 143 insertions(+), 48 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 10298e6826..20d8e21ea2 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -173,6 +173,9 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry u return err } + // For testing only. When migrating we treat the expiry time of existing flat layout + // files to be the modification time + the max allowed retention. So when creating + // new flat layout files, set their modification time accordingly. if s.enableLegacyLayout { tv := syscall.Timeval{ Sec: int64(expiry - uint64(s.legacyLayout.retention.Seconds())), @@ -637,7 +640,7 @@ func (l *trieLayout) iterateBatchesByTimestamp(maxTimestamp time.Time) (*trieLay if err != nil { return false } - return int64(num) <= maxTimestamp.Unix() + return int64(num) < maxTimestamp.Unix() } storageKeyFilter := func(layers *[][]string, idx int) bool { return isStorageServiceKey((*layers)[idx][0]) diff --git a/das/local_file_storage_service_test.go b/das/local_file_storage_service_test.go index 7eb6aea8f7..59e2f61a59 100644 --- a/das/local_file_storage_service_test.go +++ b/das/local_file_storage_service_test.go @@ -4,16 +4,68 @@ package das import ( + "bytes" "context" "errors" "io" "testing" "time" + + "github.com/offchainlabs/nitro/das/dastree" ) +func getByHashAndCheck(t *testing.T, s *LocalFileStorageService, xs ...string) { + t.Helper() + ctx := context.Background() + + for _, x := range xs { + actual, err := s.GetByHash(ctx, dastree.Hash([]byte(x))) + Require(t, err) + if !bytes.Equal([]byte(x), actual) { + Fail(t, "unexpected result") + } + } +} + +func countEntries(t *testing.T, layout *trieLayout, expected int) { + t.Helper() + + count := 0 + trIt, err := layout.iterateBatches() + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + count++ + } + if count != expected { + Fail(t, "unexpected number of batches", "expected", expected, "was", count) + } +} + +func countTimestampEntries(t *testing.T, layout *trieLayout, cutoff time.Time, expected int) { + t.Helper() + var count int + trIt, err := layout.iterateBatchesByTimestamp(cutoff) + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + count++ + } + if count != expected { + Fail(t, "unexpected count of entries when iterating by 
timestamp", "expected", expected, "was", count) + } +} + +func pruneCountRemaining(t *testing.T, layout *trieLayout, pruneTil time.Time, expected int) { + t.Helper() + err := layout.prune(pruneTil) + Require(t, err) + + countEntries(t, layout, expected) +} + func TestMigrationNoExpiry(t *testing.T) { dir := t.TempDir() - t.Logf("temp dir: %s", dir) ctx := context.Background() config := LocalFileStorageConfig{ @@ -37,24 +89,18 @@ func TestMigrationNoExpiry(t *testing.T) { err = s.Put(ctx, []byte("d"), now+10) Require(t, err) + getByHashAndCheck(t, s, "a", "b", "c", "d") + err = migrate(&s.legacyLayout, &s.layout) Require(t, err) + s.enableLegacyLayout = false - migrated := 0 - trIt, err := s.layout.iterateBatches() - Require(t, err) - for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { - Require(t, err) - migrated++ - } - if migrated != 4 { - t.Fail() - } + countEntries(t, &s.layout, 4) + getByHashAndCheck(t, s, "a", "b", "c", "d") - // byTimestampEntries := 0 - trIt, err = s.layout.iterateBatchesByTimestamp(time.Unix(int64(now+10), 0)) + _, err = s.layout.iterateBatchesByTimestamp(time.Unix(int64(now+10), 0)) if err == nil { - t.Fail() + Fail(t, "can't iterate by timestamp when expiry is disabled") } } @@ -66,55 +112,101 @@ func TestMigrationExpiry(t *testing.T) { Enable: true, DataDir: dir, EnableExpiry: true, - MaxRetention: time.Hour * 24 * 30, + MaxRetention: time.Hour * 10, } s, err := NewLocalFileStorageService(config) Require(t, err) s.enableLegacyLayout = true - now := uint64(time.Now().Unix()) + now := time.Now() - err = s.Put(ctx, []byte("a"), now-expiryDivisor*2) + // Use increments of expiry divisor in order to span multiple by-expiry-timestamp dirs + err = s.Put(ctx, []byte("a"), uint64(now.Add(-2*time.Second*expiryDivisor).Unix())) Require(t, err) - err = s.Put(ctx, []byte("b"), now-expiryDivisor) + err = s.Put(ctx, []byte("b"), uint64(now.Add(-1*time.Second*expiryDivisor).Unix())) Require(t, err) - err = s.Put(ctx, []byte("c"), now+expiryDivisor) + err = s.Put(ctx, []byte("c"), uint64(now.Add(time.Second*expiryDivisor).Unix())) Require(t, err) - err = s.Put(ctx, []byte("d"), now+expiryDivisor) + err = s.Put(ctx, []byte("d"), uint64(now.Add(time.Second*expiryDivisor).Unix())) Require(t, err) - err = s.Put(ctx, []byte("e"), now+expiryDivisor*2) + err = s.Put(ctx, []byte("e"), uint64(now.Add(2*time.Second*expiryDivisor).Unix())) Require(t, err) - s.layout.expiryEnabled = true + getByHashAndCheck(t, s, "a", "b", "c", "d", "e") + err = migrate(&s.legacyLayout, &s.layout) Require(t, err) + s.enableLegacyLayout = false - migrated := 0 - trIt, err := s.layout.iterateBatches() - Require(t, err) - for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { - Require(t, err) - migrated++ - } - if migrated != 3 { - t.Fail() - } + countEntries(t, &s.layout, 3) + getByHashAndCheck(t, s, "c", "d", "e") - countTimestampEntries := func(cutoff, expected uint64) { - var byTimestampEntries uint64 - trIt, err = s.layout.iterateBatchesByTimestamp(time.Unix(int64(cutoff), 0)) - Require(t, err) - for batch, err := trIt.next(); !errors.Is(err, io.EOF); batch, err = trIt.next() { - Require(t, err) - t.Logf("indexCreated %s", batch) - byTimestampEntries++ - } - if byTimestampEntries != expected { - t.Fail() - } + afterNow := now.Add(time.Second) + countTimestampEntries(t, &s.layout, afterNow, 0) // They should have all been filtered out since they're after now + countTimestampEntries(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 2) + 
countTimestampEntries(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 3) + + pruneCountRemaining(t, &s.layout, afterNow, 3) + getByHashAndCheck(t, s, "c", "d", "e") + + pruneCountRemaining(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 1) + getByHashAndCheck(t, s, "e") + + pruneCountRemaining(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 0) +} + +func TestExpiryDuplicates(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: true, + MaxRetention: time.Hour * 10, } + s, err := NewLocalFileStorageService(config) + Require(t, err) + + now := time.Now() + + // Use increments of expiry divisor in order to span multiple by-expiry-timestamp dirs + err = s.Put(ctx, []byte("a"), uint64(now.Add(-2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("a"), uint64(now.Add(-1*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("a"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("d"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("e"), uint64(now.Add(2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) + Require(t, err) + + afterNow := now.Add(time.Second) + // "a" is duplicated + countEntries(t, &s.layout, 4) + // There should be a timestamp entry for each time "a" was added + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 6) + + // We've expired the first "a", but there are still 2 other timestamp entries for it + pruneCountRemaining(t, &s.layout, afterNow.Add(-2*time.Second*expiryDivisor), 4) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 5) + + // We've expired the second "a", but there is still 1 other timestamp entry for it + pruneCountRemaining(t, &s.layout, afterNow.Add(-1*time.Second*expiryDivisor), 4) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 4) + + // We've expired the third "a", and also "d" + pruneCountRemaining(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 2) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 2) + + // We've expired the "e" + pruneCountRemaining(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 1) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 1) - countTimestampEntries(now, 0) // They should have all been filtered out since they're after now - countTimestampEntries(now+expiryDivisor, 2) - countTimestampEntries(now+expiryDivisor*2, 3) + // We've expired the "f" + pruneCountRemaining(t, &s.layout, afterNow.Add(3*time.Second*expiryDivisor), 0) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 0) } From 92aa569c8572c5da78ea3bda64cf6af3c621cdf6 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Wed, 12 Jun 2024 22:46:43 -0700 Subject: [PATCH 08/21] Fix case of exactly same batch and expiry time --- das/local_file_storage_service.go | 23 +++++++++++++++++++---- das/local_file_storage_service_test.go | 3 +++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 20d8e21ea2..de3a1379da 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -286,18 +286,33 @@ func copyFile(new, orig string) error { return nil } -// Creates an empty file, making any directories needed 
in the new file's path. +// Creates a hard link at new, to orig, making any directories needed in the new link's path. func createHardLink(orig, new string) error { err := os.MkdirAll(path.Dir(new), 0o700) if err != nil { return err } - err = os.Link(orig, new) + info, err := os.Stat(new) if err != nil { - return err + if os.IsNotExist(err) { + err = os.Link(orig, new) + if err != nil { + return err + } + return nil + } else { + return err + } } - return nil + + // Hard link already exists + stat, ok := info.Sys().(*syscall.Stat_t) + if ok && stat.Nlink > 1 { + return nil + } + + return fmt.Errorf("file exists but is not a hard link: %s", new) } // migrate converts a file store from flatLayout to trieLayout. diff --git a/das/local_file_storage_service_test.go b/das/local_file_storage_service_test.go index 59e2f61a59..0b2ba9749d 100644 --- a/das/local_file_storage_service_test.go +++ b/das/local_file_storage_service_test.go @@ -183,6 +183,9 @@ func TestExpiryDuplicates(t *testing.T) { Require(t, err) err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) Require(t, err) + // Put the same entry and expiry again, should have no effect + err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) + Require(t, err) afterNow := now.Add(time.Second) // "a" is duplicated From bfa498dcc0707aab285ff17e1deb4dec0df94692 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Mon, 17 Jun 2024 17:41:28 -0700 Subject: [PATCH 09/21] Fix ExpirationPolicy now that there is expiry --- das/local_file_storage_service.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index de3a1379da..0e2a381cd5 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -213,6 +213,9 @@ func (s *LocalFileStorageService) Sync(ctx context.Context) error { } func (s *LocalFileStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { + if s.config.EnableExpiry { + return daprovider.DiscardAfterDataTimeout, nil + } return daprovider.KeepForever, nil } From 1f67c39d19b85d5fd9610178766326b5ba244747 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Tue, 18 Jun 2024 11:49:58 -0700 Subject: [PATCH 10/21] Add missed error return --- das/local_file_storage_service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 0e2a381cd5..6b0a5f0070 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -437,14 +437,14 @@ func (tl *trieLayout) prune(pruneTil time.Time) error { if stat.Nlink == 1 { err = recursivelyDeleteUntil(pathByHash, byDataHash) if err != nil { - + return err } } pruned++ } if pruned > 0 { - log.Info("local file store pruned expired batches", "count", pruned, "pruneTil", pruneTil, "duration", time.Since(pruningStart)) + log.Info("Local file store pruned expired batches", "count", pruned, "pruneTil", pruneTil, "duration", time.Since(pruningStart)) } return nil } From 75a88e3c3cf1ca80cc7d33ef7a1723761442ddaa Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Mon, 8 Jul 2024 19:50:06 +0530 Subject: [PATCH 11/21] Feed Client should log loud error when rate limited --- broadcastclient/broadcastclient.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 1167dba133..a0348b7f9c 100644 --- a/broadcastclient/broadcastclient.go +++ 
b/broadcastclient/broadcastclient.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbutil" m "github.com/offchainlabs/nitro/broadcaster/message" @@ -292,6 +293,11 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa return nil, err } if err != nil { + var httpError rpc.HTTPError + if errors.As(err, &httpError) { + if httpError.StatusCode == 429 { + log.Error("rate limit exceeded, please run own local relay because too many nodes are connecting to feed from same IP address", "err", err) + } return nil, fmt.Errorf("broadcast client unable to connect: %w", err) } if config.RequireChainId && !foundChainId { From c1284529b82c552118fa59a8c6441fc6e316c46c Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Mon, 8 Jul 2024 20:03:35 +0530 Subject: [PATCH 12/21] fix --- broadcastclient/broadcastclient.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index a0348b7f9c..d4135ed75f 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -294,10 +294,9 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa } if err != nil { var httpError rpc.HTTPError - if errors.As(err, &httpError) { - if httpError.StatusCode == 429 { - log.Error("rate limit exceeded, please run own local relay because too many nodes are connecting to feed from same IP address", "err", err) - } + if errors.As(err, &httpError) && httpError.StatusCode == 429 { + log.Error("rate limit exceeded, please run own local relay because too many nodes are connecting to feed from same IP address", "err", err) + } return nil, fmt.Errorf("broadcast client unable to connect: %w", err) } if config.RequireChainId && !foundChainId { From c205b67f99e9cd6f038f521cf74a5bae98661f4c Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 9 Jul 2024 16:17:27 +0530 Subject: [PATCH 13/21] Changes based on PR comments --- broadcastclient/broadcastclient.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index d4135ed75f..2225341560 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -25,8 +25,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/rpc" - "github.com/offchainlabs/nitro/arbutil" m "github.com/offchainlabs/nitro/broadcaster/message" "github.com/offchainlabs/nitro/util/contracts" @@ -293,8 +291,8 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa return nil, err } if err != nil { - var httpError rpc.HTTPError - if errors.As(err, &httpError) && httpError.StatusCode == 429 { + connectionRejectedError := &ws.ConnectionRejectedError{} + if errors.As(err, &connectionRejectedError) && connectionRejectedError.StatusCode() == 429 { log.Error("rate limit exceeded, please run own local relay because too many nodes are connecting to feed from same IP address", "err", err) } return nil, fmt.Errorf("broadcast client unable to connect: %w", err) From 1cb907ca700ffdd98408b14a5891bea9aa117afe Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 9 Jul 2024 09:51:53 -0600 Subject: [PATCH 14/21] Drop nitro p2p config options --- cmd/nitro-val/config.go | 3 --- cmd/nitro-val/nitro_val.go | 1 - cmd/nitro/nitro.go 
| 4 ---- 3 files changed, 8 deletions(-) diff --git a/cmd/nitro-val/config.go b/cmd/nitro-val/config.go index b52a1c6b5e..2adbe5e9aa 100644 --- a/cmd/nitro-val/config.go +++ b/cmd/nitro-val/config.go @@ -27,7 +27,6 @@ type ValidationNodeConfig struct { HTTP genericconf.HTTPConfig `koanf:"http"` WS genericconf.WSConfig `koanf:"ws"` IPC genericconf.IPCConfig `koanf:"ipc"` - P2P genericconf.P2PConfig `koanf:"p2p"` Auth genericconf.AuthRPCConfig `koanf:"auth"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` @@ -67,7 +66,6 @@ var ValidationNodeConfigDefault = ValidationNodeConfig{ HTTP: HTTPConfigDefault, WS: WSConfigDefault, IPC: IPCConfigDefault, - P2P: genericconf.P2PConfigDefault, Auth: genericconf.AuthRPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, @@ -87,7 +85,6 @@ func ValidationNodeConfigAddOptions(f *flag.FlagSet) { genericconf.WSConfigAddOptions("ws", f) genericconf.IPCConfigAddOptions("ipc", f) genericconf.AuthRPCConfigAddOptions("auth", f) - genericconf.P2PConfigAddOptions("p2p", f) f.Bool("metrics", ValidationNodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) f.Bool("pprof", ValidationNodeConfigDefault.PProf, "enable pprof") diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index 1e894336ea..6f5f546430 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -70,7 +70,6 @@ func mainImpl() int { nodeConfig.WS.Apply(&stackConf) nodeConfig.Auth.Apply(&stackConf) nodeConfig.IPC.Apply(&stackConf) - nodeConfig.P2P.Apply(&stackConf) vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 1c4ad80186..572e6d2f06 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -183,7 +183,6 @@ func mainImpl() int { if nodeConfig.WS.ExposeAll { stackConf.WSModules = append(stackConf.WSModules, "personal") } - nodeConfig.P2P.Apply(&stackConf) vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision @@ -717,7 +716,6 @@ type NodeConfig struct { IPC genericconf.IPCConfig `koanf:"ipc"` Auth genericconf.AuthRPCConfig `koanf:"auth"` GraphQL genericconf.GraphQLConfig `koanf:"graphql"` - P2P genericconf.P2PConfig `koanf:"p2p"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` PProf bool `koanf:"pprof"` @@ -743,7 +741,6 @@ var NodeConfigDefault = NodeConfig{ IPC: genericconf.IPCConfigDefault, Auth: genericconf.AuthRPCConfigDefault, GraphQL: genericconf.GraphQLConfigDefault, - P2P: genericconf.P2PConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, Init: conf.InitConfigDefault, @@ -768,7 +765,6 @@ func NodeConfigAddOptions(f *flag.FlagSet) { genericconf.WSConfigAddOptions("ws", f) genericconf.IPCConfigAddOptions("ipc", f) genericconf.AuthRPCConfigAddOptions("auth", f) - genericconf.P2PConfigAddOptions("p2p", f) genericconf.GraphQLConfigAddOptions("graphql", f) f.Bool("metrics", NodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) From 77ed71604595198438c444fbd2d1b91176200c07 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 9 Jul 2024 13:38:29 -0600 Subject: [PATCH 15/21] Drop P2PConfig struct --- cmd/genericconf/server.go | 61 --------------------------------------- 1 file changed, 61 deletions(-) diff --git 
a/cmd/genericconf/server.go b/cmd/genericconf/server.go index 9b8acd5f71..0b80b74594 100644 --- a/cmd/genericconf/server.go +++ b/cmd/genericconf/server.go @@ -8,9 +8,7 @@ import ( flag "github.com/spf13/pflag" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/enode" ) type HTTPConfig struct { @@ -189,65 +187,6 @@ func AuthRPCConfigAddOptions(prefix string, f *flag.FlagSet) { f.StringSlice(prefix+".api", AuthRPCConfigDefault.API, "APIs offered over the AUTH-RPC interface") } -type P2PConfig struct { - ListenAddr string `koanf:"listen-addr"` - NoDial bool `koanf:"no-dial"` - NoDiscovery bool `koanf:"no-discovery"` - MaxPeers int `koanf:"max-peers"` - DiscoveryV5 bool `koanf:"discovery-v5"` - DiscoveryV4 bool `koanf:"discovery-v4"` - Bootnodes []string `koanf:"bootnodes"` - BootnodesV5 []string `koanf:"bootnodes-v5"` -} - -func (p P2PConfig) Apply(stackConf *node.Config) { - stackConf.P2P.ListenAddr = p.ListenAddr - stackConf.P2P.NoDial = p.NoDial - stackConf.P2P.NoDiscovery = p.NoDiscovery - stackConf.P2P.MaxPeers = p.MaxPeers - stackConf.P2P.DiscoveryV5 = p.DiscoveryV5 - stackConf.P2P.DiscoveryV4 = p.DiscoveryV4 - stackConf.P2P.BootstrapNodes = parseBootnodes(p.Bootnodes) - stackConf.P2P.BootstrapNodesV5 = parseBootnodes(p.BootnodesV5) -} - -func parseBootnodes(urls []string) []*enode.Node { - nodes := make([]*enode.Node, 0, len(urls)) - for _, url := range urls { - if url != "" { - node, err := enode.Parse(enode.ValidSchemes, url) - if err != nil { - log.Crit("Bootstrap URL invalid", "enode", url, "err", err) - return nil - } - nodes = append(nodes, node) - } - } - return nodes -} - -var P2PConfigDefault = P2PConfig{ - ListenAddr: "", - NoDial: true, - NoDiscovery: true, - MaxPeers: 50, - DiscoveryV5: false, - DiscoveryV4: false, - Bootnodes: []string{}, - BootnodesV5: []string{}, -} - -func P2PConfigAddOptions(prefix string, f *flag.FlagSet) { - f.String(prefix+".listen-addr", P2PConfigDefault.ListenAddr, "P2P listen address") - f.Bool(prefix+".no-dial", P2PConfigDefault.NoDial, "P2P no dial") - f.Bool(prefix+".no-discovery", P2PConfigDefault.NoDiscovery, "P2P no discovery") - f.Int(prefix+".max-peers", P2PConfigDefault.MaxPeers, "P2P max peers") - f.Bool(prefix+".discovery-v5", P2PConfigDefault.DiscoveryV5, "P2P discovery v5") - f.Bool(prefix+".discovery-v4", P2PConfigDefault.DiscoveryV4, "P2P discovery v4") - f.StringSlice(prefix+".bootnodes", P2PConfigDefault.Bootnodes, "P2P bootnodes") - f.StringSlice(prefix+".bootnodes-v5", P2PConfigDefault.BootnodesV5, "P2P bootnodes v5") -} - type MetricsServerConfig struct { Addr string `koanf:"addr"` Port int `koanf:"port"` From c8307db86c6b264fbc3a1f3509cd7faaeb4e5eef Mon Sep 17 00:00:00 2001 From: Tsahi Zidenberg Date: Tue, 9 Jul 2024 16:27:36 -0600 Subject: [PATCH 16/21] export redis-url for valnode --- validator/valnode/redis/consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 016f30bd61..84f597c095 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -149,6 +149,7 @@ var TestValidationServerConfig = ValidationServerConfig{ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") + f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server") 
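// ---- Editorial note, not part of the patch: the f.String(prefix+".redis-url", ...) line added
// above follows nitro's usual configuration pattern, in which a struct field carrying a koanf
// tag is paired with a pflag registration under the same dotted name; without that registration
// the RedisURL field of ValidationServerConfig exists but cannot be populated from the command
// line or a config file. A minimal sketch of the pattern follows; the field set, default values,
// and flag descriptions are illustrative assumptions rather than nitro code, and the pflag and
// time imports are assumed.
type exampleServerConfig struct {
	RedisURL      string        `koanf:"redis-url"`
	StreamTimeout time.Duration `koanf:"stream-timeout"`
}

func exampleServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
	// Each flag name mirrors the koanf tag so values parsed from flags map back onto the struct.
	f.String(prefix+".redis-url", "", "url of redis server")
	f.Duration(prefix+".stream-timeout", 10*time.Minute, "timeout on polling for existence of redis streams")
}
// ---- end editorial note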
f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") } From 2ffb416b64682151412e405a4ab9e7ece016f414 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 9 Jul 2024 19:12:46 -0600 Subject: [PATCH 17/21] Uppercase AS keyword in Dockerfile --- Dockerfile | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/Dockerfile b/Dockerfile index 37c1020a42..91c1f46250 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM debian:bookworm-slim as brotli-wasm-builder +FROM debian:bookworm-slim AS brotli-wasm-builder WORKDIR /workspace RUN apt-get update && \ apt-get install -y cmake make git lbzip2 python3 xz-utils && \ @@ -10,10 +10,10 @@ COPY scripts/build-brotli.sh scripts/ COPY brotli brotli RUN cd emsdk && . ./emsdk_env.sh && cd .. && ./scripts/build-brotli.sh -w -t /workspace/install/ -FROM scratch as brotli-wasm-export +FROM scratch AS brotli-wasm-export COPY --from=brotli-wasm-builder /workspace/install/ / -FROM debian:bookworm-slim as brotli-library-builder +FROM debian:bookworm-slim AS brotli-library-builder WORKDIR /workspace COPY scripts/build-brotli.sh scripts/ COPY brotli brotli @@ -21,10 +21,10 @@ RUN apt-get update && \ apt-get install -y cmake make gcc git && \ ./scripts/build-brotli.sh -l -t /workspace/install/ -FROM scratch as brotli-library-export +FROM scratch AS brotli-library-export COPY --from=brotli-library-builder /workspace/install/ / -FROM node:18-bookworm-slim as contracts-builder +FROM node:18-bookworm-slim AS contracts-builder RUN apt-get update && \ apt-get install -y git python3 make g++ curl RUN curl -L https://foundry.paradigm.xyz | bash && . ~/.bashrc && ~/.foundry/bin/foundryup @@ -35,11 +35,11 @@ COPY contracts contracts/ COPY Makefile . RUN . ~/.bashrc && NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-solidity -FROM debian:bookworm-20231218 as wasm-base +FROM debian:bookworm-20231218 AS wasm-base WORKDIR /workspace RUN apt-get update && apt-get install -y curl build-essential=12.9 -FROM wasm-base as wasm-libs-builder +FROM wasm-base AS wasm-libs-builder # clang / lld used by soft-float wasm RUN apt-get update && \ apt-get install -y clang=1:14.0-55.7~deb12u1 lld=1:14.0-55.7~deb12u1 wabt @@ -59,10 +59,10 @@ COPY --from=brotli-wasm-export / target/ RUN apt-get update && apt-get install -y cmake RUN . 
~/.cargo/env && NITRO_BUILD_IGNORE_TIMESTAMPS=1 RUSTFLAGS='-C symbol-mangling-version=v0' make build-wasm-libs -FROM scratch as wasm-libs-export +FROM scratch AS wasm-libs-export COPY --from=wasm-libs-builder /workspace/ / -FROM wasm-base as wasm-bin-builder +FROM wasm-base AS wasm-bin-builder # pinned go version RUN curl -L https://golang.org/dl/go1.21.10.linux-`dpkg --print-architecture`.tar.gz | tar -C /usr/local -xzf - COPY ./Makefile ./go.mod ./go.sum ./ @@ -91,7 +91,7 @@ COPY --from=contracts-builder workspace/contracts/node_modules/@offchainlabs/upg COPY --from=contracts-builder workspace/.make/ .make/ RUN PATH="$PATH:/usr/local/go/bin" NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-wasm-bin -FROM rust:1.75-slim-bookworm as prover-header-builder +FROM rust:1.75-slim-bookworm AS prover-header-builder WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -113,10 +113,10 @@ COPY brotli brotli RUN apt-get update && apt-get install -y cmake RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-header -FROM scratch as prover-header-export +FROM scratch AS prover-header-export COPY --from=prover-header-builder /workspace/target/ / -FROM rust:1.75-slim-bookworm as prover-builder +FROM rust:1.75-slim-bookworm AS prover-builder WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -156,10 +156,10 @@ RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-lib RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-bin RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-jit -FROM scratch as prover-export +FROM scratch AS prover-export COPY --from=prover-builder /workspace/target/ / -FROM debian:bookworm-slim as module-root-calc +FROM debian:bookworm-slim AS module-root-calc WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -181,7 +181,7 @@ COPY ./solgen ./solgen COPY ./contracts ./contracts RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-replay-env -FROM debian:bookworm-slim as machine-versions +FROM debian:bookworm-slim AS machine-versions RUN apt-get update && apt-get install -y unzip wget curl WORKDIR /workspace/machines # Download WAVM machines @@ -206,7 +206,7 @@ COPY ./scripts/download-machine.sh . 
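# ---- Editorial note, not part of the patch: this commit only changes keyword casing in the
# multi-stage build. The likely motivation, an assumption rather than something stated in the
# commit message, is that newer BuildKit releases lint stage declarations and warn when the
# casing of AS does not match FROM (the FromAsCasing check), for example:
#   FROM debian:bookworm-slim as node-builder    # triggers a casing warning
#   FROM debian:bookworm-slim AS node-builder    # lint-clean, build behaviour unchanged
# ---- end editorial note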
#RUN ./download-machine.sh consensus-v20 0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4 RUN ./download-machine.sh consensus-v30 0xb0de9cb89e4d944ae6023a3b62276e54804c242fd8c4c2d8e6cc4450f5fa8b1b && true -FROM golang:1.21.10-bookworm as node-builder +FROM golang:1.21.10-bookworm AS node-builder WORKDIR /workspace ARG version="" ARG datetime="" @@ -233,17 +233,17 @@ RUN mkdir -p target/bin COPY .nitro-tag.txt /nitro-tag.txt RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build -FROM node-builder as fuzz-builder +FROM node-builder AS fuzz-builder RUN mkdir fuzzers/ RUN ./scripts/fuzz.bash --build --binary-path /workspace/fuzzers/ -FROM debian:bookworm-slim as nitro-fuzzer +FROM debian:bookworm-slim AS nitro-fuzzer COPY --from=fuzz-builder /workspace/fuzzers/*.fuzz /usr/local/bin/ COPY ./scripts/fuzz.bash /usr/local/bin RUN mkdir /fuzzcache ENTRYPOINT [ "/usr/local/bin/fuzz.bash", "FuzzStateTransition", "--binary-path", "/usr/local/bin/", "--fuzzcache-path", "/fuzzcache" ] -FROM debian:bookworm-slim as nitro-node-slim +FROM debian:bookworm-slim AS nitro-node-slim WORKDIR /home/user COPY --from=node-builder /workspace/target/bin/nitro /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/ @@ -271,9 +271,9 @@ USER user WORKDIR /home/user/ ENTRYPOINT [ "/usr/local/bin/nitro" ] -FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 as nitro-legacy +FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 AS nitro-legacy -FROM nitro-node-slim as nitro-node +FROM nitro-node-slim AS nitro-node USER root COPY --from=prover-export /bin/jit /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/daserver /usr/local/bin/ @@ -293,7 +293,7 @@ ENTRYPOINT [ "/usr/local/bin/nitro" , "--validation.wasm.allowed-wasm-module-roo USER user -FROM nitro-node as nitro-node-validator +FROM nitro-node AS nitro-node-validator USER root COPY --from=nitro-legacy /usr/local/bin/nitro-val /home/user/nitro-legacy/bin/nitro-val COPY --from=nitro-legacy /usr/local/bin/jit /home/user/nitro-legacy/bin/jit @@ -305,7 +305,7 @@ COPY scripts/split-val-entry.sh /usr/local/bin ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ] USER user -FROM nitro-node-validator as nitro-node-dev +FROM nitro-node-validator AS nitro-node-dev USER root # Copy in latest WASM module root RUN rm -f /home/user/target/machines/latest @@ -329,5 +329,5 @@ RUN export DEBIAN_FRONTEND=noninteractive && \ USER user -FROM nitro-node as nitro-node-default +FROM nitro-node AS nitro-node-default # Just to ensure nitro-node-dist is default From dd77b2284f399ff266ab4855d48386d5969d06a6 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 9 Jul 2024 19:29:30 -0600 Subject: [PATCH 18/21] Try to fix docker artifact name in CI --- .github/workflows/docker.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 30ad88d91a..2aacf32f00 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -64,12 +64,12 @@ jobs: run: | cd nitro-testnode ./test-node.bash --init --dev & - + - name: Wait for rpc to come up shell: bash run: | ${{ github.workspace }}/.github/workflows/waitForNitro.sh - + - name: Print WAVM module root id: module-root run: | @@ -77,7 +77,7 @@ jobs: # We work around this by piping a tarball through stdout docker run --rm --entrypoint tar localhost:5000/nitro-node-dev:latest -cf - target/machines/latest | tar xf - module_root="$(cat "target/machines/latest/module-root.txt")" - echo "name=module-root=$module_root" >> 
$GITHUB_STATE + echo "module-root=$module_root" >> "$GITHUB_OUTPUT" echo -e "\x1b[1;34mWAVM module root:\x1b[0m $module_root" - name: Upload WAVM machine as artifact From e048bf2bc3e27a95c02ddfdc3c25f9ccda281457 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 9 Jul 2024 21:29:54 -0600 Subject: [PATCH 19/21] Add workflow_dispatch to gh actions merge-checks.yml --- .github/workflows/merge-checks.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/merge-checks.yml b/.github/workflows/merge-checks.yml index 6f291bbb22..be79520da4 100644 --- a/.github/workflows/merge-checks.yml +++ b/.github/workflows/merge-checks.yml @@ -1,6 +1,7 @@ name: Merge Checks on: + workflow_dispatch: pull_request: branches: [ master ] types: [synchronize, opened, reopened, labeled, unlabeled] From b9d7f87c325c1f3c110142010676cca05878a462 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Tue, 9 Jul 2024 23:59:56 -0600 Subject: [PATCH 20/21] Only run "design approved" check when necessary --- .github/workflows/merge-checks.yml | 40 ++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/.github/workflows/merge-checks.yml b/.github/workflows/merge-checks.yml index be79520da4..c50b28ada1 100644 --- a/.github/workflows/merge-checks.yml +++ b/.github/workflows/merge-checks.yml @@ -1,21 +1,41 @@ name: Merge Checks on: - workflow_dispatch: pull_request: branches: [ master ] types: [synchronize, opened, reopened, labeled, unlabeled] +permissions: + statuses: write + jobs: - design-approved-check: - if: ${{ !contains(github.event.*.labels.*.name, 'design-approved') }} - name: Design Approved Check + check-design-approved: + name: Check if Design Approved runs-on: ubuntu-latest steps: - - name: Check for design-approved label + - name: Check if design approved and update status run: | - echo "Pull request is missing the 'design-approved' label" - echo "This workflow fails so that the pull request cannot be merged" - exit 1 - - + set -x pipefail + status_state="pending" + if ${{ contains(github.event.*.labels.*.name, 'design-approved') }}; then + status_state="success" + else + resp="$(curl -sSL --fail-with-body \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + "https://api.github.com/repos/$GITHUB_REPOSITORY/commits/${{ github.event.pull_request.head.sha }}/statuses")" + if ! 
jq -e '.[] | select(.context == "Design Approved Check")' > /dev/null <<< "$resp"; then + # Design not approved yet and no status exists + # Keep it without a status to keep the green checkmark appearing + # Merging will still be blocked until the required status appears + exit 0 + fi + fi + curl -sSL --fail-with-body \ + -X POST \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + "https://api.github.com/repos/$GITHUB_REPOSITORY/statuses/${{ github.event.pull_request.head.sha }}" \ + -d '{"context":"Design Approved Check","state":"'"$status_state"'"}' From 6e416f405125786ee088fc3b25a5cae0576249c1 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Wed, 10 Jul 2024 00:01:31 -0600 Subject: [PATCH 21/21] Clarify comment --- .github/workflows/merge-checks.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/merge-checks.yml b/.github/workflows/merge-checks.yml index c50b28ada1..b729df2b26 100644 --- a/.github/workflows/merge-checks.yml +++ b/.github/workflows/merge-checks.yml @@ -28,6 +28,7 @@ jobs: if ! jq -e '.[] | select(.context == "Design Approved Check")' > /dev/null <<< "$resp"; then # Design not approved yet and no status exists # Keep it without a status to keep the green checkmark appearing + # Otherwise, the commit and PR's CI will appear to be indefinitely pending # Merging will still be blocked until the required status appears exit 0 fi
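Editorial note, not part of the patch series: the reworked merge-checks workflow above drives the
"Design Approved Check" as a commit status rather than a failing job. The script posts state
"success" when the design-approved label is present; when the label is absent it posts "pending"
only if a Design Approved Check status already exists on the head commit, and otherwise posts
nothing, so ordinary PRs do not show an indefinitely pending check while merging still remains
blocked until the required status appears. The sketch below expresses the same statuses API call
in Go for clarity; the environment variable names, error handling, and main function are
illustrative assumptions, while the endpoint, headers, and payload mirror the curl invocation in
the workflow.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

// postCommitStatus sets a commit status (context plus state) on a commit SHA, mirroring the
// curl call in .github/workflows/merge-checks.yml.
func postCommitStatus(repo, sha, token, statusContext, state string) error {
	payload, err := json.Marshal(map[string]string{"context": statusContext, "state": state})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("https://api.github.com/repos/%s/statuses/%s", repo, sha)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Accept", "application/vnd.github+json")
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("X-GitHub-Api-Version", "2022-11-28")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("posting commit status failed: %s", resp.Status)
	}
	return nil
}

func main() {
	// "pending" here stands in for the state computed from the design-approved label.
	err := postCommitStatus(os.Getenv("GITHUB_REPOSITORY"), os.Getenv("HEAD_SHA"),
		os.Getenv("GITHUB_TOKEN"), "Design Approved Check", "pending")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}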