diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 30ad88d91a..2aacf32f00 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -64,12 +64,12 @@ jobs: run: | cd nitro-testnode ./test-node.bash --init --dev & - + - name: Wait for rpc to come up shell: bash run: | ${{ github.workspace }}/.github/workflows/waitForNitro.sh - + - name: Print WAVM module root id: module-root run: | @@ -77,7 +77,7 @@ jobs: # We work around this by piping a tarball through stdout docker run --rm --entrypoint tar localhost:5000/nitro-node-dev:latest -cf - target/machines/latest | tar xf - module_root="$(cat "target/machines/latest/module-root.txt")" - echo "name=module-root=$module_root" >> $GITHUB_STATE + echo "module-root=$module_root" >> "$GITHUB_OUTPUT" echo -e "\x1b[1;34mWAVM module root:\x1b[0m $module_root" - name: Upload WAVM machine as artifact diff --git a/.github/workflows/merge-checks.yml b/.github/workflows/merge-checks.yml index 6f291bbb22..b729df2b26 100644 --- a/.github/workflows/merge-checks.yml +++ b/.github/workflows/merge-checks.yml @@ -5,16 +5,38 @@ on: branches: [ master ] types: [synchronize, opened, reopened, labeled, unlabeled] +permissions: + statuses: write + jobs: - design-approved-check: - if: ${{ !contains(github.event.*.labels.*.name, 'design-approved') }} - name: Design Approved Check + check-design-approved: + name: Check if Design Approved runs-on: ubuntu-latest steps: - - name: Check for design-approved label + - name: Check if design approved and update status run: | - echo "Pull request is missing the 'design-approved' label" - echo "This workflow fails so that the pull request cannot be merged" - exit 1 - - + set -x pipefail + status_state="pending" + if ${{ contains(github.event.*.labels.*.name, 'design-approved') }}; then + status_state="success" + else + resp="$(curl -sSL --fail-with-body \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + "https://api.github.com/repos/$GITHUB_REPOSITORY/commits/${{ github.event.pull_request.head.sha }}/statuses")" + if ! jq -e '.[] | select(.context == "Design Approved Check")' > /dev/null <<< "$resp"; then + # Design not approved yet and no status exists + # Keep it without a status to keep the green checkmark appearing + # Otherwise, the commit and PR's CI will appear to be indefinitely pending + # Merging will still be blocked until the required status appears + exit 0 + fi + fi + curl -sSL --fail-with-body \ + -X POST \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + "https://api.github.com/repos/$GITHUB_REPOSITORY/statuses/${{ github.event.pull_request.head.sha }}" \ + -d '{"context":"Design Approved Check","state":"'"$status_state"'"}' diff --git a/Dockerfile b/Dockerfile index 37c1020a42..91c1f46250 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM debian:bookworm-slim as brotli-wasm-builder +FROM debian:bookworm-slim AS brotli-wasm-builder WORKDIR /workspace RUN apt-get update && \ apt-get install -y cmake make git lbzip2 python3 xz-utils && \ @@ -10,10 +10,10 @@ COPY scripts/build-brotli.sh scripts/ COPY brotli brotli RUN cd emsdk && . ./emsdk_env.sh && cd .. 
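The merge-checks workflow above drives the "Design Approved Check" commit status through the GitHub REST statuses API with curl, first reading existing statuses and then POSTing a new one. Purely as an illustration of that API call, here is a minimal Go sketch of the POST; the repo, SHA, and token values are placeholders and none of this code is part of the PR:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

// setCommitStatus posts a commit status, mirroring the curl call in merge-checks.yml.
func setCommitStatus(repo, sha, token, state string) error {
	body, err := json.Marshal(map[string]string{
		"context": "Design Approved Check",
		"state":   state, // "pending" or "success", as in the workflow
	})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("https://api.github.com/repos/%s/statuses/%s", repo, sha)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Accept", "application/vnd.github+json")
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("X-GitHub-Api-Version", "2022-11-28")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

func main() {
	// Placeholder arguments; in the workflow these come from the event payload and secrets.
	if err := setCommitStatus("owner/repo", "deadbeef", os.Getenv("GITHUB_TOKEN"), "success"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}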
&& ./scripts/build-brotli.sh -w -t /workspace/install/ -FROM scratch as brotli-wasm-export +FROM scratch AS brotli-wasm-export COPY --from=brotli-wasm-builder /workspace/install/ / -FROM debian:bookworm-slim as brotli-library-builder +FROM debian:bookworm-slim AS brotli-library-builder WORKDIR /workspace COPY scripts/build-brotli.sh scripts/ COPY brotli brotli @@ -21,10 +21,10 @@ RUN apt-get update && \ apt-get install -y cmake make gcc git && \ ./scripts/build-brotli.sh -l -t /workspace/install/ -FROM scratch as brotli-library-export +FROM scratch AS brotli-library-export COPY --from=brotli-library-builder /workspace/install/ / -FROM node:18-bookworm-slim as contracts-builder +FROM node:18-bookworm-slim AS contracts-builder RUN apt-get update && \ apt-get install -y git python3 make g++ curl RUN curl -L https://foundry.paradigm.xyz | bash && . ~/.bashrc && ~/.foundry/bin/foundryup @@ -35,11 +35,11 @@ COPY contracts contracts/ COPY Makefile . RUN . ~/.bashrc && NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-solidity -FROM debian:bookworm-20231218 as wasm-base +FROM debian:bookworm-20231218 AS wasm-base WORKDIR /workspace RUN apt-get update && apt-get install -y curl build-essential=12.9 -FROM wasm-base as wasm-libs-builder +FROM wasm-base AS wasm-libs-builder # clang / lld used by soft-float wasm RUN apt-get update && \ apt-get install -y clang=1:14.0-55.7~deb12u1 lld=1:14.0-55.7~deb12u1 wabt @@ -59,10 +59,10 @@ COPY --from=brotli-wasm-export / target/ RUN apt-get update && apt-get install -y cmake RUN . ~/.cargo/env && NITRO_BUILD_IGNORE_TIMESTAMPS=1 RUSTFLAGS='-C symbol-mangling-version=v0' make build-wasm-libs -FROM scratch as wasm-libs-export +FROM scratch AS wasm-libs-export COPY --from=wasm-libs-builder /workspace/ / -FROM wasm-base as wasm-bin-builder +FROM wasm-base AS wasm-bin-builder # pinned go version RUN curl -L https://golang.org/dl/go1.21.10.linux-`dpkg --print-architecture`.tar.gz | tar -C /usr/local -xzf - COPY ./Makefile ./go.mod ./go.sum ./ @@ -91,7 +91,7 @@ COPY --from=contracts-builder workspace/contracts/node_modules/@offchainlabs/upg COPY --from=contracts-builder workspace/.make/ .make/ RUN PATH="$PATH:/usr/local/go/bin" NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-wasm-bin -FROM rust:1.75-slim-bookworm as prover-header-builder +FROM rust:1.75-slim-bookworm AS prover-header-builder WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -113,10 +113,10 @@ COPY brotli brotli RUN apt-get update && apt-get install -y cmake RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-header -FROM scratch as prover-header-export +FROM scratch AS prover-header-export COPY --from=prover-header-builder /workspace/target/ / -FROM rust:1.75-slim-bookworm as prover-builder +FROM rust:1.75-slim-bookworm AS prover-builder WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -156,10 +156,10 @@ RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-lib RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-prover-bin RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-jit -FROM scratch as prover-export +FROM scratch AS prover-export COPY --from=prover-builder /workspace/target/ / -FROM debian:bookworm-slim as module-root-calc +FROM debian:bookworm-slim AS module-root-calc WORKDIR /workspace RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ @@ -181,7 +181,7 @@ COPY ./solgen ./solgen COPY ./contracts ./contracts RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build-replay-env -FROM debian:bookworm-slim as machine-versions +FROM 
debian:bookworm-slim AS machine-versions RUN apt-get update && apt-get install -y unzip wget curl WORKDIR /workspace/machines # Download WAVM machines @@ -206,7 +206,7 @@ COPY ./scripts/download-machine.sh . #RUN ./download-machine.sh consensus-v20 0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4 RUN ./download-machine.sh consensus-v30 0xb0de9cb89e4d944ae6023a3b62276e54804c242fd8c4c2d8e6cc4450f5fa8b1b && true -FROM golang:1.21.10-bookworm as node-builder +FROM golang:1.21.10-bookworm AS node-builder WORKDIR /workspace ARG version="" ARG datetime="" @@ -233,17 +233,17 @@ RUN mkdir -p target/bin COPY .nitro-tag.txt /nitro-tag.txt RUN NITRO_BUILD_IGNORE_TIMESTAMPS=1 make build -FROM node-builder as fuzz-builder +FROM node-builder AS fuzz-builder RUN mkdir fuzzers/ RUN ./scripts/fuzz.bash --build --binary-path /workspace/fuzzers/ -FROM debian:bookworm-slim as nitro-fuzzer +FROM debian:bookworm-slim AS nitro-fuzzer COPY --from=fuzz-builder /workspace/fuzzers/*.fuzz /usr/local/bin/ COPY ./scripts/fuzz.bash /usr/local/bin RUN mkdir /fuzzcache ENTRYPOINT [ "/usr/local/bin/fuzz.bash", "FuzzStateTransition", "--binary-path", "/usr/local/bin/", "--fuzzcache-path", "/fuzzcache" ] -FROM debian:bookworm-slim as nitro-node-slim +FROM debian:bookworm-slim AS nitro-node-slim WORKDIR /home/user COPY --from=node-builder /workspace/target/bin/nitro /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/ @@ -271,9 +271,9 @@ USER user WORKDIR /home/user/ ENTRYPOINT [ "/usr/local/bin/nitro" ] -FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 as nitro-legacy +FROM offchainlabs/nitro-node:v2.3.4-rc.5-b4cc111 AS nitro-legacy -FROM nitro-node-slim as nitro-node +FROM nitro-node-slim AS nitro-node USER root COPY --from=prover-export /bin/jit /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/daserver /usr/local/bin/ @@ -293,7 +293,7 @@ ENTRYPOINT [ "/usr/local/bin/nitro" , "--validation.wasm.allowed-wasm-module-roo USER user -FROM nitro-node as nitro-node-validator +FROM nitro-node AS nitro-node-validator USER root COPY --from=nitro-legacy /usr/local/bin/nitro-val /home/user/nitro-legacy/bin/nitro-val COPY --from=nitro-legacy /usr/local/bin/jit /home/user/nitro-legacy/bin/jit @@ -305,7 +305,7 @@ COPY scripts/split-val-entry.sh /usr/local/bin ENTRYPOINT [ "/usr/local/bin/split-val-entry.sh" ] USER user -FROM nitro-node-validator as nitro-node-dev +FROM nitro-node-validator AS nitro-node-dev USER root # Copy in latest WASM module root RUN rm -f /home/user/target/machines/latest @@ -329,5 +329,5 @@ RUN export DEBIAN_FRONTEND=noninteractive && \ USER user -FROM nitro-node as nitro-node-default +FROM nitro-node AS nitro-node-default # Just to ensure nitro-node-dist is default diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 1167dba133..2225341560 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/offchainlabs/nitro/arbutil" m "github.com/offchainlabs/nitro/broadcaster/message" "github.com/offchainlabs/nitro/util/contracts" @@ -292,6 +291,10 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa return nil, err } if err != nil { + connectionRejectedError := &ws.ConnectionRejectedError{} + if errors.As(err, &connectionRejectedError) && connectionRejectedError.StatusCode() == 429 { + log.Error("rate limit exceeded, please run own local relay 
because too many nodes are connecting to feed from same IP address", "err", err) + } return nil, fmt.Errorf("broadcast client unable to connect: %w", err) } if config.RequireChainId && !foundChainId { diff --git a/cmd/genericconf/server.go b/cmd/genericconf/server.go index 9b8acd5f71..0b80b74594 100644 --- a/cmd/genericconf/server.go +++ b/cmd/genericconf/server.go @@ -8,9 +8,7 @@ import ( flag "github.com/spf13/pflag" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/enode" ) type HTTPConfig struct { @@ -189,65 +187,6 @@ func AuthRPCConfigAddOptions(prefix string, f *flag.FlagSet) { f.StringSlice(prefix+".api", AuthRPCConfigDefault.API, "APIs offered over the AUTH-RPC interface") } -type P2PConfig struct { - ListenAddr string `koanf:"listen-addr"` - NoDial bool `koanf:"no-dial"` - NoDiscovery bool `koanf:"no-discovery"` - MaxPeers int `koanf:"max-peers"` - DiscoveryV5 bool `koanf:"discovery-v5"` - DiscoveryV4 bool `koanf:"discovery-v4"` - Bootnodes []string `koanf:"bootnodes"` - BootnodesV5 []string `koanf:"bootnodes-v5"` -} - -func (p P2PConfig) Apply(stackConf *node.Config) { - stackConf.P2P.ListenAddr = p.ListenAddr - stackConf.P2P.NoDial = p.NoDial - stackConf.P2P.NoDiscovery = p.NoDiscovery - stackConf.P2P.MaxPeers = p.MaxPeers - stackConf.P2P.DiscoveryV5 = p.DiscoveryV5 - stackConf.P2P.DiscoveryV4 = p.DiscoveryV4 - stackConf.P2P.BootstrapNodes = parseBootnodes(p.Bootnodes) - stackConf.P2P.BootstrapNodesV5 = parseBootnodes(p.BootnodesV5) -} - -func parseBootnodes(urls []string) []*enode.Node { - nodes := make([]*enode.Node, 0, len(urls)) - for _, url := range urls { - if url != "" { - node, err := enode.Parse(enode.ValidSchemes, url) - if err != nil { - log.Crit("Bootstrap URL invalid", "enode", url, "err", err) - return nil - } - nodes = append(nodes, node) - } - } - return nodes -} - -var P2PConfigDefault = P2PConfig{ - ListenAddr: "", - NoDial: true, - NoDiscovery: true, - MaxPeers: 50, - DiscoveryV5: false, - DiscoveryV4: false, - Bootnodes: []string{}, - BootnodesV5: []string{}, -} - -func P2PConfigAddOptions(prefix string, f *flag.FlagSet) { - f.String(prefix+".listen-addr", P2PConfigDefault.ListenAddr, "P2P listen address") - f.Bool(prefix+".no-dial", P2PConfigDefault.NoDial, "P2P no dial") - f.Bool(prefix+".no-discovery", P2PConfigDefault.NoDiscovery, "P2P no discovery") - f.Int(prefix+".max-peers", P2PConfigDefault.MaxPeers, "P2P max peers") - f.Bool(prefix+".discovery-v5", P2PConfigDefault.DiscoveryV5, "P2P discovery v5") - f.Bool(prefix+".discovery-v4", P2PConfigDefault.DiscoveryV4, "P2P discovery v4") - f.StringSlice(prefix+".bootnodes", P2PConfigDefault.Bootnodes, "P2P bootnodes") - f.StringSlice(prefix+".bootnodes-v5", P2PConfigDefault.BootnodesV5, "P2P bootnodes v5") -} - type MetricsServerConfig struct { Addr string `koanf:"addr"` Port int `koanf:"port"` diff --git a/cmd/nitro-val/config.go b/cmd/nitro-val/config.go index b52a1c6b5e..2adbe5e9aa 100644 --- a/cmd/nitro-val/config.go +++ b/cmd/nitro-val/config.go @@ -27,7 +27,6 @@ type ValidationNodeConfig struct { HTTP genericconf.HTTPConfig `koanf:"http"` WS genericconf.WSConfig `koanf:"ws"` IPC genericconf.IPCConfig `koanf:"ipc"` - P2P genericconf.P2PConfig `koanf:"p2p"` Auth genericconf.AuthRPCConfig `koanf:"auth"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` @@ -67,7 +66,6 @@ var ValidationNodeConfigDefault = ValidationNodeConfig{ HTTP: HTTPConfigDefault, WS: WSConfigDefault, IPC: 
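The broadcastclient.go hunk above detects feed rate limiting by unwrapping the connection error into a ws.ConnectionRejectedError and checking for HTTP 429 before wrapping and returning the error. A self-contained sketch of that errors.As pattern follows; it uses a stand-in error type rather than the real gobwas/ws one, so only the technique is taken from the diff:

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// rejectedError stands in for ws.ConnectionRejectedError: a typed error
// carrying the HTTP status returned during the websocket handshake.
type rejectedError struct{ status int }

func (e *rejectedError) Error() string   { return fmt.Sprintf("connection rejected: %d", e.status) }
func (e *rejectedError) StatusCode() int { return e.status }

func connect() error {
	// Pretend the server rejected the upgrade with 429 Too Many Requests.
	return fmt.Errorf("broadcast client unable to connect: %w", &rejectedError{status: http.StatusTooManyRequests})
}

func main() {
	err := connect()
	rejected := &rejectedError{}
	// errors.As walks the wrap chain, so the check still works after %w wrapping.
	if errors.As(err, &rejected) && rejected.StatusCode() == http.StatusTooManyRequests {
		fmt.Println("rate limited by the feed; consider running a local relay:", err)
	}
}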
IPCConfigDefault, - P2P: genericconf.P2PConfigDefault, Auth: genericconf.AuthRPCConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, @@ -87,7 +85,6 @@ func ValidationNodeConfigAddOptions(f *flag.FlagSet) { genericconf.WSConfigAddOptions("ws", f) genericconf.IPCConfigAddOptions("ipc", f) genericconf.AuthRPCConfigAddOptions("auth", f) - genericconf.P2PConfigAddOptions("p2p", f) f.Bool("metrics", ValidationNodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) f.Bool("pprof", ValidationNodeConfigDefault.PProf, "enable pprof") diff --git a/cmd/nitro-val/nitro_val.go b/cmd/nitro-val/nitro_val.go index 1e894336ea..6f5f546430 100644 --- a/cmd/nitro-val/nitro_val.go +++ b/cmd/nitro-val/nitro_val.go @@ -70,7 +70,6 @@ func mainImpl() int { nodeConfig.WS.Apply(&stackConf) nodeConfig.Auth.Apply(&stackConf) nodeConfig.IPC.Apply(&stackConf) - nodeConfig.P2P.Apply(&stackConf) vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 1c4ad80186..572e6d2f06 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -183,7 +183,6 @@ func mainImpl() int { if nodeConfig.WS.ExposeAll { stackConf.WSModules = append(stackConf.WSModules, "personal") } - nodeConfig.P2P.Apply(&stackConf) vcsRevision, strippedRevision, vcsTime := confighelpers.GetVersion() stackConf.Version = strippedRevision @@ -717,7 +716,6 @@ type NodeConfig struct { IPC genericconf.IPCConfig `koanf:"ipc"` Auth genericconf.AuthRPCConfig `koanf:"auth"` GraphQL genericconf.GraphQLConfig `koanf:"graphql"` - P2P genericconf.P2PConfig `koanf:"p2p"` Metrics bool `koanf:"metrics"` MetricsServer genericconf.MetricsServerConfig `koanf:"metrics-server"` PProf bool `koanf:"pprof"` @@ -743,7 +741,6 @@ var NodeConfigDefault = NodeConfig{ IPC: genericconf.IPCConfigDefault, Auth: genericconf.AuthRPCConfigDefault, GraphQL: genericconf.GraphQLConfigDefault, - P2P: genericconf.P2PConfigDefault, Metrics: false, MetricsServer: genericconf.MetricsServerConfigDefault, Init: conf.InitConfigDefault, @@ -768,7 +765,6 @@ func NodeConfigAddOptions(f *flag.FlagSet) { genericconf.WSConfigAddOptions("ws", f) genericconf.IPCConfigAddOptions("ipc", f) genericconf.AuthRPCConfigAddOptions("auth", f) - genericconf.P2PConfigAddOptions("p2p", f) genericconf.GraphQLConfigAddOptions("graphql", f) f.Bool("metrics", NodeConfigDefault.Metrics, "enable metrics") genericconf.MetricsServerAddOptions("metrics-server", f) diff --git a/das/das_test.go b/das/das_test.go index c52616fe20..179734c8b1 100644 --- a/das/das_test.go +++ b/das/das_test.go @@ -40,8 +40,9 @@ func testDASStoreRetrieveMultipleInstances(t *testing.T, storageType string) { KeyDir: dbPath, }, LocalFileStorage: LocalFileStorageConfig{ - Enable: enableFileStorage, - DataDir: dbPath, + Enable: enableFileStorage, + DataDir: dbPath, + MaxRetention: DefaultLocalFileStorageConfig.MaxRetention, }, LocalDBStorage: dbConfig, ParentChainNodeURL: "none", @@ -129,8 +130,9 @@ func testDASMissingMessage(t *testing.T, storageType string) { KeyDir: dbPath, }, LocalFileStorage: LocalFileStorageConfig{ - Enable: enableFileStorage, - DataDir: dbPath, + Enable: enableFileStorage, + DataDir: dbPath, + MaxRetention: DefaultLocalFileStorageConfig.MaxRetention, }, LocalDBStorage: dbConfig, ParentChainNodeURL: "none", diff --git a/das/factory.go b/das/factory.go index fd6f60abb2..88abc58a73 100644 --- a/das/factory.go +++ b/das/factory.go @@ -35,7 +35,11 @@ func 
CreatePersistentStorageService( } if config.LocalFileStorage.Enable { - s, err := NewLocalFileStorageService(config.LocalFileStorage.DataDir) + s, err := NewLocalFileStorageService(config.LocalFileStorage) + if err != nil { + return nil, nil, err + } + err = s.start(ctx) if err != nil { return nil, nil, err } diff --git a/das/local_file_storage_service.go b/das/local_file_storage_service.go index 8be03bcb30..6b0a5f0070 100644 --- a/das/local_file_storage_service.go +++ b/das/local_file_storage_service.go @@ -6,10 +6,17 @@ package das import ( "bytes" "context" - "encoding/base32" "errors" "fmt" + "io" "os" + "path" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "syscall" "time" "github.com/ethereum/go-ethereum/common" @@ -17,64 +24,146 @@ import ( "github.com/offchainlabs/nitro/arbstate/daprovider" "github.com/offchainlabs/nitro/das/dastree" "github.com/offchainlabs/nitro/util/pretty" + "github.com/offchainlabs/nitro/util/stopwaiter" flag "github.com/spf13/pflag" "golang.org/x/sys/unix" ) type LocalFileStorageConfig struct { - Enable bool `koanf:"enable"` - DataDir string `koanf:"data-dir"` + Enable bool `koanf:"enable"` + DataDir string `koanf:"data-dir"` + EnableExpiry bool `koanf:"enable-expiry"` + MaxRetention time.Duration `koanf:"max-retention"` } var DefaultLocalFileStorageConfig = LocalFileStorageConfig{ - DataDir: "", + DataDir: "", + MaxRetention: defaultStorageRetention, } func LocalFileStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".enable", DefaultLocalFileStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a directory of files, one per batch") f.String(prefix+".data-dir", DefaultLocalFileStorageConfig.DataDir, "local data directory") + f.Bool(prefix+".enable-expiry", DefaultLocalFileStorageConfig.EnableExpiry, "enable expiry of batches") + f.Duration(prefix+".max-retention", DefaultLocalFileStorageConfig.MaxRetention, "store requests with expiry times farther in the future than max-retention will be rejected") } type LocalFileStorageService struct { - dataDir string + config LocalFileStorageConfig + + legacyLayout flatLayout + layout trieLayout + + // for testing only + enableLegacyLayout bool + + stopWaiter stopwaiter.StopWaiterSafe +} + +func NewLocalFileStorageService(config LocalFileStorageConfig) (*LocalFileStorageService, error) { + if unix.Access(config.DataDir, unix.W_OK|unix.R_OK) != nil { + return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", config.DataDir) + } + s := &LocalFileStorageService{ + config: config, + legacyLayout: flatLayout{root: config.DataDir, retention: config.MaxRetention}, + layout: trieLayout{root: config.DataDir, expiryEnabled: config.EnableExpiry}, + } + return s, nil } -func NewLocalFileStorageService(dataDir string) (StorageService, error) { - if unix.Access(dataDir, unix.W_OK|unix.R_OK) != nil { - return nil, fmt.Errorf("couldn't start LocalFileStorageService, directory '%s' must be readable and writeable", dataDir) +// Separate start function +// Tests want to be able to avoid triggering the auto migration +func (s *LocalFileStorageService) start(ctx context.Context) error { + migrated, err := s.layout.migrated() + if err != nil { + return err + } + + if !migrated && !s.enableLegacyLayout { + if err = migrate(&s.legacyLayout, &s.layout); err != nil { + return err + } + } + + if err := s.stopWaiter.Start(ctx, s); err != nil { + return err } - return &LocalFileStorageService{dataDir: dataDir}, nil + if 
s.config.EnableExpiry && !s.enableLegacyLayout { + err = s.stopWaiter.CallIterativelySafe(func(ctx context.Context) time.Duration { + err = s.layout.prune(time.Now()) + if err != nil { + log.Error("error pruning expired batches", "error", err) + } + return time.Minute * 5 + }) + if err != nil { + return err + } + } + return nil +} + +func (s *LocalFileStorageService) Close(ctx context.Context) error { + return s.stopWaiter.StopAndWait() } func (s *LocalFileStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) { log.Trace("das.LocalFileStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s) - pathname := s.dataDir + "/" + EncodeStorageServiceKey(key) - data, err := os.ReadFile(pathname) + var batchPath string + if s.enableLegacyLayout { + batchPath = s.legacyLayout.batchPath(key) + } else { + batchPath = s.layout.batchPath(key) + } + + data, err := os.ReadFile(batchPath) if err != nil { - // Just for backward compatability. - pathname = s.dataDir + "/" + base32.StdEncoding.EncodeToString(key.Bytes()) - data, err = os.ReadFile(pathname) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil, ErrNotFound - } - return nil, err + if errors.Is(err, os.ErrNotExist) { + return nil, ErrNotFound } - return data, nil + return nil, err } return data, nil } -func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout uint64) error { - logPut("das.LocalFileStorageService.Store", data, timeout, s) - fileName := EncodeStorageServiceKey(dastree.Hash(data)) - finalPath := s.dataDir + "/" + fileName +func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, expiry uint64) error { + logPut("das.LocalFileStorageService.Store", data, expiry, s) + expiryTime := time.Unix(int64(expiry), 0) + currentTimePlusRetention := time.Now().Add(s.config.MaxRetention) + if expiryTime.After(currentTimePlusRetention) { + return fmt.Errorf("requested expiry time (%v) exceeds current time plus maximum allowed retention period(%v)", expiryTime, currentTimePlusRetention) + } + + key := dastree.Hash(data) + var batchPath string + if !s.enableLegacyLayout { + s.layout.writeMutex.Lock() + defer s.layout.writeMutex.Unlock() + batchPath = s.layout.batchPath(key) + } else { + batchPath = s.legacyLayout.batchPath(key) + } + + err := os.MkdirAll(path.Dir(batchPath), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(batchPath), err) + } // Use a temp file and rename to achieve atomic writes. - f, err := os.CreateTemp(s.dataDir, fileName) + f, err := os.CreateTemp(path.Dir(batchPath), path.Base(batchPath)) if err != nil { return err } + renamed := false + defer func() { + _ = f.Close() + if !renamed { + if err := os.Remove(f.Name()); err != nil { + log.Error("Couldn't clean up temporary file", "file", f.Name()) + } + } + }() err = f.Chmod(0o600) if err != nil { return err @@ -83,29 +172,55 @@ func (s *LocalFileStorageService) Put(ctx context.Context, data []byte, timeout if err != nil { return err } - err = f.Close() - if err != nil { - return err + + // For testing only. When migrating we treat the expiry time of existing flat layout + // files to be the modification time + the max allowed retention. So when creating + // new flat layout files, set their modification time accordingly. 
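Put above refuses any store request whose expiry lies beyond now plus the configured MaxRetention, which is what the new max-retention option controls. A tiny standalone illustration of that guard, with a hypothetical retention value (not the service's actual code):

package main

import (
	"fmt"
	"time"
)

// validateExpiry mirrors the guard at the top of Put: expiry is a unix
// timestamp in seconds supplied by the caller, maxRetention is the
// configured max-retention option.
func validateExpiry(expiry uint64, maxRetention time.Duration) error {
	expiryTime := time.Unix(int64(expiry), 0)
	limit := time.Now().Add(maxRetention)
	if expiryTime.After(limit) {
		return fmt.Errorf("requested expiry time (%v) exceeds current time plus maximum allowed retention period (%v)", expiryTime, limit)
	}
	return nil
}

func main() {
	maxRetention := 21 * 24 * time.Hour // same value as defaultStorageRetention later in this diff
	ok := uint64(time.Now().Add(24 * time.Hour).Unix())
	tooFar := uint64(time.Now().Add(30 * 24 * time.Hour).Unix())
	fmt.Println(validateExpiry(ok, maxRetention))     // <nil>
	fmt.Println(validateExpiry(tooFar, maxRetention)) // error
}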
+ if s.enableLegacyLayout { + tv := syscall.Timeval{ + Sec: int64(expiry - uint64(s.legacyLayout.retention.Seconds())), + Usec: 0, + } + times := []syscall.Timeval{tv, tv} + if err = syscall.Utimes(f.Name(), times); err != nil { + return err + } } - return os.Rename(f.Name(), finalPath) + _, err = os.Stat(batchPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.Rename(f.Name(), batchPath); err != nil { + return err + } + renamed = true + } else { + return err + } + } -} + if !s.enableLegacyLayout && s.layout.expiryEnabled { + if err := createHardLink(batchPath, s.layout.expiryPath(key, expiry)); err != nil { + return fmt.Errorf("couldn't create by-expiry-path index entry: %w", err) + } + } -func (s *LocalFileStorageService) Sync(ctx context.Context) error { return nil } -func (s *LocalFileStorageService) Close(ctx context.Context) error { +func (s *LocalFileStorageService) Sync(ctx context.Context) error { return nil } func (s *LocalFileStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) { + if s.config.EnableExpiry { + return daprovider.DiscardAfterDataTimeout, nil + } return daprovider.KeepForever, nil } func (s *LocalFileStorageService) String() string { - return "LocalFileStorageService(" + s.dataDir + ")" + return "LocalFileStorageService(" + s.config.DataDir + ")" } func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { @@ -123,3 +238,578 @@ func (s *LocalFileStorageService) HealthCheck(ctx context.Context) error { } return nil } + +func listDir(dir string) ([]string, error) { + d, err := os.Open(dir) + if err != nil { + return nil, err + } + defer d.Close() + + // Read all the directory entries + files, err := d.Readdirnames(-1) + if err != nil { + return nil, err + } + + return files, nil +} + +var hex64Regex = regexp.MustCompile(fmt.Sprintf("^[a-fA-F0-9]{%d}$", common.HashLength*2)) + +func isStorageServiceKey(key string) bool { + return hex64Regex.MatchString(key) +} + +// Copies a file by its contents to a new file, making any directories needed +// in the new file's path. +func copyFile(new, orig string) error { + err := os.MkdirAll(path.Dir(new), 0o700) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", path.Dir(new), err) + } + + origFile, err := os.Open(orig) + if err != nil { + return fmt.Errorf("failed to open source file: %w", err) + } + defer origFile.Close() + + newFile, err := os.Create(new) + if err != nil { + return fmt.Errorf("failed to create destination file: %w", err) + } + defer newFile.Close() + + _, err = io.Copy(newFile, origFile) + if err != nil { + return fmt.Errorf("failed to copy contents: %w", err) + } + + return nil +} + +// Creates a hard link at new, to orig, making any directories needed in the new link's path. +func createHardLink(orig, new string) error { + err := os.MkdirAll(path.Dir(new), 0o700) + if err != nil { + return err + } + + info, err := os.Stat(new) + if err != nil { + if os.IsNotExist(err) { + err = os.Link(orig, new) + if err != nil { + return err + } + return nil + } else { + return err + } + } + + // Hard link already exists + stat, ok := info.Sys().(*syscall.Stat_t) + if ok && stat.Nlink > 1 { + return nil + } + + return fmt.Errorf("file exists but is not a hard link: %s", new) +} + +// migrate converts a file store from flatLayout to trieLayout. +// It is not thread safe and must be run before Put requests are served. +// The expiry index is only created if expiry is enabled. 
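Put writes each batch through a temp file in the destination directory and only renames it into place if no file with that name exists yet, so repeated stores of the same batch are harmless; the by-expiry-timestamp entry is then a hard link to the same inode, which is what later lets prune use the link count to tell whether a data file still has an index entry. A minimal sketch of the temp-file-plus-rename pattern, standalone and deliberately simpler than the service's exact code:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// atomicWrite writes data to path via a temp file in the same directory,
// renaming it into place so readers never observe a partial file.
func atomicWrite(path string, data []byte) error {
	if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
		return err
	}
	f, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path))
	if err != nil {
		return err
	}
	renamed := false
	defer func() {
		_ = f.Close()
		if !renamed {
			_ = os.Remove(f.Name()) // clean up on any failure path
		}
	}()
	if _, err := f.Write(data); err != nil {
		return err
	}
	if err := os.Rename(f.Name(), path); err != nil {
		return err
	}
	renamed = true
	return nil
}

func main() {
	dir, _ := os.MkdirTemp("", "example")
	defer os.RemoveAll(dir)
	fmt.Println(atomicWrite(filepath.Join(dir, "batch"), []byte("payload")))
}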
+func migrate(fl *flatLayout, tl *trieLayout) error { + flIt, err := fl.iterateBatches() + if err != nil { + return err + } + + batch, err := flIt.next() + if errors.Is(err, io.EOF) { + log.Info("No batches in legacy layout detected, skipping migration.") + return nil + } + if err != nil { + return err + } + + if startErr := tl.startMigration(); startErr != nil { + return startErr + } + + migrationStart := time.Now() + var migrated, skipped, removed int + err = func() error { + for ; !errors.Is(err, io.EOF); batch, err = flIt.next() { + if err != nil { + return err + } + + if tl.expiryEnabled && batch.expiry.Before(migrationStart) { + skipped++ + log.Debug("skipping expired batch during migration", "expiry", batch.expiry, "start", migrationStart) + continue // don't migrate expired batches + } + + origPath := fl.batchPath(batch.key) + newPath := tl.batchPath(batch.key) + if err = copyFile(newPath, origPath); err != nil { + return err + } + + if tl.expiryEnabled { + expiryPath := tl.expiryPath(batch.key, uint64(batch.expiry.Unix())) + if err = createHardLink(newPath, expiryPath); err != nil { + return err + } + } + migrated++ + } + + return tl.commitMigration() + }() + if err != nil { + return fmt.Errorf("error migrating local file store layout, retaining old layout: %w", err) + } + + flIt, err = fl.iterateBatches() + if err != nil { + return err + } + for batch, err := flIt.next(); !errors.Is(err, io.EOF); batch, err = flIt.next() { + if err != nil { + log.Warn("local file store migration completed, but error cleaning up old layout, files from that layout are now orphaned", "error", err) + break + } + toRemove := fl.batchPath(batch.key) + err = os.Remove(toRemove) + if err != nil { + log.Warn("local file store migration completed, but error cleaning up file from old layout, file is now orphaned", "file", toRemove, "error", err) + } + removed++ + } + + log.Info("Local file store legacy layout migration complete", "migratedFiles", migrated, "skippedExpiredFiles", skipped, "removedFiles", removed, "duration", time.Since(migrationStart)) + + return nil +} + +func (tl *trieLayout) prune(pruneTil time.Time) error { + tl.writeMutex.Lock() + defer tl.writeMutex.Unlock() + it, err := tl.iterateBatchesByTimestamp(pruneTil) + if err != nil { + return err + } + pruned := 0 + pruningStart := time.Now() + for pathByTimestamp, err := it.next(); !errors.Is(err, io.EOF); pathByTimestamp, err = it.next() { + if err != nil { + return err + } + key, err := DecodeStorageServiceKey(path.Base(pathByTimestamp)) + if err != nil { + return err + } + err = recursivelyDeleteUntil(pathByTimestamp, byExpiryTimestamp) + if err != nil { + log.Error("Couldn't prune expired batch expiry index entry, continuing trying to prune others", "path", pathByTimestamp, "err", err) + } + + pathByHash := tl.batchPath(key) + info, err := os.Stat(pathByHash) + if err != nil { + if os.IsNotExist(err) { + log.Warn("Couldn't find batch to expire, it may have been previously deleted but its by-expiry-timestamp index entry still existed, deleting its index entry and continuing", "path", pathByHash, "indexPath", pathByTimestamp, "err", err) + } else { + log.Error("Couldn't prune expired batch, continuing trying to prune others", "path", pathByHash, "err", err) + } + continue + } + stat, ok := info.Sys().(*syscall.Stat_t) + if !ok { + log.Error("Couldn't convert file stats to Stat_t struct, possible OS or filesystem incompatibility, skipping pruning this batch", "file", pathByHash) + continue + } + if stat.Nlink == 1 { + err = 
recursivelyDeleteUntil(pathByHash, byDataHash) + if err != nil { + return err + } + } + + pruned++ + } + if pruned > 0 { + log.Info("Local file store pruned expired batches", "count", pruned, "pruneTil", pruneTil, "duration", time.Since(pruningStart)) + } + return nil +} + +func recursivelyDeleteUntil(filePath, until string) error { + err := os.Remove(filePath) + if err != nil { + return err + } + + for filePath = path.Dir(filePath); path.Base(filePath) != until; filePath = path.Dir(filePath) { + err = os.Remove(filePath) + if err != nil { + if !strings.Contains(err.Error(), "directory not empty") { + log.Warn("error cleaning up empty directory when pruning expired batches", "path", filePath, "err", err) + } + break + } + } + return nil +} + +type batchIdentifier struct { + key common.Hash + expiry time.Time +} + +type flatLayout struct { + root string + + retention time.Duration +} + +type flatLayoutIterator struct { + files []string + + layout *flatLayout +} + +func (l *flatLayout) batchPath(key common.Hash) string { + return filepath.Join(l.root, EncodeStorageServiceKey(key)) +} + +type layerFilter func(*[][]string, int) bool + +func noopFilter(*[][]string, int) bool { return true } + +func (l *flatLayout) iterateBatches() (*flatLayoutIterator, error) { + files, err := listDir(l.root) + if err != nil { + return nil, err + } + return &flatLayoutIterator{ + files: files, + layout: l, + }, nil +} + +func (i *flatLayoutIterator) next() (batchIdentifier, error) { + for len(i.files) > 0 { + var f string + f, i.files = i.files[0], i.files[1:] + if !isStorageServiceKey(f) { + continue + } + key, err := DecodeStorageServiceKey(f) + if err != nil { + return batchIdentifier{}, err + } + + fullPath := i.layout.batchPath(key) + stat, err := os.Stat(fullPath) + if err != nil { + return batchIdentifier{}, err + } + + return batchIdentifier{ + key: key, + expiry: stat.ModTime().Add(i.layout.retention), + }, nil + } + return batchIdentifier{}, io.EOF +} + +const ( + byDataHash = "by-data-hash" + byExpiryTimestamp = "by-expiry-timestamp" + migratingSuffix = "-migrating" + expiryDivisor = 10_000 +) + +var expirySecondPartWidth = len(strconv.Itoa(expiryDivisor)) - 1 + +type trieLayout struct { + root string + expiryEnabled bool + + // Is the trieLayout currently being migrated to? + // Controls whether paths include the migratingSuffix. + migrating bool + + // Anything changing the layout (pruning, adding files) must go through + // this mutex. + // Pruning the entire history at statup of Arb Nova as of 2024-06-12 takes + // 5s on my laptop, so the overhead of pruning after startup should be neglibile. 
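The constants above, together with trieLayout defined just below, shard batch files by the first two hex byte pairs of their key under by-data-hash/, and shard the expiry index by splitting the unix expiry timestamp into a 10,000-second bucket and a zero-padded remainder under by-expiry-timestamp/, so pruning can skip whole directories of later-expiring entries. A small sketch of the same path arithmetic with made-up inputs (it mirrors batchPath/expiryPath but is not the service's code):

package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

const expiryDivisor = 10_000

var expirySecondPartWidth = len(strconv.Itoa(expiryDivisor)) - 1 // 4

// batchPath mirrors trieLayout.batchPath: two directory levels taken from
// the hex-encoded batch hash, then the full key as the file name.
func batchPath(root, encodedKey string) string {
	return filepath.Join(root, "by-data-hash", encodedKey[:2], encodedKey[2:4], encodedKey)
}

// expiryPath mirrors trieLayout.expiryPath: the unix expiry timestamp is
// split into a coarse bucket and a zero-padded remainder.
func expiryPath(root, encodedKey string, expiry uint64) string {
	firstDir := fmt.Sprintf("%d", expiry/expiryDivisor)
	secondDir := fmt.Sprintf("%0*d", expirySecondPartWidth, expiry%expiryDivisor)
	return filepath.Join(root, "by-expiry-timestamp", firstDir, secondDir, encodedKey)
}

func main() {
	key := "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff" // made-up key
	fmt.Println(batchPath("/data", key))              // /data/by-data-hash/00/11/0011...
	fmt.Println(expiryPath("/data", key, 1718200042)) // /data/by-expiry-timestamp/171820/0042/0011...
}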
+ writeMutex sync.Mutex +} + +type trieLayoutIterator struct { + levels [][]string + filters []layerFilter + topDir string + layout *trieLayout +} + +func (l *trieLayout) batchPath(key common.Hash) string { + encodedKey := EncodeStorageServiceKey(key) + firstDir := encodedKey[:2] + secondDir := encodedKey[2:4] + + topDir := byDataHash + if l.migrating { + topDir = topDir + migratingSuffix + } + + return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) +} + +func (l *trieLayout) expiryPath(key common.Hash, expiry uint64) string { + encodedKey := EncodeStorageServiceKey(key) + firstDir := fmt.Sprintf("%d", expiry/expiryDivisor) + secondDir := fmt.Sprintf("%0*d", expirySecondPartWidth, expiry%expiryDivisor) + + topDir := byExpiryTimestamp + if l.migrating { + topDir = topDir + migratingSuffix + } + + return filepath.Join(l.root, topDir, firstDir, secondDir, encodedKey) +} + +func (l *trieLayout) iterateBatches() (*trieLayoutIterator, error) { + var firstLevel, secondLevel, files []string + var err error + + // TODO handle stray files that aren't dirs + + firstLevel, err = listDir(filepath.Join(l.root, byDataHash)) + if err != nil { + return nil, err + } + + if len(firstLevel) > 0 { + secondLevel, err = listDir(filepath.Join(l.root, byDataHash, firstLevel[0])) + if err != nil { + return nil, err + } + } + + if len(secondLevel) > 0 { + files, err = listDir(filepath.Join(l.root, byDataHash, firstLevel[0], secondLevel[0])) + if err != nil { + return nil, err + } + } + + storageKeyFilter := func(layers *[][]string, idx int) bool { + return isStorageServiceKey((*layers)[idx][0]) + } + + return &trieLayoutIterator{ + levels: [][]string{firstLevel, secondLevel, files}, + filters: []layerFilter{noopFilter, noopFilter, storageKeyFilter}, + topDir: byDataHash, + layout: l, + }, nil +} + +func (l *trieLayout) iterateBatchesByTimestamp(maxTimestamp time.Time) (*trieLayoutIterator, error) { + var firstLevel, secondLevel, files []string + var err error + + firstLevel, err = listDir(filepath.Join(l.root, byExpiryTimestamp)) + if err != nil { + return nil, err + } + + if len(firstLevel) > 0 { + secondLevel, err = listDir(filepath.Join(l.root, byExpiryTimestamp, firstLevel[0])) + if err != nil { + return nil, err + } + } + + if len(secondLevel) > 0 { + files, err = listDir(filepath.Join(l.root, byExpiryTimestamp, firstLevel[0], secondLevel[0])) + if err != nil { + return nil, err + } + } + + beforeUpper := func(layers *[][]string, idx int) bool { + num, err := strconv.Atoi((*layers)[idx][0]) + if err != nil { + return false + } + return int64(num) <= maxTimestamp.Unix()/expiryDivisor + } + beforeLower := func(layers *[][]string, idx int) bool { + num, err := strconv.Atoi((*layers)[idx-1][0] + (*layers)[idx][0]) + if err != nil { + return false + } + return int64(num) < maxTimestamp.Unix() + } + storageKeyFilter := func(layers *[][]string, idx int) bool { + return isStorageServiceKey((*layers)[idx][0]) + } + + return &trieLayoutIterator{ + levels: [][]string{firstLevel, secondLevel, files}, + filters: []layerFilter{beforeUpper, beforeLower, storageKeyFilter}, + topDir: byExpiryTimestamp, + layout: l, + }, nil +} + +func (l *trieLayout) migrated() (bool, error) { + info, err := os.Stat(filepath.Join(l.root, byDataHash)) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, err + } + return info.IsDir(), nil +} + +func (l *trieLayout) startMigration() error { + migrated, err := l.migrated() + if err != nil { + return err + } + if migrated { + return errors.New("local 
file storage already migrated to trieLayout") + } + + l.migrating = true + + if err := os.MkdirAll(filepath.Join(l.root, byDataHash+migratingSuffix), 0o700); err != nil { + return err + } + + if l.expiryEnabled { + if err := os.MkdirAll(filepath.Join(l.root, byExpiryTimestamp+migratingSuffix), 0o700); err != nil { + return err + } + } + return nil + +} + +func (l *trieLayout) commitMigration() error { + if !l.migrating { + return errors.New("already finished migration") + } + + removeSuffix := func(prefix string) error { + oldDir := filepath.Join(l.root, prefix+migratingSuffix) + newDir := filepath.Join(l.root, prefix) + + if err := os.Rename(oldDir, newDir); err != nil { + return err // rename error already includes src and dst, no need to wrap + } + return nil + } + + if err := removeSuffix(byDataHash); err != nil { + return err + } + + if l.expiryEnabled { + if err := removeSuffix(byExpiryTimestamp); err != nil { + return err + } + } + + syscall.Sync() + + // Done migrating + l.migrating = false + + return nil +} + +func (it *trieLayoutIterator) next() (string, error) { + isLeaf := func(idx int) bool { + return idx == len(it.levels)-1 + } + + makePathAtLevel := func(idx int) string { + pathComponents := make([]string, idx+3) + pathComponents[0] = it.layout.root + pathComponents[1] = it.topDir + for i := 0; i <= idx; i++ { + pathComponents[i+2] = it.levels[i][0] + } + return filepath.Join(pathComponents...) + } + + var populateNextLevel func(idx int) error + populateNextLevel = func(idx int) error { + if isLeaf(idx) || len(it.levels[idx]) == 0 { + return nil + } + nextLevelEntries, err := listDir(makePathAtLevel(idx)) + if err != nil { + return err + } + it.levels[idx+1] = nextLevelEntries + if len(nextLevelEntries) > 0 { + return populateNextLevel(idx + 1) + } + return nil + } + + advanceWithinLevel := func(idx int) error { + if len(it.levels[idx]) > 1 { + it.levels[idx] = it.levels[idx][1:] + } else { + it.levels[idx] = nil + } + + return populateNextLevel(idx) + } + + for idx := 0; idx >= 0; { + if len(it.levels[idx]) == 0 { + idx-- + continue + } + + if !it.filters[idx](&it.levels, idx) { + if err := advanceWithinLevel(idx); err != nil { + return "", err + } + continue + } + + if isLeaf(idx) { + path := makePathAtLevel(idx) + if err := advanceWithinLevel(idx); err != nil { + return "", err + } + return path, nil + } + + if len(it.levels[idx+1]) > 0 { + idx++ + continue + } + + if err := advanceWithinLevel(idx); err != nil { + return "", err + } + } + return "", io.EOF +} diff --git a/das/local_file_storage_service_test.go b/das/local_file_storage_service_test.go new file mode 100644 index 0000000000..0b2ba9749d --- /dev/null +++ b/das/local_file_storage_service_test.go @@ -0,0 +1,215 @@ +// Copyright 2024, Offchain Labs, Inc. 
+// For license information, see https://github.com/nitro/blob/master/LICENSE + +package das + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + "time" + + "github.com/offchainlabs/nitro/das/dastree" +) + +func getByHashAndCheck(t *testing.T, s *LocalFileStorageService, xs ...string) { + t.Helper() + ctx := context.Background() + + for _, x := range xs { + actual, err := s.GetByHash(ctx, dastree.Hash([]byte(x))) + Require(t, err) + if !bytes.Equal([]byte(x), actual) { + Fail(t, "unexpected result") + } + } +} + +func countEntries(t *testing.T, layout *trieLayout, expected int) { + t.Helper() + + count := 0 + trIt, err := layout.iterateBatches() + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + count++ + } + if count != expected { + Fail(t, "unexpected number of batches", "expected", expected, "was", count) + } +} + +func countTimestampEntries(t *testing.T, layout *trieLayout, cutoff time.Time, expected int) { + t.Helper() + var count int + trIt, err := layout.iterateBatchesByTimestamp(cutoff) + Require(t, err) + for _, err := trIt.next(); !errors.Is(err, io.EOF); _, err = trIt.next() { + Require(t, err) + count++ + } + if count != expected { + Fail(t, "unexpected count of entries when iterating by timestamp", "expected", expected, "was", count) + } +} + +func pruneCountRemaining(t *testing.T, layout *trieLayout, pruneTil time.Time, expected int) { + t.Helper() + err := layout.prune(pruneTil) + Require(t, err) + + countEntries(t, layout, expected) +} + +func TestMigrationNoExpiry(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: false, + MaxRetention: time.Hour * 24 * 30, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + s.enableLegacyLayout = true + + now := uint64(time.Now().Unix()) + + err = s.Put(ctx, []byte("a"), now+1) + Require(t, err) + err = s.Put(ctx, []byte("b"), now+1) + Require(t, err) + err = s.Put(ctx, []byte("c"), now+2) + Require(t, err) + err = s.Put(ctx, []byte("d"), now+10) + Require(t, err) + + getByHashAndCheck(t, s, "a", "b", "c", "d") + + err = migrate(&s.legacyLayout, &s.layout) + Require(t, err) + s.enableLegacyLayout = false + + countEntries(t, &s.layout, 4) + getByHashAndCheck(t, s, "a", "b", "c", "d") + + _, err = s.layout.iterateBatchesByTimestamp(time.Unix(int64(now+10), 0)) + if err == nil { + Fail(t, "can't iterate by timestamp when expiry is disabled") + } +} + +func TestMigrationExpiry(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: true, + MaxRetention: time.Hour * 10, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + s.enableLegacyLayout = true + + now := time.Now() + + // Use increments of expiry divisor in order to span multiple by-expiry-timestamp dirs + err = s.Put(ctx, []byte("a"), uint64(now.Add(-2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("b"), uint64(now.Add(-1*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("c"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("d"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("e"), uint64(now.Add(2*time.Second*expiryDivisor).Unix())) + Require(t, err) + + getByHashAndCheck(t, s, "a", "b", "c", "d", "e") + + err = 
migrate(&s.legacyLayout, &s.layout) + Require(t, err) + s.enableLegacyLayout = false + + countEntries(t, &s.layout, 3) + getByHashAndCheck(t, s, "c", "d", "e") + + afterNow := now.Add(time.Second) + countTimestampEntries(t, &s.layout, afterNow, 0) // They should have all been filtered out since they're after now + countTimestampEntries(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 2) + countTimestampEntries(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 3) + + pruneCountRemaining(t, &s.layout, afterNow, 3) + getByHashAndCheck(t, s, "c", "d", "e") + + pruneCountRemaining(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 1) + getByHashAndCheck(t, s, "e") + + pruneCountRemaining(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 0) +} + +func TestExpiryDuplicates(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + config := LocalFileStorageConfig{ + Enable: true, + DataDir: dir, + EnableExpiry: true, + MaxRetention: time.Hour * 10, + } + s, err := NewLocalFileStorageService(config) + Require(t, err) + + now := time.Now() + + // Use increments of expiry divisor in order to span multiple by-expiry-timestamp dirs + err = s.Put(ctx, []byte("a"), uint64(now.Add(-2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("a"), uint64(now.Add(-1*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("a"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("d"), uint64(now.Add(time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("e"), uint64(now.Add(2*time.Second*expiryDivisor).Unix())) + Require(t, err) + err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) + Require(t, err) + // Put the same entry and expiry again, should have no effect + err = s.Put(ctx, []byte("f"), uint64(now.Add(3*time.Second*expiryDivisor).Unix())) + Require(t, err) + + afterNow := now.Add(time.Second) + // "a" is duplicated + countEntries(t, &s.layout, 4) + // There should be a timestamp entry for each time "a" was added + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 6) + + // We've expired the first "a", but there are still 2 other timestamp entries for it + pruneCountRemaining(t, &s.layout, afterNow.Add(-2*time.Second*expiryDivisor), 4) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 5) + + // We've expired the second "a", but there is still 1 other timestamp entry for it + pruneCountRemaining(t, &s.layout, afterNow.Add(-1*time.Second*expiryDivisor), 4) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 4) + + // We've expired the third "a", and also "d" + pruneCountRemaining(t, &s.layout, afterNow.Add(time.Second*expiryDivisor), 2) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 2) + + // We've expired the "e" + pruneCountRemaining(t, &s.layout, afterNow.Add(2*time.Second*expiryDivisor), 1) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 1) + + // We've expired the "f" + pruneCountRemaining(t, &s.layout, afterNow.Add(3*time.Second*expiryDivisor), 0) + countTimestampEntries(t, &s.layout, afterNow.Add(1000*time.Hour), 0) +} diff --git a/das/storage_service.go b/das/storage_service.go index 806e80dba5..b7526077e9 100644 --- a/das/storage_service.go +++ b/das/storage_service.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -25,6 
+26,8 @@ type StorageService interface { HealthCheck(ctx context.Context) error } +const defaultStorageRetention = time.Hour * 24 * 21 // 6 days longer than the batch poster default + func EncodeStorageServiceKey(key common.Hash) string { return key.Hex()[2:] } diff --git a/das/syncing_fallback_storage.go b/das/syncing_fallback_storage.go index 3f4f2765b5..1cf6a832f3 100644 --- a/das/syncing_fallback_storage.go +++ b/das/syncing_fallback_storage.go @@ -7,7 +7,6 @@ import ( "context" "encoding/binary" "fmt" - "math" "math/big" "os" "sync" @@ -63,26 +62,29 @@ type SyncToStorageConfig struct { IgnoreWriteErrors bool `koanf:"ignore-write-errors"` ParentChainBlocksPerRead uint64 `koanf:"parent-chain-blocks-per-read"` StateDir string `koanf:"state-dir"` + SyncExpiredData bool `koanf:"sync-expired-data"` } var DefaultSyncToStorageConfig = SyncToStorageConfig{ Eager: false, EagerLowerBoundBlock: 0, - RetentionPeriod: time.Duration(math.MaxInt64), + RetentionPeriod: defaultStorageRetention, DelayOnError: time.Second, IgnoreWriteErrors: true, ParentChainBlocksPerRead: 100, StateDir: "", + SyncExpiredData: true, } func SyncToStorageConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".eager", DefaultSyncToStorageConfig.Eager, "eagerly sync batch data to this DAS's storage from the rest endpoints, using L1 as the index of batch data hashes; otherwise only sync lazily") f.Uint64(prefix+".eager-lower-bound-block", DefaultSyncToStorageConfig.EagerLowerBoundBlock, "when eagerly syncing, start indexing forward from this L1 block. Only used if there is no sync state") f.Uint64(prefix+".parent-chain-blocks-per-read", DefaultSyncToStorageConfig.ParentChainBlocksPerRead, "when eagerly syncing, max l1 blocks to read per poll") - f.Duration(prefix+".retention-period", DefaultSyncToStorageConfig.RetentionPeriod, "period to retain synced data (defaults to forever)") + f.Duration(prefix+".retention-period", DefaultSyncToStorageConfig.RetentionPeriod, "period to request storage to retain synced data") f.Duration(prefix+".delay-on-error", DefaultSyncToStorageConfig.DelayOnError, "time to wait if encountered an error before retrying") f.Bool(prefix+".ignore-write-errors", DefaultSyncToStorageConfig.IgnoreWriteErrors, "log only on failures to write when syncing; otherwise treat it as an error") f.String(prefix+".state-dir", DefaultSyncToStorageConfig.StateDir, "directory to store the sync state in, ie the block number currently synced up to, so that we don't sync from scratch each time") + f.Bool(prefix+".sync-expired-data", DefaultSyncToStorageConfig.SyncExpiredData, "sync even data that is expired; needed for mirror configuration") } type l1SyncService struct { @@ -191,7 +193,7 @@ func (s *l1SyncService) processBatchDelivered(ctx context.Context, batchDelivere } log.Info("BatchDelivered", "log", batchDeliveredLog, "event", deliveredEvent) storeUntil := arbmath.SaturatingUAdd(deliveredEvent.TimeBounds.MaxTimestamp, uint64(s.config.RetentionPeriod.Seconds())) - if storeUntil < uint64(time.Now().Unix()) { + if !s.config.SyncExpiredData && storeUntil < uint64(time.Now().Unix()) { // old batch - no need to store return nil } diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 016f30bd61..84f597c095 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -149,6 +149,7 @@ var TestValidationServerConfig = ValidationServerConfig{ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) { 
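The syncing_fallback_storage.go change above computes each batch's storeUntil as the batch's MaxTimestamp plus the configured retention period, and now skips already-expired batches only when the new sync-expired-data option is off, since mirrors need to keep syncing expired data. A small sketch of that decision with a hypothetical helper name; it uses plain addition where the real code uses arbmath.SaturatingUAdd:

package main

import (
	"fmt"
	"time"
)

// shouldStore mirrors the check in processBatchDelivered: a batch is skipped
// only when syncing expired data is disabled and its retention window,
// anchored at the batch's MaxTimestamp, has already passed.
func shouldStore(maxTimestamp uint64, retention time.Duration, syncExpiredData bool, now time.Time) bool {
	storeUntil := maxTimestamp + uint64(retention.Seconds())
	if !syncExpiredData && storeUntil < uint64(now.Unix()) {
		return false // old batch - no need to store
	}
	return true
}

func main() {
	retention := 21 * 24 * time.Hour // defaultStorageRetention above
	old := uint64(time.Now().Add(-40 * 24 * time.Hour).Unix())
	fmt.Println(shouldStore(old, retention, false, time.Now())) // false: expired and not syncing expired data
	fmt.Println(shouldStore(old, retention, true, time.Now()))  // true: mirrors keep everything
}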
pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") + f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server") f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") }
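The final hunk adds a redis-url option alongside the existing module-roots and stream-timeout flags. For context, here is a minimal standalone example of the same pflag registration pattern; the prefix and default values are made up and it does not reproduce the actual ValidationServerConfig:

package main

import (
	"fmt"
	"time"

	flag "github.com/spf13/pflag"
)

// addOptions registers options the way ValidationServerConfigAddOptions does:
// every flag name is namespaced under the given prefix.
func addOptions(prefix string, f *flag.FlagSet) {
	f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
	f.String(prefix+".redis-url", "", "url of redis server")
	f.Duration(prefix+".stream-timeout", 10*time.Minute, "Timeout on polling for existence of redis streams")
}

func main() {
	f := flag.NewFlagSet("example", flag.ContinueOnError)
	addOptions("validation", f)
	_ = f.Parse([]string{"--validation.redis-url", "redis://localhost:6379"})
	url, _ := f.GetString("validation.redis-url")
	fmt.Println(url)
}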