Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement v2 client GET functionality #972

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
68 changes: 68 additions & 0 deletions api/clients/codecs/mock/blob_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package mock

import mock "github.com/stretchr/testify/mock"

// BlobCodec is an autogenerated mock type for the BlobCodec type
type BlobCodec struct {
mock.Mock
}

// DecodeBlob provides a mock function with given fields: encodedData
func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) {
ret := _m.Called(encodedData)

if len(ret) == 0 {
panic("no return value specified for DecodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(encodedData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(encodedData)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(encodedData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// EncodeBlob provides a mock function with given fields: rawData
func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
ret := _m.Called(rawData)

if len(ret) == 0 {
panic("no return value specified for EncodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(rawData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(rawData)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(rawData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}
2 changes: 1 addition & 1 deletion api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewRelayClient() *MockRelayClient {
}

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
args := c.Called(blobKey)
args := c.Called(ctx, relayKey, blobKey)
if args.Get(0) == nil {
return nil, args.Error(1)
}
Expand Down
33 changes: 33 additions & 0 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package v2

import (
"github.com/Layr-Labs/eigenda/api/clients/codecs"
"time"
)

// VerificationMode is an enum that represents the different ways that a blob may be encoded/decoded between
// the client and the disperser.
type VerificationMode uint

const (
// TODO: write good docs here for IFFT and NoIFFT (I need to update my understanding to be able to write this)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks the "PointVerificationMode" field in config already documents what these two mean.

Actually, I am not entirely sure if we want to call out "how" (i.e. IFFT and then FFT) at the interface level. What about BlobFormat or something (point v.s. coeffs) that describes the blobs, not how they are processed underneath? cc @samlaf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks the "PointVerificationMode" field in config already documents what these two mean.

There's a description there, but I'm personally a bit confused by what it says, so I want to clarify to be able to write a good description here.

What about BlobFormat or something

Makes sense, I'll defer to you and Sam for the proper name here

IFFT VerificationMode = iota
NoIFFT
)

// EigenDAClientConfig contains configuration values for EigenDAClient
type EigenDAClientConfig struct {
// The blob encoding version to use when writing and reading blobs
BlobEncodingVersion codecs.BlobEncodingVersion

// If PointVerificationMode is IFFT, then the client codec will do an IFFT on blobs before they are dispersed, and
// will do an FFT on blobs after receiving them. This makes it possible to open points on the KZG commitment to prove
// that the field elements correspond to the commitment.
//
// If PointVerificationMode is NoIFFT, the blob must be supplied in its entirety, to perform a verification
// that any part of the data matches the KZG commitment.
PointVerificationMode VerificationMode

// The timeout duration for relay calls
RelayTimeout time.Duration
}
160 changes: 160 additions & 0 deletions api/clients/v2/eigenda_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package v2

import (
"context"
"errors"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/cockroachdb/errors/join"
"math/rand"
)

// EigenDAClient provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser.
//
// This struct is not threadsafe.
type EigenDAClient struct {
log logging.Logger
// doesn't need to be cryptographically secure, as it's only used to distribute load across relays
random *rand.Rand
config *EigenDAClientConfig
codec codecs.BlobCodec
relayClient clients.RelayClient
}

// BuildEigenDAClient builds an EigenDAClient from config structs.
func BuildEigenDAClient(
log logging.Logger,
config *EigenDAClientConfig,
relayClientConfig *clients.RelayClientConfig) (*EigenDAClient, error) {

relayClient, err := clients.NewRelayClient(relayClientConfig, log)
if err != nil {
return nil, fmt.Errorf("new relay client: %w", err)
}

codec, err := createCodec(config)
if err != nil {
return nil, err
}

return NewEigenDAClient(log, rand.New(rand.NewSource(rand.Int63())), config, relayClient, codec)
}

// NewEigenDAClient assembles an EigenDAClient from subcomponents that have already been constructed and initialized.
func NewEigenDAClient(
log logging.Logger,
random *rand.Rand,
config *EigenDAClientConfig,
relayClient clients.RelayClient,
codec codecs.BlobCodec) (*EigenDAClient, error) {

return &EigenDAClient{
log: log,
random: random,
config: config,
codec: codec,
relayClient: relayClient,
}, nil
}

// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
//
// The relays are attempted in random order.
//
// The returned blob is decoded.
func (c *EigenDAClient) GetBlob(
ctx context.Context,
blobKey core.BlobKey,
blobCertificate core.BlobCertificate) ([]byte, error) {

relayKeyCount := len(blobCertificate.RelayKeys)

if relayKeyCount == 0 {
return nil, errors.New("relay key count is zero")
}

// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
indices := c.random.Perm(relayKeyCount)

// TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond maliciously)

// iterate over relays in random order, until we are able to get the blob from someone
for _, val := range indices {
relayKey := blobCertificate.RelayKeys[val]

data, err := c.getBlobWithTimeout(ctx, relayKey, blobKey)

// if GetBlob returned an error, try calling a different relay
if err != nil {
c.log.Warn("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

// An honest relay should never send an empty blob
if len(data) == 0 {
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey)
continue
}

// An honest relay should never send a blob which cannot be decoded
decodedData, err := c.codec.DecodeBlob(data)
if err != nil {
c.log.Warn("error decoding blob from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

return decodedData, nil
}

return nil, fmt.Errorf("unable to retrieve blob from any relay. relay count: %d", relayKeyCount)
}

// getBlobWithTimeout attempts to get a blob from a given relay, and times out based on config.RelayTimeout
func (c *EigenDAClient) getBlobWithTimeout(
ctx context.Context,
relayKey core.RelayKey,
blobKey core.BlobKey) ([]byte, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, c.config.RelayTimeout)
defer cancel()

return c.relayClient.GetBlob(timeoutCtx, relayKey, blobKey)
}

// GetCodec returns the codec the client uses for encoding and decoding blobs
func (c *EigenDAClient) GetCodec() codecs.BlobCodec {
return c.codec
}

// Close is responsible for calling close on all internal clients. This method will do its best to close all internal
// clients, even if some closes fail.
//
// Any and all errors returned from closing internal clients will be joined and returned.
//
// This method should only be called once.
func (c *EigenDAClient) Close() error {
relayClientErr := c.relayClient.Close()

// TODO: this is using join, since there will be more subcomponents requiring closing after adding PUT functionality
return join.Join(relayClientErr)
}

// createCodec creates the codec based on client config values
func createCodec(config *EigenDAClientConfig) (codecs.BlobCodec, error) {
lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.BlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
}

switch config.PointVerificationMode {
case NoIFFT:
return codecs.NewNoIFFTCodec(lowLevelCodec), nil
case IFFT:
return codecs.NewIFFTCodec(lowLevelCodec), nil
default:
return nil, fmt.Errorf("unsupported point verification mode: %d", config.PointVerificationMode)
}
}
Loading
Loading