Skip to content

Commit

Permalink
refactor: rebase and update
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 25, 2024
1 parent e7f68ea commit 835f1be
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 92 deletions.
66 changes: 39 additions & 27 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package clients

import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
Expand All @@ -18,37 +18,40 @@ type IAccountant interface {

type Accountant struct {
// on-chain states
reservation core.ActiveReservation
onDemand core.OnDemandPayment
reservationWindow uint32
pricePerChargeable uint32
minChargeableSize uint32
reservation core.ActiveReservation
onDemand core.OnDemandPayment
reservationWindow uint32
pricePerSymbol uint32
minNumSymbols uint32

// local accounting
// contains 3 bins; 0 for current bin, 1 for next bin, 2 for overflowed bin
// contains 3 bins; index 0 for current bin, 1 for next bin, 2 for overflowed bin
binUsages []uint64
usageLock sync.Mutex
cumulativePayment *big.Int
stopRotation chan struct{}

paymentSigner core.PaymentSigner
}

func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerChargeable uint32, minChargeableSize uint32, paymentSigner core.PaymentSigner) Accountant {
func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, paymentSigner core.PaymentSigner) *Accountant {
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
// There's a subsequent PR that handles populating the accountant with on-chain state from the disperser
a := Accountant{
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerChargeable: pricePerChargeable,
minChargeableSize: minChargeableSize,
binUsages: []uint64{0, 0, 0},
cumulativePayment: big.NewInt(0),
stopRotation: make(chan struct{}),
paymentSigner: paymentSigner,
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerSymbol: pricePerSymbol,
minNumSymbols: minNumSymbols,
binUsages: []uint64{0, 0, 0},
cumulativePayment: big.NewInt(0),
stopRotation: make(chan struct{}),
paymentSigner: paymentSigner,
}
go a.startBinRotation()
return a
// TODO: add a routine to refresh the on-chain state occasionally?
return &a
}

func (a *Accountant) startBinRotation() {
Expand All @@ -66,7 +69,9 @@ func (a *Accountant) startBinRotation() {
}

func (a *Accountant) rotateBins() {
// Shift bins: bin_i to bin_{i-1}, add 0 to bin2
a.usageLock.Lock()
defer a.usageLock.Unlock()
// Shift bins: bin_i to bin_{i-1}, set 0 to bin2
a.binUsages[0] = a.binUsages[1]
a.binUsages[1] = a.binUsages[2]
a.binUsages[2] = 0
Expand All @@ -80,6 +85,8 @@ func (a *Accountant) Stop() {
func (a *Accountant) BlobPaymentInfo(ctx context.Context, dataLength uint64) (uint32, *big.Int, error) {
//TODO: do we need to lock the binUsages here in case the blob rotation happens in the middle of the function?
// binUsage := a.binUsages[0] + dataLength
a.usageLock.Lock()
defer a.usageLock.Unlock()
a.binUsages[0] += dataLength
now := time.Now().Unix()
currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow)
Expand All @@ -99,13 +106,12 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, dataLength uint64) (ui
// reservation not available, attempt on-demand
//todo: rollback if disperser respond with some type of rejection?
a.binUsages[0] -= dataLength
incrementRequired := big.NewInt(int64(a.PaymentCharged(uint32(dataLength))))
incrementRequired := big.NewInt(int64(a.PaymentCharged(uint(dataLength))))
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
return 0, a.cumulativePayment, nil
}
fmt.Println("Accountant cannot approve payment for this blob")
return 0, big.NewInt(0), errors.New("Accountant cannot approve payment for this blob")
return 0, big.NewInt(0), fmt.Errorf("Accountant cannot approve payment for this blob")
}

// accountant provides and records payment information
Expand All @@ -131,12 +137,18 @@ func (a *Accountant) AccountBlob(ctx context.Context, dataLength uint64, quorums
return protoPaymentHeader, signature, nil
}

// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored
// PaymentCharged returns the chargeable price for a given data length
func (a *Accountant) PaymentCharged(dataLength uint32) uint64 {
return uint64(core.RoundUpDivide(uint(a.BlobSizeCharged(dataLength)*a.pricePerChargeable), uint(a.minChargeableSize)))
func (a *Accountant) PaymentCharged(dataLength uint) uint64 {
return uint64(a.SymbolsCharged(dataLength)) * uint64(a.pricePerSymbol)
}

// BlobSizeCharged returns the chargeable data length for a given data length
func (a *Accountant) BlobSizeCharged(dataLength uint32) uint32 {
return max(dataLength, uint32(a.minChargeableSize))
// SymbolsCharged returns the number of symbols charged for a given data length
// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols.
func (a *Accountant) SymbolsCharged(dataLength uint) uint32 {
if dataLength <= uint(a.minNumSymbols) {
return a.minNumSymbols
}
// Round up to the nearest multiple of MinNumSymbols
return uint32(core.RoundUpDivide(uint(dataLength), uint(a.minNumSymbols))) * a.minNumSymbols
}
68 changes: 34 additions & 34 deletions api/clients/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ func TestNewAccountant(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(6)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
defer accountant.Stop()
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)

assert.NotNil(t, accountant)
assert.Equal(t, reservation, accountant.reservation)
assert.Equal(t, onDemand, accountant.onDemand)
assert.Equal(t, reservationWindow, accountant.reservationWindow)
assert.Equal(t, pricePerChargeable, accountant.pricePerChargeable)
assert.Equal(t, minChargeableSize, accountant.minChargeableSize)
assert.Equal(t, pricePerSymbol, accountant.pricePerSymbol)
assert.Equal(t, minNumSymbols, accountant.minNumSymbols)
assert.Equal(t, []uint64{0, 0, 0}, accountant.binUsages)
assert.Equal(t, big.NewInt(0), accountant.cumulativePayment)
}
Expand All @@ -57,13 +57,13 @@ func TestAccountBlob_Reservation(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -109,13 +109,13 @@ func TestAccountBlob_OnDemand(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand All @@ -124,7 +124,7 @@ func TestAccountBlob_OnDemand(t *testing.T) {

header, _, err := accountant.AccountBlob(ctx, dataLength, quorums)
metadata := core.ConvertPaymentHeader(header)
expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerChargeable) / uint64(minChargeableSize)))
expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerSymbol) / uint64(minNumSymbols)))
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, expectedPayment, metadata.CumulativePayment)
Expand All @@ -138,13 +138,13 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(60)
pricePerChargeable := uint32(100)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(100)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand All @@ -169,13 +169,13 @@ func TestAccountBlobCallSeries(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(100)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(100)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -221,12 +221,12 @@ func TestAccountBlob_BinRotation(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)
privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -263,13 +263,13 @@ func TestBinRotation(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -312,13 +312,13 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -357,13 +357,13 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()
ctx := context.Background()
quorums := []uint8{0, 1}
Expand Down Expand Up @@ -405,13 +405,13 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down
5 changes: 3 additions & 2 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ type disperserClient struct {
// or to use a pool of connections here.
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
accountant Accountant
accountant *Accountant
}

var _ DisperserClient = &disperserClient{}

func NewDisperserClient(config *Config, signer core.BlobRequestSigner, accountant Accountant) DisperserClient {
func NewDisperserClient(config *Config, signer core.BlobRequestSigner, accountant *Accountant) DisperserClient {
return &disperserClient{
config: config,
signer: signer,
Expand Down Expand Up @@ -127,6 +127,7 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums
return blobStatus, reply.GetRequestId(), nil
}

// DispersePaidBlob disperses a blob with a payment header and signature. Similar to DisperseBlob but with signed payment header.
func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (m EigenDAClient) PaidPutBlobAsync(ctx context.Context, data []byte) (resul
}

func (m EigenDAClient) paidPutBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
m.Log.Info("Attempting to disperse blob to EigenDA")
m.Log.Info("Attempting to disperse blob to EigenDA with payment")

// encode blob
if m.Codec == nil {
Expand All @@ -304,9 +304,9 @@ func (m EigenDAClient) paidPutBlob(ctx context.Context, rawData []byte, resultCh
customQuorumNumbers[i] = uint8(e)
}
// disperse blob
blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
blobStatus, requestID, err := m.Client.DispersePaidBlob(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
errChan <- fmt.Errorf("error initializing DispersePaidBlob() client: %w", err)
return
}

Expand Down
2 changes: 1 addition & 1 deletion api/clients/eigenda_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func TestPutBlobTotalTimeout(t *testing.T) {

func TestPutBlobNoopSigner(t *testing.T) {
config := clients.NewConfig("nohost", "noport", time.Second, false)
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), clients.Accountant{})
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), &clients.Accountant{})

test := []byte("test")
test[0] = 0x00 // make sure the first byte of the requst is always 0
Expand Down
Loading

0 comments on commit 835f1be

Please sign in to comment.