Skip to content

Commit

Permalink
feat: client construct payment header
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 2, 2024
1 parent a36ddb9 commit 9982bf1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 46 deletions.
8 changes: 3 additions & 5 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"sync"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
)

var requiredQuorums = []uint8{0, 1}

type Accountant interface {
AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error)
AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*core.PaymentMetadata, error)
}

var _ Accountant = &accountant{}
Expand Down Expand Up @@ -116,7 +115,7 @@ func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quo
}

// AccountBlob accountant provides and records payment information
func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error) {
func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*core.PaymentMetadata, error) {
binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums)
if err != nil {
return nil, err
Expand All @@ -127,9 +126,8 @@ func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums
BinIndex: binIndex,
CumulativePayment: cumulativePayment,
}
protoPaymentHeader := pm.ConvertToProtoPaymentHeader()

return protoPaymentHeader, nil
return pm, nil
}

// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored
Expand Down
33 changes: 11 additions & 22 deletions api/clients/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,27 @@ func TestAccountBlob_Reservation(t *testing.T) {
quorums := []uint8{0, 1}

header, err := accountant.AccountBlob(ctx, symbolLength, quorums)
metadata := core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(time.Now().Unix()), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.binRecords)), true)

symbolLength = uint64(700)

header, err = accountant.AccountBlob(ctx, symbolLength, quorums)
metadata = core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.NotEqual(t, 0, header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.binRecords)), true)

// Second call should use on-demand payment
header, err = accountant.AccountBlob(ctx, 300, quorums)
metadata = core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, big.NewInt(300), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(300), header.CumulativePayment)
}

func TestAccountBlob_OnDemand(t *testing.T) {
Expand Down Expand Up @@ -124,10 +121,9 @@ func TestAccountBlob_OnDemand(t *testing.T) {
header, err := accountant.AccountBlob(ctx, numSymbols, quorums)
assert.NoError(t, err)

metadata := core.ConvertPaymentHeader(header)
expectedPayment := big.NewInt(int64(numSymbols * uint64(pricePerSymbol)))
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, expectedPayment, metadata.CumulativePayment)
assert.Equal(t, expectedPayment, header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, expectedPayment, accountant.cumulativePayment)
}
Expand Down Expand Up @@ -180,24 +176,21 @@ func TestAccountBlobCallSeries(t *testing.T) {

// First call: Use reservation
header, err := accountant.AccountBlob(ctx, 800, quorums)
metadata := core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Second call: Use remaining reservation + overflow
header, err = accountant.AccountBlob(ctx, 300, quorums)
metadata = core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Third call: Use on-demand
header, err = accountant.AccountBlob(ctx, 500, quorums)
metadata = core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Fourth call: Insufficient on-demand
_, err = accountant.AccountBlob(ctx, 600, quorums)
Expand Down Expand Up @@ -321,23 +314,20 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
header, err := accountant.AccountBlob(ctx, 800, quorums)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
metadata := core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true)

// Second call: Allow one overflow
header, err = accountant.AccountBlob(ctx, 500, quorums)
assert.NoError(t, err)
metadata = core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)

// Third call: Should use on-demand payment
header, err = accountant.AccountBlob(ctx, 200, quorums)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
metadata = core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(200), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(200), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
}

Expand Down Expand Up @@ -373,8 +363,7 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
header, err := accountant.AccountBlob(ctx, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
metadata := core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Wait for next reservation duration
time.Sleep(time.Duration(reservationWindow) * time.Second)
Expand Down
35 changes: 16 additions & 19 deletions api/clients/disperser_client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"context"
"fmt"
"math/big"
"sync"

"github.com/Layr-Labs/eigenda/api"
Expand All @@ -30,12 +29,13 @@ type DisperserClientV2 interface {
}

type disperserClientV2 struct {
config *DisperserClientV2Config
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
prover encoding.Prover
config *DisperserClientV2Config
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
prover encoding.Prover
accountant Accountant
}

var _ DisperserClientV2 = &disperserClientV2{}
Expand All @@ -60,7 +60,7 @@ var _ DisperserClientV2 = &disperserClientV2{}
//
// // Subsequent calls will use the existing connection
// status2, blobKey2, err := client.DisperseBlob(ctx, data, blobHeader)
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover) (*disperserClientV2, error) {
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant Accountant) (*disperserClientV2, error) {
if config == nil {
return nil, api.NewErrorInvalidArg("config must be provided")
}
Expand All @@ -75,9 +75,10 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
}

return &disperserClientV2{
config: config,
signer: signer,
prover: prover,
config: config,
signer: signer,
prover: prover,
accountant: accountant,
// conn and client are initialized lazily
}, nil
}
Expand Down Expand Up @@ -109,15 +110,11 @@ func (c *disperserClientV2) DisperseBlob(
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}

var payment core.PaymentMetadata
accountId, err := c.signer.GetAccountID()
symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums)
if err != nil {
return nil, [32]byte{}, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}
payment.AccountID = accountId
// TODO: add payment metadata
payment.BinIndex = 0
payment.CumulativePayment = big.NewInt(0)

if len(quorums) == 0 {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
Expand Down Expand Up @@ -160,7 +157,7 @@ func (c *disperserClientV2) DisperseBlob(
BlobVersion: blobVersion,
BlobCommitments: blobCommitments,
QuorumNumbers: quorums,
PaymentMetadata: payment,
PaymentMetadata: *payment,
}
sig, err := c.signer.SignBlobRequest(blobHeader)
if err != nil {
Expand Down

0 comments on commit 9982bf1

Please sign in to comment.