diff --git a/api/clients/accountant.go b/api/clients/accountant.go index 13dd53999b..0c8f0b28e2 100644 --- a/api/clients/accountant.go +++ b/api/clients/accountant.go @@ -49,13 +49,17 @@ func NewAccountant(reservation *core.ActiveReservation, onDemand *core.OnDemandP //TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense // Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic // There's a subsequent PR that handles populating the accountant with on-chain state from the disperser + binRecords := make([]BinRecord, numBins) + for i := range binRecords { + binRecords[i] = BinRecord{Index: uint32(i), Usage: 0} + } a := accountant{ reservation: reservation, onDemand: onDemand, reservationWindow: reservationWindow, pricePerSymbol: pricePerSymbol, minNumSymbols: minNumSymbols, - binRecords: []BinRecord{{Index: 0, Usage: 0}, {Index: 1, Usage: 0}, {Index: 2, Usage: 0}}, + binRecords: binRecords, cumulativePayment: big.NewInt(0), paymentSigner: paymentSigner, numBins: max(numBins, minNumBins), diff --git a/api/clients/accountant_test.go b/api/clients/accountant_test.go index 2f74ce0d2c..d40f75550f 100644 --- a/api/clients/accountant_test.go +++ b/api/clients/accountant_test.go @@ -3,7 +3,6 @@ package clients import ( "context" "encoding/hex" - "fmt" "math/big" "sync" "testing" @@ -19,14 +18,14 @@ import ( const numBins = uint32(3) func TestNewAccountant(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 100, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(500), } reservationWindow := uint32(6) @@ -35,8 +34,9 @@ func TestNewAccountant(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) assert.NotNil(t, accountant) assert.Equal(t, reservation, accountant.reservation) @@ -49,14 +49,14 @@ func TestNewAccountant(t *testing.T) { } func TestAccountBlob_Reservation(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 200, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(500), } reservationWindow := uint32(5) @@ -65,8 +65,9 @@ func TestAccountBlob_Reservation(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() symbolLength := uint64(500) @@ -100,14 +101,14 @@ func TestAccountBlob_Reservation(t *testing.T) { } func TestAccountBlob_OnDemand(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 200, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(1500), } reservationWindow := uint32(5) @@ -116,8 +117,9 @@ func TestAccountBlob_OnDemand(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() numSymbols := uint64(1500) @@ -135,8 +137,8 @@ func TestAccountBlob_OnDemand(t *testing.T) { } func TestAccountBlob_InsufficientOnDemand(t *testing.T) { - reservation := core.ActiveReservation{} - onDemand := core.OnDemandPayment{ + reservation := &core.ActiveReservation{} + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(500), } reservationWindow := uint32(60) @@ -145,8 +147,9 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() numSymbols := uint64(2000) @@ -157,14 +160,14 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) { } func TestAccountBlobCallSeries(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 200, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(5) @@ -173,8 +176,9 @@ func TestAccountBlobCallSeries(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() quorums := []uint8{0, 1} @@ -208,14 +212,14 @@ func TestAccountBlobCallSeries(t *testing.T) { } func TestAccountBlob_BinRotation(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 1000, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing @@ -223,14 +227,15 @@ func TestAccountBlob_BinRotation(t *testing.T) { minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() quorums := []uint8{0, 1} // First call - header, _, err := accountant.AccountBlob(ctx, 800, quorums) + _, _, err = accountant.AccountBlob(ctx, 800, quorums) assert.NoError(t, err) assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true) @@ -238,26 +243,25 @@ func TestAccountBlob_BinRotation(t *testing.T) { time.Sleep(1000 * time.Millisecond) // Second call - header, _, err = accountant.AccountBlob(ctx, 300, quorums) + _, _, err = accountant.AccountBlob(ctx, 300, quorums) assert.NoError(t, err) - fmt.Println("shifts ", header.BinIndex%3) assert.Equal(t, isRotation([]uint64{800, 300, 0}, mapRecordUsage(accountant.binRecords)), true) // Third call - header, _, err = accountant.AccountBlob(ctx, 500, quorums) + _, _, err = accountant.AccountBlob(ctx, 500, quorums) assert.NoError(t, err) assert.Equal(t, isRotation([]uint64{800, 800, 0}, mapRecordUsage(accountant.binRecords)), true) } func TestConcurrentBinRotationAndAccountBlob(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 1000, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing @@ -266,8 +270,9 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() quorums := []uint8{0, 1} @@ -296,14 +301,14 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) { } func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 200, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(5) @@ -312,8 +317,9 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() quorums := []uint8{0, 1} now := time.Now().Unix() @@ -343,14 +349,14 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) { } func TestAccountBlob_ReservationOverflowReset(t *testing.T) { - reservation := core.ActiveReservation{ + reservation := &core.ActiveReservation{ SymbolsPerSec: 1000, StartTimestamp: 100, EndTimestamp: 200, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}, } - onDemand := core.OnDemandPayment{ + onDemand := &core.OnDemandPayment{ CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing @@ -359,8 +365,9 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) { privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(&reservation, &onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) ctx := context.Background() quorums := []uint8{0, 1} @@ -381,7 +388,7 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) { time.Sleep(time.Duration(reservationWindow) * time.Second) // Third call: Should use new bin and allow overflow again - header, _, err = accountant.AccountBlob(ctx, 500, quorums) + _, _, err = accountant.AccountBlob(ctx, 500, quorums) assert.NoError(t, err) assert.Equal(t, isRotation([]uint64{1000, 500, 0}, mapRecordUsage(accountant.binRecords)), true) } diff --git a/api/clients/config.go b/api/clients/config.go index c4024a1df5..f4d9caa9fb 100644 --- a/api/clients/config.go +++ b/api/clients/config.go @@ -60,13 +60,6 @@ type EigenDAClientConfig struct { // that can retrieve blobs but cannot disperse blobs. SignerPrivateKeyHex string - // Payment signer private key in hex encoded format. This key connect to the wallet with payment registered on-chain - // if set to "", will result in a non-paying client and cannot disperse paid blobs. - PaymentSignerPrivateKeyHex string - - // Payment number of bins indicate how many bins are kept at all times; the minimum is set to 3. - PaymentNumBins uint32 - // Whether to disable TLS for an insecure connection when connecting to a local EigenDA disperser instance. DisableTLS bool @@ -119,9 +112,6 @@ func (c *EigenDAClientConfig) CheckAndSetDefaults() error { if len(c.SignerPrivateKeyHex) > 0 && len(c.SignerPrivateKeyHex) != 64 { return fmt.Errorf("a valid length SignerPrivateKeyHex needs to have 64 bytes") } - if len(c.PaymentSignerPrivateKeyHex) > 0 && len(c.PaymentSignerPrivateKeyHex) != 64 { - return fmt.Errorf("a valid length PaymentSignerPrivateKeyHex needs to have 64 bytes") - } if len(c.RPC) == 0 { return fmt.Errorf("EigenDAClientConfig.RPC not set") diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index 26f7ce2195..272dccacf2 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -3,7 +3,6 @@ package clients import ( "context" "crypto/tls" - "errors" "fmt" "sync" "time" @@ -55,7 +54,6 @@ type DisperserClient interface { // DisperseBlobAuthenticated disperses a blob with an authenticated request. // The BlobStatus returned will always be PROCESSSING if error is nil. DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) - DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) } @@ -189,59 +187,6 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums return blobStatus, reply.GetRequestId(), nil } -// DispersePaidBlob disperses a blob with a payment header and signature. Similar to DisperseBlob but with signed payment header. -func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { - if c.accountant == nil { - return nil, nil, api.NewErrorInternal("not implemented") - } - - err := c.initOnceGrpcConnection() - if err != nil { - return nil, nil, fmt.Errorf("error initializing connection: %w", err) - } - - ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout) - defer cancel() - - quorumNumbers := make([]uint32, len(quorums)) - for i, q := range quorums { - quorumNumbers[i] = uint32(q) - } - - // check every 32 bytes of data are within the valid range for a bn254 field element - _, err = rs.ToFrArray(data) - if err != nil { - return nil, nil, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err) - } - - header, signature, err := c.accountant.AccountBlob(ctx, uint64(encoding.GetBlobLength(uint(len(data)))), quorums) - if header == nil { - return nil, nil, errors.New("accountant returned nil pointer to header") - } - if err != nil { - return nil, nil, err - } - - request := &disperser_rpc.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: quorumNumbers, - PaymentHeader: header, - PaymentSignature: signature, - } - - reply, err := c.client.DispersePaidBlob(ctxTimeout, request) - if err != nil { - return nil, nil, err - } - - blobStatus, err := disperser.FromBlobStatusProto(reply.GetResult()) - if err != nil { - return nil, nil, err - } - - return blobStatus, reply.GetRequestId(), nil -} - func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { err := c.initOnceGrpcConnection() if err != nil { diff --git a/api/clients/disperser_client_test.go b/api/clients/disperser_client_test.go index 00277657bd..cd762a34ff 100644 --- a/api/clients/disperser_client_test.go +++ b/api/clients/disperser_client_test.go @@ -14,7 +14,7 @@ import ( func TestPutBlobNoopSigner(t *testing.T) { config := clients.NewConfig("nohost", "noport", time.Second, false) - disperserClient, err := clients.NewDisperserClient(config, auth.NewLocalNoopSigner()) + disperserClient, err := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), nil) assert.NoError(t, err) test := []byte("test") diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go index b60d1364b7..ec0ca55c4b 100644 --- a/api/clients/eigenda_client.go +++ b/api/clients/eigenda_client.go @@ -110,21 +110,7 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS) - var paymentSigner core.PaymentSigner - if len(config.PaymentSignerPrivateKeyHex) == 64 { - paymentSigner, err = auth.NewPaymentSigner(config.PaymentSignerPrivateKeyHex) - if err != nil { - return nil, fmt.Errorf("new payment signer: %w", err) - } - } else if len(config.PaymentSignerPrivateKeyHex) == 0 { - paymentSigner = auth.NewNoopPaymentSigner() - } else { - return nil, fmt.Errorf("invalid length for signer private key") - } - - // a subsequent PR contains updates to fill in payment state - accountant := NewAccountant(&core.ActiveReservation{}, &core.OnDemandPayment{}, 0, 0, 0, paymentSigner, config.PaymentNumBins) - disperserClient, err := NewDisperserClient(disperserConfig, signer, accountant) + disperserClient, err := NewDisperserClient(disperserConfig, signer, nil) if err != nil { return nil, fmt.Errorf("new disperser-client: %w", err) } @@ -253,13 +239,8 @@ func (m *EigenDAClient) putBlob(ctxFinality context.Context, rawData []byte, res } // disperse blob // TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda - var requestID []byte // clients with a payment signer setting can disperse paid blobs - if len(m.Config.PaymentSignerPrivateKeyHex) > 0 { - _, requestID, err = m.Client.DispersePaidBlob(ctxFinality, data, customQuorumNumbers) - } else { - _, requestID, err = m.Client.DisperseBlobAuthenticated(ctxFinality, data, customQuorumNumbers) - } + _, requestID, err := m.Client.DisperseBlobAuthenticated(ctxFinality, data, customQuorumNumbers) if err != nil { // DisperserClient returned error is already a grpc error which can be a 400 (eg rate limited) or 500, // so we wrap the error such that clients can still use grpc's status.FromError() function to get the status code. diff --git a/api/clients/mock/disperser_client.go b/api/clients/mock/disperser_client.go index 6784a1afde..3763e81304 100644 --- a/api/clients/mock/disperser_client.go +++ b/api/clients/mock/disperser_client.go @@ -81,27 +81,6 @@ func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, quo return status, key, err } -func (c *MockDisperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { - args := c.Called(data, quorums) - var status *disperser.BlobStatus - if args.Get(0) != nil { - status = (args.Get(0)).(*disperser.BlobStatus) - } - var key []byte - if args.Get(1) != nil { - key = (args.Get(1)).([]byte) - } - var err error - if args.Get(2) != nil { - err = (args.Get(2)).(error) - } - - keyStr := base64.StdEncoding.EncodeToString(key) - c.mockRequestIDStore[keyStr] = data - - return status, key, err -} - func (c *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { args := c.Called(key) var reply *disperser_rpc.BlobStatusReply diff --git a/api/grpc/disperser/disperser.pb.go b/api/grpc/disperser/disperser.pb.go index 3b29214a8e..d10d4d7366 100644 --- a/api/grpc/disperser/disperser.pb.go +++ b/api/grpc/disperser/disperser.pb.go @@ -1432,38 +1432,33 @@ var file_disperser_disperser_proto_rawDesc = []byte{ 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x04, 0x12, 0x1b, 0x0a, 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54, 0x5f, 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x12, 0x0e, 0x0a, - 0x0a, 0x44, 0x49, 0x53, 0x50, 0x45, 0x52, 0x53, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x32, 0xb1, 0x03, + 0x0a, 0x44, 0x49, 0x53, 0x50, 0x45, 0x52, 0x53, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x32, 0xd9, 0x02, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x4e, 0x0a, 0x0c, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, - 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x10, 0x44, - 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x50, 0x61, 0x69, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x12, - 0x22, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, - 0x65, 0x72, 0x73, 0x65, 0x50, 0x61, 0x69, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, - 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, - 0x79, 0x22, 0x00, 0x12, 0x5f, 0x0a, 0x19, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, - 0x6c, 0x6f, 0x62, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, - 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, - 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, - 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, - 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, - 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, - 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, - 0x00, 0x12, 0x4e, 0x0a, 0x0c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, - 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, - 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, - 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, - 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, - 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, - 0x72, 0x73, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5f, 0x0a, 0x19, 0x44, + 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x41, 0x75, 0x74, 0x68, 0x65, + 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, + 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, + 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, + 0x74, 0x65, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0d, + 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x2e, + 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x69, + 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0c, 0x52, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, + 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1518,17 +1513,15 @@ var file_disperser_disperser_proto_depIdxs = []int32{ 16, // 12: disperser.BlobVerificationProof.batch_metadata:type_name -> disperser.BatchMetadata 17, // 13: disperser.BatchMetadata.batch_header:type_name -> disperser.BatchHeader 5, // 14: disperser.Disperser.DisperseBlob:input_type -> disperser.DisperseBlobRequest - 6, // 15: disperser.Disperser.DispersePaidBlob:input_type -> disperser.DispersePaidBlobRequest - 1, // 16: disperser.Disperser.DisperseBlobAuthenticated:input_type -> disperser.AuthenticatedRequest - 8, // 17: disperser.Disperser.GetBlobStatus:input_type -> disperser.BlobStatusRequest - 10, // 18: disperser.Disperser.RetrieveBlob:input_type -> disperser.RetrieveBlobRequest - 7, // 19: disperser.Disperser.DisperseBlob:output_type -> disperser.DisperseBlobReply - 7, // 20: disperser.Disperser.DispersePaidBlob:output_type -> disperser.DisperseBlobReply - 2, // 21: disperser.Disperser.DisperseBlobAuthenticated:output_type -> disperser.AuthenticatedReply - 9, // 22: disperser.Disperser.GetBlobStatus:output_type -> disperser.BlobStatusReply - 11, // 23: disperser.Disperser.RetrieveBlob:output_type -> disperser.RetrieveBlobReply - 19, // [19:24] is the sub-list for method output_type - 14, // [14:19] is the sub-list for method input_type + 1, // 15: disperser.Disperser.DisperseBlobAuthenticated:input_type -> disperser.AuthenticatedRequest + 8, // 16: disperser.Disperser.GetBlobStatus:input_type -> disperser.BlobStatusRequest + 10, // 17: disperser.Disperser.RetrieveBlob:input_type -> disperser.RetrieveBlobRequest + 7, // 18: disperser.Disperser.DisperseBlob:output_type -> disperser.DisperseBlobReply + 2, // 19: disperser.Disperser.DisperseBlobAuthenticated:output_type -> disperser.AuthenticatedReply + 9, // 20: disperser.Disperser.GetBlobStatus:output_type -> disperser.BlobStatusReply + 11, // 21: disperser.Disperser.RetrieveBlob:output_type -> disperser.RetrieveBlobReply + 18, // [18:22] is the sub-list for method output_type + 14, // [14:18] is the sub-list for method input_type 14, // [14:14] is the sub-list for extension type_name 14, // [14:14] is the sub-list for extension extendee 0, // [0:14] is the sub-list for field type_name diff --git a/api/grpc/disperser/disperser_grpc.pb.go b/api/grpc/disperser/disperser_grpc.pb.go index aac01c73b2..c6bb719a2b 100644 --- a/api/grpc/disperser/disperser_grpc.pb.go +++ b/api/grpc/disperser/disperser_grpc.pb.go @@ -20,7 +20,6 @@ const _ = grpc.SupportPackageIsVersion7 const ( Disperser_DisperseBlob_FullMethodName = "/disperser.Disperser/DisperseBlob" - Disperser_DispersePaidBlob_FullMethodName = "/disperser.Disperser/DispersePaidBlob" Disperser_DisperseBlobAuthenticated_FullMethodName = "/disperser.Disperser/DisperseBlobAuthenticated" Disperser_GetBlobStatus_FullMethodName = "/disperser.Disperser/GetBlobStatus" Disperser_RetrieveBlob_FullMethodName = "/disperser.Disperser/RetrieveBlob" @@ -43,11 +42,6 @@ type DisperserClient interface { // // INTERNAL (500): serious error, user should NOT retry. DisperseBlob(ctx context.Context, in *DisperseBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error) - // This API require valid payments to accept blob to disperse from clients. - // This executes the dispersal async, i.e. it returns once the request - // is accepted. The client could use GetBlobStatus() API to poll the the - // processing status of the blob. - DispersePaidBlob(ctx context.Context, in *DispersePaidBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error) // DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the // client to authenticate itself via the AuthenticationData message. The protoco is as follows: // 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message @@ -85,15 +79,6 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, in *DisperseBlobRequ return out, nil } -func (c *disperserClient) DispersePaidBlob(ctx context.Context, in *DispersePaidBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error) { - out := new(DisperseBlobReply) - err := c.cc.Invoke(ctx, Disperser_DispersePaidBlob_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, opts ...grpc.CallOption) (Disperser_DisperseBlobAuthenticatedClient, error) { stream, err := c.cc.NewStream(ctx, &Disperser_ServiceDesc.Streams[0], Disperser_DisperseBlobAuthenticated_FullMethodName, opts...) if err != nil { @@ -160,11 +145,6 @@ type DisperserServer interface { // // INTERNAL (500): serious error, user should NOT retry. DisperseBlob(context.Context, *DisperseBlobRequest) (*DisperseBlobReply, error) - // This API require valid payments to accept blob to disperse from clients. - // This executes the dispersal async, i.e. it returns once the request - // is accepted. The client could use GetBlobStatus() API to poll the the - // processing status of the blob. - DispersePaidBlob(context.Context, *DispersePaidBlobRequest) (*DisperseBlobReply, error) // DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the // client to authenticate itself via the AuthenticationData message. The protoco is as follows: // 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message @@ -193,9 +173,6 @@ type UnimplementedDisperserServer struct { func (UnimplementedDisperserServer) DisperseBlob(context.Context, *DisperseBlobRequest) (*DisperseBlobReply, error) { return nil, status.Errorf(codes.Unimplemented, "method DisperseBlob not implemented") } -func (UnimplementedDisperserServer) DispersePaidBlob(context.Context, *DispersePaidBlobRequest) (*DisperseBlobReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method DispersePaidBlob not implemented") -} func (UnimplementedDisperserServer) DisperseBlobAuthenticated(Disperser_DisperseBlobAuthenticatedServer) error { return status.Errorf(codes.Unimplemented, "method DisperseBlobAuthenticated not implemented") } @@ -236,24 +213,6 @@ func _Disperser_DisperseBlob_Handler(srv interface{}, ctx context.Context, dec f return interceptor(ctx, in, info, handler) } -func _Disperser_DispersePaidBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DispersePaidBlobRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DisperserServer).DispersePaidBlob(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Disperser_DispersePaidBlob_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DisperserServer).DispersePaidBlob(ctx, req.(*DispersePaidBlobRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _Disperser_DisperseBlobAuthenticated_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(DisperserServer).DisperseBlobAuthenticated(&disperserDisperseBlobAuthenticatedServer{stream}) } @@ -327,10 +286,6 @@ var Disperser_ServiceDesc = grpc.ServiceDesc{ MethodName: "DisperseBlob", Handler: _Disperser_DisperseBlob_Handler, }, - { - MethodName: "DispersePaidBlob", - Handler: _Disperser_DispersePaidBlob_Handler, - }, { MethodName: "GetBlobStatus", Handler: _Disperser_GetBlobStatus_Handler, diff --git a/api/proto/disperser/disperser.proto b/api/proto/disperser/disperser.proto index 7d21ac3f18..f4bfa1384c 100644 --- a/api/proto/disperser/disperser.proto +++ b/api/proto/disperser/disperser.proto @@ -17,13 +17,6 @@ service Disperser { // INTERNAL (500): serious error, user should NOT retry. rpc DisperseBlob(DisperseBlobRequest) returns (DisperseBlobReply) {} - // This API require valid payments to accept blob to disperse from clients. - // This executes the dispersal async, i.e. it returns once the request - // is accepted. The client could use GetBlobStatus() API to poll the the - // processing status of the blob. - rpc DispersePaidBlob(DispersePaidBlobRequest) returns (DisperseBlobReply) {} - - // DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the // client to authenticate itself via the AuthenticationData message. The protoco is as follows: // 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message diff --git a/core/auth.go b/core/auth.go index 964d3fdcd0..7e7669aa21 100644 --- a/core/auth.go +++ b/core/auth.go @@ -50,7 +50,6 @@ func VerifySignature(message []byte, accountAddr geth.Address, sig []byte) error } type PaymentSigner interface { - // SignBlobPayment(header *commonpb.PaymentHeader) ([]byte, error) SignBlobPayment(header *PaymentMetadata) ([]byte, error) GetAccountID() string } diff --git a/core/auth/payment_signer_test.go b/core/auth/payment_signer_test.go index 8bccc07b22..e2c1a172ad 100644 --- a/core/auth/payment_signer_test.go +++ b/core/auth/payment_signer_test.go @@ -22,7 +22,7 @@ func TestPaymentSigner(t *testing.T) { t.Run("SignBlobPayment", func(t *testing.T) { header := &core.PaymentMetadata{ - AccountID: "", + AccountID: signer.GetAccountID(), BinIndex: 1, CumulativePayment: big.NewInt(1), } @@ -40,7 +40,7 @@ func TestPaymentSigner(t *testing.T) { header := &core.PaymentMetadata{ BinIndex: 1, CumulativePayment: big.NewInt(1), - AccountID: "", + AccountID: signer.GetAccountID(), } // Create an invalid signature @@ -53,7 +53,7 @@ func TestPaymentSigner(t *testing.T) { header := &core.PaymentMetadata{ BinIndex: 1, CumulativePayment: big.NewInt(1), - AccountID: "", + AccountID: signer.GetAccountID(), } signature, err := signer.SignBlobPayment(header) diff --git a/disperser/apiserver/payment_test.go b/disperser/apiserver/payment_test.go deleted file mode 100644 index c049380899..0000000000 --- a/disperser/apiserver/payment_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package apiserver_test - -import ( - "context" - "crypto/rand" - "math/big" - "net" - "testing" - "time" - - "github.com/Layr-Labs/eigenda/core/auth" - "github.com/Layr-Labs/eigenda/core/meterer" - "github.com/Layr-Labs/eigenda/core/mock" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/encoding/utils/codec" - - pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/core" - "github.com/stretchr/testify/assert" - tmock "github.com/stretchr/testify/mock" - "google.golang.org/grpc/peer" -) - -func TestDispersePaidBlob(t *testing.T) { - - transactor := &mock.MockWriter{} - transactor.On("GetCurrentBlockNumber").Return(uint32(100), nil) - transactor.On("GetQuorumCount").Return(uint8(2), nil) - quorumParams := []core.SecurityParam{ - {QuorumID: 0, AdversaryThreshold: 80, ConfirmationThreshold: 100}, - {QuorumID: 1, AdversaryThreshold: 80, ConfirmationThreshold: 100}, - } - transactor.On("GetQuorumSecurityParams", tmock.Anything).Return(quorumParams, nil) - transactor.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil) - - quorums := []uint32{0, 1} - - dispersalServer := newTestServer(transactor, t.Name()) - - data := make([]byte, 1024) - _, err := rand.Read(data) - assert.NoError(t, err) - - data = codec.ConvertByPaddingEmptyByte(data) - - p := &peer.Peer{ - Addr: &net.TCPAddr{ - IP: net.ParseIP("0.0.0.0"), - Port: 51001, - }, - } - ctx := peer.NewContext(context.Background(), p) - - transactor.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil).Twice() - - pk := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" - signer := auth.NewPaymentSigner(pk) - - symbolLength := encoding.GetBlobLength(uint(len(data))) - // disperse on-demand payment - for i := 1; i < 3; i++ { - pm := &core.PaymentMetadata{ - AccountID: signer.GetAccountID(), - BinIndex: 0, - CumulativePayment: big.NewInt(int64(int(symbolLength) * i * encoding.BYTES_PER_SYMBOL)), - } - sig, err := signer.SignBlobPayment(pm) - assert.NoError(t, err) - reply, err := dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: quorums, - PaymentHeader: pm.ConvertToProtoPaymentHeader(), - PaymentSignature: sig, - }) - assert.NoError(t, err) - assert.Equal(t, reply.GetResult(), pb.BlobStatus_PROCESSING) - assert.NotNil(t, reply.GetRequestId()) - } - - // exceeded payment limit - pm := &core.PaymentMetadata{ - AccountID: signer.GetAccountID(), - BinIndex: 0, - CumulativePayment: big.NewInt(int64(symbolLength*3)*encoding.BYTES_PER_SYMBOL - 1), - } - sig, err := signer.SignBlobPayment(pm) - assert.NoError(t, err) - _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: quorums, - PaymentHeader: pm.ConvertToProtoPaymentHeader(), - PaymentSignature: sig, - }) - assert.Error(t, err) - assert.Contains(t, err.Error(), "request claims a cumulative payment greater than the on-chain deposit") - - // disperse paid reservation (any quorum number) - // TODO: somehow meterer is not defined as a method or field in dispersalServer; reservationWindow we set was 1 - for i := 0; i < 2; i++ { - binIndex := meterer.GetBinIndex(uint64(time.Now().Unix()), 1) - pm := &core.PaymentMetadata{ - AccountID: signer.GetAccountID(), - BinIndex: binIndex, - CumulativePayment: big.NewInt(0), - } - sig, err = signer.SignBlobPayment(pm) - assert.NoError(t, err) - reply, err := dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: []uint32{1}, - PaymentHeader: pm.ConvertToProtoPaymentHeader(), - PaymentSignature: sig, - }) - assert.NoError(t, err) - assert.Equal(t, reply.GetResult(), pb.BlobStatus_PROCESSING) - assert.NotNil(t, reply.GetRequestId()) - - } - binIndex := meterer.GetBinIndex(uint64(time.Now().Unix()), 1) - pm = &core.PaymentMetadata{ - AccountID: signer.GetAccountID(), - BinIndex: binIndex, - CumulativePayment: big.NewInt(0), - } - sig, err = signer.SignBlobPayment(pm) - assert.NoError(t, err) - _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: []uint32{1}, - PaymentHeader: pm.ConvertToProtoPaymentHeader(), - PaymentSignature: sig, - }) - assert.Contains(t, err.Error(), "bin has already been filled") - - // invalid bin index - binIndex = meterer.GetBinIndex(uint64(time.Now().Unix())/2, 1) - pm = &core.PaymentMetadata{ - AccountID: signer.GetAccountID(), - BinIndex: binIndex, - CumulativePayment: big.NewInt(0), - } - sig, err = signer.SignBlobPayment(pm) - _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: []uint32{1}, - PaymentHeader: pm.ConvertToProtoPaymentHeader(), - PaymentSignature: sig, - }) - assert.Contains(t, err.Error(), "invalid bin index for reservation") -} diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 4dc7b3a573..cc76549d43 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "errors" "fmt" - "math/big" "net" "slices" "strings" @@ -319,43 +318,6 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut }, nil } -func (s *DispersalServer) DispersePaidBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*pb.DisperseBlobReply, error) { - // If EnablePaymentMeter is false, meterer gets set to nil at start - // In that case, the function should not continue. (checking ) - if s.meterer == nil { - return nil, api.NewErrorInternal("payment feature is not enabled") - } - blob, err := s.validatePaidRequestAndGetBlob(ctx, req) - binIndex := req.PaymentHeader.BinIndex - cumulativePayment := new(big.Int).SetBytes(req.PaymentHeader.CumulativePayment) - //todo: before disperse blob, validate the signature - if err != nil { - for _, quorumID := range req.QuorumNumbers { - s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "DispersePaidBlob") - } - s.metrics.HandleInvalidArgRpcRequest("DispersePaidBlob") - return nil, api.NewErrorInvalidArg(err.Error()) - } - - if err = auth.VerifyPaymentSignature(core.ConvertToPaymentMetadata(req.GetPaymentHeader()), req.PaymentSignature); err != nil { - return nil, api.NewErrorInvalidArg("payment signature is invalid") - } - - paymentHeader := core.PaymentMetadata{ - AccountID: blob.RequestHeader.AccountID, - BinIndex: binIndex, - CumulativePayment: cumulativePayment, - } - reply, err := s.disperseBlob(ctx, blob, "", "DispersePaidBlob", &paymentHeader) - if err != nil { - // Note the DispersePaidBlob already updated metrics for this error. - s.logger.Info("failed to disperse blob", "err", err) - } else { - s.metrics.HandleSuccessfulRpcRequest("DispersePaidBlob") - } - return reply, err -} - func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, string, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { @@ -1078,98 +1040,3 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb return blob, nil } - -// TODO: refactor checks with validateRequestAndGetBlob; most checks are the same, but paid requests have different quorum requirements -func (s *DispersalServer) validatePaidRequestAndGetBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*core.Blob, error) { - - data := req.GetData() - blobSize := len(data) - - // The blob size in bytes must be in range [1, maxBlobSize]. - if blobSize > s.maxBlobSize { - return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize) - } - if blobSize == 0 { - return nil, fmt.Errorf("blob size must be greater than 0") - } - - if len(req.GetQuorumNumbers()) > 256 { - return nil, errors.New("number of custom_quorum_numbers must not exceed 256") - } - - // validate every 32 bytes is a valid field element - _, err := rs.ToFrArray(data) - if err != nil { - s.logger.Error("failed to convert a 32bytes as a field element", "err", err) - return nil, api.NewErrorInvalidArg(fmt.Sprintf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617: %v", err)) - } - - quorumConfig, err := s.updateQuorumConfig(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get quorum config: %w", err) - } - - if len(req.GetQuorumNumbers()) > int(quorumConfig.QuorumCount) { - return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums") - } - - seenQuorums := make(map[uint8]struct{}) - - // TODO: validate payment signature against payment metadata - if err = auth.VerifyPaymentSignature(core.ConvertToPaymentMetadata(req.GetPaymentHeader()), req.GetPaymentSignature()); err != nil { - return nil, fmt.Errorf("payment signature is invalid: %w", err) - } - // Unlike regular blob dispersal request validation, there's no check with required quorums - // Because Reservation has their specific quorum requirements, and on-demand is only allowed and paid to the required quorums. - // Payment specific validations are done within the meterer library. - for i := range req.GetQuorumNumbers() { - - if req.GetQuorumNumbers()[i] > core.MaxQuorumID { - return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, 254], but found %d", req.GetQuorumNumbers()[i]) - } - - quorumID := uint8(req.GetQuorumNumbers()[i]) - if quorumID >= quorumConfig.QuorumCount { - return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, %d], but found %d", s.quorumConfig.QuorumCount-1, quorumID) - } - - if _, ok := seenQuorums[quorumID]; ok { - return nil, fmt.Errorf("custom_quorum_numbers must not contain duplicates") - } - seenQuorums[quorumID] = struct{}{} - - } - - if len(seenQuorums) == 0 { - return nil, fmt.Errorf("the blob must be sent to at least one quorum") - } - - params := make([]*core.SecurityParam, len(seenQuorums)) - i := 0 - for quorumID := range seenQuorums { - params[i] = &core.SecurityParam{ - QuorumID: core.QuorumID(quorumID), - AdversaryThreshold: quorumConfig.SecurityParams[quorumID].AdversaryThreshold, - ConfirmationThreshold: quorumConfig.SecurityParams[quorumID].ConfirmationThreshold, - } - err = params[i].Validate() - if err != nil { - return nil, fmt.Errorf("invalid request: %w", err) - } - i++ - } - - header := core.BlobRequestHeader{ - BlobAuthHeader: core.BlobAuthHeader{ - AccountID: req.PaymentHeader.AccountId, - }, - SecurityParams: params, - } - - blob := &core.Blob{ - RequestHeader: header, - Data: data, - } - - return blob, nil -} diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 18e4321ae8..ce5bce89cc 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -749,7 +749,10 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal panic("failed to create offchain store") } mt := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) - mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background(), nil) + err = mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background(), nil) + if err != nil { + panic("failed to make initial query to the on-chain state") + } ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index 382454ca45..e30a0b2c90 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -40,6 +40,7 @@ var _ = Describe("Inabox Integration", func() { signer := auth.NewLocalBlobRequestSigner(privateKeyHex) privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix + Expect(err).To(BeNil()) paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey.D.Bytes())) Expect(err).To(BeNil()) diff --git a/inabox/tests/payment_test.go b/inabox/tests/payment_test.go deleted file mode 100644 index b0d245139c..0000000000 --- a/inabox/tests/payment_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package integration_test - -import ( - "bytes" - "context" - "crypto/rand" - "encoding/hex" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/core/auth" - "github.com/Layr-Labs/eigenda/disperser" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/ethereum/go-ethereum/crypto" - - "github.com/Layr-Labs/eigenda/encoding/utils/codec" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -var _ = Describe("Inabox Integration", func() { - It("test payment metering", func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) - defer cancel() - - gasTipCap, gasFeeCap, err := ethClient.GetLatestGasCaps(ctx) - Expect(err).To(BeNil()) - - privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" - signer := auth.NewLocalBlobRequestSigner(privateKeyHex) - // Disperser configs: rsv window 60s, min chargeable size 100 bytes, price per chargeable 100, global limit 500 - // -> need to check the mock, can't just use any account for the disperser client, consider using static wallets... - - // say with dataLength of 150 bytes, within a window, we can send 7 blobs with overflow of 50 bytes - // the later requests is then 250 bytes, try send 4 blobs within a second, 2 of them would fail but not charged for - // wait for a second, retry, and that should allow ondemand to work - privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix - Expect(err).To(BeNil()) - paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey.D.Bytes())) - disp := clients.NewDisperserClient(&clients.Config{ - Hostname: "localhost", - Port: "32003", - Timeout: 10 * time.Second, - }, signer, clients.NewAccountant(&core.ActiveReservation{}, &core.OnDemandPayment{}, 60, 128, 128, paymentSigner, 3)) - - Expect(disp).To(Not(BeNil())) - - blobLength := uint32(4) - data := make([]byte, blobLength*encoding.BYTES_PER_SYMBOL) - _, err = rand.Read(data) - Expect(err).To(BeNil()) - - paddedData := codec.ConvertByPaddingEmptyByte(data) - - // requests that count towards either reservation or payments - paidBlobStatus := []disperser.BlobStatus{} - paidKeys := [][]byte{} - reservationBytesLimit := 1024 - paymentLimit := 512 - // TODO: payment calculation unit consistency - for i := 0; i < (int(reservationBytesLimit+paymentLimit))/int(blobLength); i++ { - blobStatus, key, err := disp.DispersePaidBlob(ctx, paddedData, []uint8{0}) - Expect(err).To(BeNil()) - Expect(key).To(Not(BeNil())) - Expect(blobStatus).To(Not(BeNil())) - Expect(*blobStatus).To(Equal(disperser.Processing)) - paidBlobStatus = append(paidBlobStatus, *blobStatus) - paidKeys = append(paidKeys, key) - } - - // requests that aren't covered by reservation or on-demand payment - blobStatus, key, err := disp.DispersePaidBlob(ctx, paddedData, []uint8{0}) - Expect(err).To(Not(BeNil())) - Expect(key).To(BeNil()) - Expect(blobStatus).To(BeNil()) - - ticker := time.NewTicker(time.Second * 1) - defer ticker.Stop() - - var replies = make([]*disperserpb.BlobStatusReply, len(paidBlobStatus)) - // now make sure all the paid blobs get confirmed - loop: - for { - select { - case <-ctx.Done(): - Fail("timed out") - case <-ticker.C: - notConfirmed := false - for i, key := range paidKeys { - reply, err := disp.GetBlobStatus(context.Background(), key) - Expect(err).To(BeNil()) - Expect(reply).To(Not(BeNil())) - status, err := disperser.FromBlobStatusProto(reply.GetStatus()) - Expect(err).To(BeNil()) - if *status != disperser.Confirmed { - notConfirmed = true - } - replies[i] = reply - paidBlobStatus[i] = *status - } - - if notConfirmed { - mineAnvilBlocks(numConfirmations + 1) - continue - } - - for _, reply := range replies { - blobHeader := blobHeaderFromProto(reply.GetInfo().GetBlobHeader()) - verificationProof := blobVerificationProofFromProto(reply.GetInfo().GetBlobVerificationProof()) - opts, err := ethClient.GetNoSendTransactOpts() - Expect(err).To(BeNil()) - tx, err := mockRollup.PostCommitment(opts, blobHeader, verificationProof) - Expect(err).To(BeNil()) - tx, err = ethClient.UpdateGas(ctx, tx, nil, gasTipCap, gasFeeCap) - Expect(err).To(BeNil()) - err = ethClient.SendTransaction(ctx, tx) - Expect(err).To(BeNil()) - mineAnvilBlocks(numConfirmations + 1) - _, err = ethClient.EnsureTransactionEvaled(ctx, tx, "PostCommitment") - Expect(err).To(BeNil()) - } - - break loop - } - } - for _, status := range paidBlobStatus { - Expect(status).To(Equal(disperser.Confirmed)) - } - - ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - for _, reply := range replies { - retrieved, err := retrievalClient.RetrieveBlob(ctx, - [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()), - reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), - uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()), - [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()), - 0, // retrieve blob 1 from quorum 0 - ) - Expect(err).To(BeNil()) - restored := codec.RemoveEmptyByteFromPaddedBytes(retrieved) - Expect(bytes.TrimRight(restored, "\x00")).To(Equal(bytes.TrimRight(data, "\x00"))) - - _, err = retrievalClient.RetrieveBlob(ctx, - [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()), - reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), - uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()), - [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()), - 1, // retrieve blob 1 from quorum 1 - ) - Expect(err).NotTo(BeNil()) - } - }) -}) diff --git a/test/integration_test.go b/test/integration_test.go index ffb1a90b71..738dd60a92 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -291,8 +291,8 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser panic("failed to make initial query to the on-chain state") } - meterer := meterer.NewMeterer(meterer.Config{}, mockState, offchainStore, logger) - server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, meterer, ratelimiter, rateConfig, testMaxBlobSize) + mt := meterer.NewMeterer(meterer.Config{}, mockState, offchainStore, logger) + server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, mt, ratelimiter, rateConfig, testMaxBlobSize) return TestDisperser{ batcher: batcher, diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go index 22e960f4fb..43d9880ce2 100644 --- a/tools/traffic/workers/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -33,15 +33,6 @@ func (m *MockDisperserClient) DisperseBlobAuthenticated( return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) } -func (m *MockDisperserClient) DispersePaidBlob( - ctx context.Context, - data []byte, - customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - - args := m.mock.Called(data, customQuorums) - return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) -} - func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { args := m.mock.Called(key) return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1)