diff --git a/api/clients/accountant.go b/api/clients/accountant.go index 5121fea65..ca8a2fa84 100644 --- a/api/clients/accountant.go +++ b/api/clients/accountant.go @@ -30,7 +30,7 @@ type Accountant struct { usageLock sync.Mutex cumulativePayment *big.Int - // number of bins in the circular accounting, restricted by minNumBins which is 3 + // number of bins in the circular accounting, restricted by MinNumPeriods which is 3 numBins uint32 } @@ -56,7 +56,7 @@ func NewAccountant(accountID string, reservation *core.ActiveReservation, onDema minNumSymbols: minNumSymbols, reservationPeriodRecords: reservationPeriodRecords, cumulativePayment: big.NewInt(0), - numBins: max(numBins, uint32(meterer.MinNumBins)), + numBins: max(numBins, uint32(meterer.MinNumPeriods)), } // TODO: add a routine to refresh the on-chain state occasionally? return &a diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index 3aa08ef4d..832000bc7 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -137,6 +137,7 @@ func setup(_ *testing.M) { reservationTableName, ondemandTableName, globalReservationTableName, + uint64(100), logger, ) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index 09cbc8597..5204a3c72 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -18,7 +18,6 @@ import ( ) const MinNumPeriods int32 = 3 -const MaxNumOnDemandPayments int32 = 100 type OffchainStore struct { dynamoClient commondynamodb.Client @@ -35,6 +34,7 @@ func NewOffchainStore( reservationTableName string, onDemandTableName string, globalBinTableName string, + maxOnDemandStorage uint64, logger logging.Logger, ) (OffchainStore, error) { @@ -63,6 +63,7 @@ func NewOffchainStore( onDemandTableName: onDemandTableName, globalBinTableName: globalBinTableName, logger: logger, + MaxOnDemandStorage: maxOnDemandStorage, }, nil } diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 247d8e845..1f3e27e9f 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -121,7 +121,7 @@ func (pcs *OnchainPaymentState) RefreshOnchainPaymentState(ctx context.Context) accountIDs = append(accountIDs, accountID) } - activeReservations, err := tx.GetActiveReservations(ctx, accountIDs) + activeReservations, err := pcs.tx.GetActiveReservations(ctx, accountIDs) if err != nil { return err } @@ -134,7 +134,7 @@ func (pcs *OnchainPaymentState) RefreshOnchainPaymentState(ctx context.Context) accountIDs = append(accountIDs, accountID) } - onDemandPayments, err := tx.GetOnDemandPayments(ctx, accountIDs) + onDemandPayments, err := pcs.tx.GetOnDemandPayments(ctx, accountIDs) if err != nil { return err } diff --git a/core/meterer/onchain_state_test.go b/core/meterer/onchain_state_test.go index d7fca8484..b7a0c816b 100644 --- a/core/meterer/onchain_state_test.go +++ b/core/meterer/onchain_state_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/core/mock" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" @@ -30,7 +29,7 @@ func TestRefreshOnchainPaymentState(t *testing.T) { ctx := context.Background() mockState.On("RefreshOnchainPaymentState", testifymock.Anything, testifymock.Anything).Return(nil) - err := mockState.RefreshOnchainPaymentState(ctx, ð.Reader{}) + err := mockState.RefreshOnchainPaymentState(ctx) assert.NoError(t, err) } diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 29f4f74a9..5e63d1c78 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -748,7 +748,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal mockState := &mock.MockOnchainPaymentState{} mockState.On("RefreshOnchainPaymentState", tmock.Anything).Return(nil).Maybe() - if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + if err := mockState.RefreshOnchainPaymentState(context.Background()); err != nil { panic("failed to make initial query to the on-chain state") } @@ -791,6 +791,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal table_names[0], table_names[1], table_names[2], + uint64(100), logger, ) if err != nil { @@ -798,7 +799,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal panic("failed to create offchain store") } mt := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) - err = mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background(), nil) + err = mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background()) if err != nil { panic("failed to make initial query to the on-chain state") } diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index b7ecde9f7..d5a04e458 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -146,7 +146,8 @@ func (s *DispersalServerV2) Start(ctx context.Context) error { for { select { case <-ticker.C: - if err := s.meterer.OffchainStore.DeleteOldBins(ctx, s.meterer.CurrentReservationPeriod()-1); err != nil { + prevPeriod := s.meterer.CurrentReservationPeriod() - 1 + if err := s.meterer.OffchainStore.DeleteOldPeriods(ctx, prevPeriod); err != nil { s.logger.Error("failed to delete old bins", "err", err) } case <-ctx.Done(): diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index ed291d33f..37b8237e8 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -450,7 +450,7 @@ func newTestServerV2(t *testing.T) *testComponents { mockState.On("GetOnDemandPaymentByAccount", tmock.Anything, tmock.Anything).Return(&core.OnDemandPayment{CumulativePayment: big.NewInt(3864)}, nil) mockState.On("GetOnDemandQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil) - if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + if err := mockState.RefreshOnchainPaymentState(context.Background()); err != nil { panic("failed to make initial query to the on-chain state") } table_names := []string{"reservations_server_" + t.Name(), "ondemand_server_" + t.Name(), "global_server_" + t.Name()} @@ -475,6 +475,7 @@ func newTestServerV2(t *testing.T) *testComponents { table_names[0], table_names[1], table_names[2], + uint64(100), logger, ) if err != nil { diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index ca32f712b..7434fec2a 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -35,7 +35,9 @@ type Config struct { EncodingConfig kzg.KzgConfig EnableRatelimiter bool EnablePaymentMeterer bool - UpdateInterval int + OnchainUpdateInterval int + OffchainPruneInterval int + OffchainMaxOnDemandStorage int ChainReadTimeout int ReservationsTableName string OnDemandTableName string @@ -120,7 +122,9 @@ func NewConfig(ctx *cli.Context) (Config, error) { GlobalRateTableName: ctx.GlobalString(flags.GlobalRateTableName.Name), BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), - UpdateInterval: ctx.GlobalInt(flags.UpdateInterval.Name), + OnchainUpdateInterval: ctx.GlobalInt(flags.OnchainUpdateInterval.Name), + OffchainPruneInterval: ctx.GlobalInt(flags.OffchainPruneInterval.Name), + OffchainMaxOnDemandStorage: ctx.GlobalInt(flags.OffchainMaxOnDemandStorage.Name), ChainReadTimeout: ctx.GlobalInt(flags.ChainReadTimeout.Name), EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index ec5cb6a9f..7a32dea79 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -107,11 +107,25 @@ var ( Value: "global_rate", EnvVar: common.PrefixEnvVar(envVarPrefix, "GLOBAL_RATE_TABLE_NAME"), } - UpdateInterval = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "update-interval"), + OnchainUpdateInterval = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "onchain-update-interval"), Usage: "update interval for refreshing the on-chain state", - Value: 1 * time.Second, - EnvVar: common.PrefixEnvVar(envVarPrefix, "UPDATE_INTERVAL"), + Value: 2 * time.Minute, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ONCHAIN_UPDATE_INTERVAL"), + Required: false, + } + OffchainPruneInterval = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "offchain-prune-interval"), + Usage: "update interval for pruning the off-chain state", + Value: 2 * time.Minute, + EnvVar: common.PrefixEnvVar(envVarPrefix, "OFFCHAIN_PRUNE_INTERVAL"), + Required: false, + } + OffchainMaxOnDemandStorage = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "offchain-max-on-demand-storage"), + Usage: "max number of on-demand payments to store in the off-chain state", + Value: 100, + EnvVar: common.PrefixEnvVar(envVarPrefix, "OFFCHAIN_MAX_ON_DEMAND_STORAGE"), Required: false, } ChainReadTimeout = cli.UintFlag{ diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 825a9d046..06e12b441 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -99,15 +99,16 @@ func RunDisperserServer(ctx *cli.Context) error { var meterer *mt.Meterer if config.EnablePaymentMeterer { mtConfig := mt.Config{ - ChainReadTimeout: time.Duration(config.ChainReadTimeout) * time.Second, - UpdateInterval: time.Duration(config.UpdateInterval) * time.Second, + ChainReadTimeout: time.Duration(config.ChainReadTimeout) * time.Second, + OnchainUpdateInterval: time.Duration(config.OnchainUpdateInterval) * time.Second, + OffchainPruneInterval: time.Duration(config.OffchainPruneInterval) * time.Second, } paymentChainState, err := mt.NewOnchainPaymentState(context.Background(), transactor) if err != nil { return fmt.Errorf("failed to create onchain payment state: %w", err) } - if err := paymentChainState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + if err := paymentChainState.RefreshOnchainPaymentState(context.Background()); err != nil { return fmt.Errorf("failed to make initial query to the on-chain state: %w", err) } @@ -116,6 +117,7 @@ func RunDisperserServer(ctx *cli.Context) error { config.ReservationsTableName, config.OnDemandTableName, config.GlobalRateTableName, + uint64(config.OffchainMaxOnDemandStorage), logger, ) if err != nil { diff --git a/test/integration_test.go b/test/integration_test.go index 5016f3598..9bc831cfe 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -281,6 +281,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser table_names[0], table_names[1], table_names[2], + uint64(100), logger, ) if err != nil { @@ -288,7 +289,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } mockState.On("RefreshOnchainPaymentState", mock.Anything).Return(nil).Maybe() - if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + if err := mockState.RefreshOnchainPaymentState(context.Background()); err != nil { panic("failed to make initial query to the on-chain state") }