feat: PRT - add provider verification probing data #1911

Open · wants to merge 22 commits into base: main
Changes from 13 commits
7 changes: 7 additions & 0 deletions proto/lavanet/lava/pairing/relay.proto
@@ -18,6 +18,7 @@ message ProbeRequest {
uint64 guid = 1;
string spec_id = 2;
string api_interface = 3;
bool with_verifications = 4;
}

message ProbeReply {
@@ -26,6 +27,12 @@ message ProbeReply {
bytes finalized_blocks_hashes = 3;
uint64 lava_epoch = 4;
uint64 lava_latest_block = 5;
repeated Verification verifications = 6;
}

message Verification {
string name = 1;
bool passed = 2;
}

message RelaySession {
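The new with_verifications request flag and the repeated verifications reply field let a prober ask the provider to include its verification results. Below is a minimal consumer-side sketch, not part of this PR: it assumes the usual gogo/protobuf Go bindings for these messages (Guid, SpecId, ApiInterface, WithVerifications) and a Relayer gRPC client exposing Probe, as generated elsewhere in the repo.

package main

import (
	"context"
	"fmt"

	pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
)

// probeWithVerifications is a hypothetical helper; the client wiring is assumed.
func probeWithVerifications(ctx context.Context, client pairingtypes.RelayerClient, guid uint64) error {
	reply, err := client.Probe(ctx, &pairingtypes.ProbeRequest{
		Guid:              guid,
		SpecId:            "LAV1", // example spec id
		ApiInterface:      "grpc", // example api interface
		WithVerifications: true,   // new field added by this PR
	})
	if err != nil {
		return err
	}
	for _, v := range reply.Verifications {
		fmt.Printf("verification %q passed=%v\n", v.Name, v.Passed)
	}
	return nil
}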
64 changes: 58 additions & 6 deletions protocol/chainlib/chain_fetcher.go
@@ -27,26 +27,73 @@ const (
ChainFetcherHeaderName = "X-LAVA-Provider"
)

type ChainFetcherIf interface {
type IChainFetcher interface {
FetchLatestBlockNum(ctx context.Context) (int64, error)
FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error)
FetchEndpoint() lavasession.RPCProviderEndpoint
Validate(ctx context.Context) error
GetVerificationsStatus() []*pairingtypes.Verification
CustomMessage(ctx context.Context, path string, data []byte, connectionType string, apiName string) ([]byte, error)
}

type ChainFetcher struct {
endpoint *lavasession.RPCProviderEndpoint
chainRouter ChainRouter
chainParser ChainParser
cache *performance.Cache
latestBlock int64
endpoint *lavasession.RPCProviderEndpoint
chainRouter ChainRouter
chainParser ChainParser
cache *performance.Cache
latestBlock int64
verificationsStatus common.SafeSyncMap[string, bool]
cachedVerifications atomic.Value // holds []*pairingtypes.Verification for faster access
cacheValid atomic.Bool
}

func (cf *ChainFetcher) GetVerificationsStatus() []*pairingtypes.Verification {
// Try to get from cache first
if cf.cacheValid.Load() {
value, ok := cf.cachedVerifications.Load().([]*pairingtypes.Verification)
if ok {
return value
} else {
utils.LavaFormatError("invalid usage of cachedVerifications, could not cast result into []*pairingtypes.Verification type", nil, utils.Attribute{Key: "cachedVerifications", Value: cf.cachedVerifications.Load()})
}
}

// If not in cache, create new slice
verifications := make([]*pairingtypes.Verification, 0)
cf.verificationsStatus.Range(func(name string, passed bool) bool {
verifications = append(verifications, &pairingtypes.Verification{
Name: name,
Passed: passed,
})
return true
})

// Store in cache
cf.cachedVerifications.Store(verifications)
cf.cacheValid.Store(true)
return verifications
}

// Add this method to invalidate cache when verification status changes
func (cf *ChainFetcher) invalidateVerificationsCache() {
cf.cacheValid.Store(false)
}

func (cf *ChainFetcher) FetchEndpoint() lavasession.RPCProviderEndpoint {
return *cf.endpoint
}

func (cf *ChainFetcher) getVerificationsKey(verification VerificationContainer, apiInterface string, chainId string) string {
key := chainId + "-" + apiInterface + "-" + verification.Name
if verification.Addon != "" {
key += "-" + verification.Addon
}
if verification.Extension != "" {
key += "-" + verification.Extension
}
return key
}

func (cf *ChainFetcher) Validate(ctx context.Context) error {
for _, url := range cf.endpoint.NodeUrls {
addons := url.Addons
@@ -67,6 +114,8 @@ func (cf *ChainFetcher) Validate(ctx context.Context) error {
if err != nil {
return err
}
// invalidating cache as value might change
defer cf.invalidateVerificationsCache()
for _, verification := range verifications {
if slices.Contains(url.SkipVerifications, verification.Name) {
utils.LavaFormatDebug("Skipping Verification", utils.LogAttr("verification", verification.Name))
@@ -81,9 +130,12 @@ func (cf *ChainFetcher) Validate(ctx context.Context) error {
}
}
if err != nil {
cf.verificationsStatus.Store(cf.getVerificationsKey(verification, cf.endpoint.ApiInterface, cf.endpoint.ChainID), false)
if verification.Severity == spectypes.ParseValue_Fail {
return utils.LavaFormatError("invalid Verification on provider startup", err, utils.Attribute{Key: "Addons", Value: addons}, utils.Attribute{Key: "verification", Value: verification.Name})
}
} else {
cf.verificationsStatus.Store(cf.getVerificationsKey(verification, cf.endpoint.ApiInterface, cf.endpoint.ChainID), true)
}
}
}
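The ChainFetcher changes follow a read-through cache pattern: Validate stores each verification result in the concurrency-safe map and invalidates the cached slice, while GetVerificationsStatus serves the cached slice when it is still marked valid and rebuilds it from the map otherwise. A standalone simplification of that pattern, for illustration only (plain sync.Map and string snapshots stand in for SafeSyncMap and []*pairingtypes.Verification):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// statusCache mirrors the invalidate-on-write / rebuild-on-read scheme above.
type statusCache struct {
	statuses   sync.Map     // verification name -> passed (bool)
	snapshot   atomic.Value // cached []string snapshot of the map
	cacheValid atomic.Bool  // true while the snapshot matches the map
}

func (c *statusCache) Set(name string, passed bool) {
	c.statuses.Store(name, passed)
	c.cacheValid.Store(false) // the snapshot may now be stale
}

func (c *statusCache) Snapshot() []string {
	if c.cacheValid.Load() {
		if cached, ok := c.snapshot.Load().([]string); ok {
			return cached
		}
	}
	out := make([]string, 0)
	c.statuses.Range(func(k, v any) bool {
		out = append(out, fmt.Sprintf("%v=%v", k, v))
		return true
	})
	c.snapshot.Store(out)
	c.cacheValid.Store(true)
	return out
}

func main() {
	var c statusCache
	c.Set("chain-id", true)
	c.Set("latest-block", false)
	fmt.Println(c.Snapshot()) // rebuilds the snapshot, then caches it
	fmt.Println(c.Snapshot()) // served from the cached snapshot
}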
14 changes: 13 additions & 1 deletion protocol/chainlib/chain_fetcher_mock.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions protocol/common/safe_sync_map.go
@@ -21,7 +21,7 @@ func (ssm *SafeSyncMap[K, V]) Load(key K) (ret V, ok bool, err error) {
}
ret, ok = value.(V)
if !ok {
return ret, false, utils.LavaFormatError("invalid usage of syncmap, could not cast result into a PolicyUpdater", nil)
return ret, false, utils.LavaFormatError("invalid usage of sync map, could not cast result into V type", nil)
}
return ret, true, nil
}
@@ -37,7 +37,7 @@ func (ssm *SafeSyncMap[K, V]) LoadOrStore(key K, value V) (ret V, loaded bool, err error) {
var ok bool
ret, ok = actual.(V)
if !ok {
return ret, false, utils.LavaFormatError("invalid usage of sync map, could not cast result into a PolicyUpdater", nil)
return ret, false, utils.LavaFormatError("invalid usage of sync map, could not cast result into V type", nil)
}
return ret, true, nil
}
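The change in this file only generalizes the cast-failure messages, which still referred to PolicyUpdater, the map's first user. For context, a short usage sketch of the generic wrapper as the new verificationsStatus field uses it; the method names are the ones visible in this PR, while the import paths are assumed.

package main

import (
	"fmt"

	"github.com/lavanet/lava/v4/protocol/common"
	"github.com/lavanet/lava/v4/utils"
)

func main() {
	// keys mirror getVerificationsKey: chainId-apiInterface-verificationName
	var statuses common.SafeSyncMap[string, bool]
	statuses.Store("LAV1-grpc-chain-id", true)

	passed, found, err := statuses.Load("LAV1-grpc-chain-id")
	if err != nil {
		// Load errors only when a value of an unexpected type ended up in the map
		utils.LavaFormatError("unexpected type in verifications map", err)
		return
	}
	if found {
		fmt.Println("chain-id verification passed:", passed)
	}
}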
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
@@ -402,7 +402,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc
chainTracker.StartAndServe(ctx)
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser)
mockReliabilityManager := NewMockReliabilityManager(reliabilityManager)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil, numberOfRetriesOnNodeErrorsProviderSide)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil, nil, numberOfRetriesOnNodeErrorsProviderSide)
listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health")
err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
require.NoError(t, err)
57 changes: 52 additions & 5 deletions protocol/rpcprovider/rpcprovider.go
@@ -16,6 +16,7 @@ import (
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/tx"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dgraph-io/ristretto/v2"
"github.com/lavanet/lava/v4/app"
"github.com/lavanet/lava/v4/protocol/chainlib"
"github.com/lavanet/lava/v4/protocol/chainlib/chainproxy"
@@ -48,6 +49,11 @@ const (
ShardIDFlagName = "shard-id"
StickinessHeaderName = "sticky-header"
DefaultShardID uint = 0

CacheMaxCost = 10 * 1024 // each item cost would be 1
CacheNumCounters = 1000 // expect 2000 items
VerificationsTTL = 30 * time.Second
VerificationsCacheKey = "verifications"
)

var (
@@ -123,7 +129,7 @@ type rpcProviderHealthCheckMetricsOptions struct {
type RPCProvider struct {
providerStateTracker ProviderStateTrackerInf
rpcProviderListeners map[string]*ProviderListener
lock sync.Mutex
lock sync.RWMutex
// all of the following members need to be concurrency proof
providerMetricsManager *metrics.ProviderMetricsManager
rewardServer *rewardserver.RewardServer
@@ -145,6 +151,19 @@ type RPCProvider struct {
staticSpecPath string
relayLoadLimit uint64
providerLoadManagersPerChain *common.SafeSyncMap[string, *ProviderLoadManager]

verificationsResponseCache *ristretto.Cache[string, []*pairingtypes.Verification]
allChainsAndAPIInterfacesVerificationStatusFetchers []IVerificationsStatus
}

func (rpcp *RPCProvider) AddVerificationStatusFetcher(fetcher IVerificationsStatus) {
rpcp.lock.Lock()
defer rpcp.lock.Unlock()
if rpcp.allChainsAndAPIInterfacesVerificationStatusFetchers == nil {
rpcp.allChainsAndAPIInterfacesVerificationStatusFetchers = []IVerificationsStatus{}
}
rpcp.allChainsAndAPIInterfacesVerificationStatusFetchers = append(rpcp.allChainsAndAPIInterfacesVerificationStatusFetchers, fetcher)
rpcp.verificationsResponseCache.Clear()
}

func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
Expand Down Expand Up @@ -363,6 +382,21 @@ func GetAllNodeUrlsInternalPaths(nodeUrls []common.NodeUrl) []string {
return paths
}

func (rpcp *RPCProvider) GetVerificationsStatus() []*pairingtypes.Verification {
verifications, ok := rpcp.verificationsResponseCache.Get(VerificationsCacheKey)
if ok {
return verifications
}
rpcp.lock.RLock()
defer rpcp.lock.RUnlock()
verifications = make([]*pairingtypes.Verification, 0)
for _, fetcher := range rpcp.allChainsAndAPIInterfacesVerificationStatusFetchers {
verifications = append(verifications, fetcher.GetVerificationsStatus()...)
}
rpcp.verificationsResponseCache.SetWithTTL(VerificationsCacheKey, verifications, 1, VerificationsTTL)
return verifications
}

func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) error {
err := rpcProviderEndpoint.Validate()
if err != nil {
@@ -416,7 +450,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
rpcp.providerMetricsManager.SetLatestBlock(chainID, rpcProviderEndpoint.NetworkAddress.Address, uint64(block))
}
}
var chainFetcher chainlib.ChainFetcherIf
var chainFetcher chainlib.IChainFetcher
if enabled, _ := chainParser.DataReliabilityParams(); enabled {
chainFetcher = chainlib.NewChainFetcher(
ctx,
@@ -431,6 +465,8 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
utils.LavaFormatDebug("verifications only ChainFetcher for spec", utils.LogAttr("chainId", rpcEndpoint.ChainID))
chainFetcher = chainlib.NewVerificationsOnlyChainFetcher(ctx, chainRouter, chainParser, rpcProviderEndpoint)
}
// so we can fetch failed verifications we need to add the chainFetcher before returning
rpcp.AddVerificationStatusFetcher(chainFetcher)

// check the chain fetcher verification works, if it doesn't we disable the chain+apiInterface and this triggers a boot retry
err = chainFetcher.Validate(ctx)
@@ -500,11 +536,11 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
}

// Add the chain fetcher to the spec validator
// check the chain fetcher verification works, if it doesn't we disable the chain+apiInterface and this triggers a boot retry
err = specValidator.AddChainFetcher(ctx, &chainFetcher, chainID)
if err != nil {
return utils.LavaFormatError("panic severity critical error, failed validating chain", err, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint})
}

providerMetrics := rpcp.providerMetricsManager.AddProviderMetrics(chainID, apiInterface, rpcProviderEndpoint.NetworkAddress.Address)

reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, rpcp.providerStateTracker, rpcp.addr.String(), chainRouter, chainParser)
@@ -527,7 +563,8 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint))
providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey)
}
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager, numberOfRetriesAllowedOnNodeErrors)

rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager, rpcp, numberOfRetriesAllowedOnNodeErrors)
// set up grpc listener
var listener *ProviderListener
func() {
@@ -799,7 +836,17 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
relayLoadLimit,
}

rpcProvider := RPCProvider{}
verificationsResponseCache, err := ristretto.NewCache(
&ristretto.Config[string, []*pairingtypes.Verification]{
NumCounters: CacheNumCounters,
MaxCost: CacheMaxCost,
BufferItems: 64,
IgnoreInternalCost: true,
})
if err != nil {
utils.LavaFormatFatal("failed setting up cache for verificationsResponseCache", err)
}
rpcProvider := RPCProvider{verificationsResponseCache: verificationsResponseCache}
err = rpcProvider.Start(&rpcProviderStartOptions)
return err
},
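At the provider level, GetVerificationsStatus aggregates every registered fetcher and memoizes the combined slice in a ristretto cache for 30 seconds, so repeated probes do not re-walk all fetchers. A standalone sketch of the same ristretto/v2 wiring with the config values from this diff (string values stand in for []*pairingtypes.Verification):

package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/v2"
)

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config[string, string]{
		NumCounters:        1000,      // CacheNumCounters above
		MaxCost:            10 * 1024, // CacheMaxCost above; each item costs 1
		BufferItems:        64,        // ristretto's recommended default
		IgnoreInternalCost: true,      // count only the cost passed to SetWithTTL
	})
	if err != nil {
		panic(err)
	}
	defer cache.Close()

	cache.SetWithTTL("verifications", "cached-probe-response", 1, 30*time.Second)
	cache.Wait() // sets are applied asynchronously; wait so Get can observe the value

	if v, ok := cache.Get("verifications"); ok {
		fmt.Println("cache hit:", v)
	}
}

Note that ristretto admits entries asynchronously; in the provider this is harmless because a cache miss in GetVerificationsStatus simply rebuilds the slice from the fetchers.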
14 changes: 14 additions & 0 deletions protocol/rpcprovider/rpcprovider_server.go
@@ -72,6 +72,7 @@ type RPCProviderServer struct {
StaticProvider bool
providerStateMachine *ProviderStateMachine
providerLoadManager *ProviderLoadManager
verificationsStatusGetter IVerificationsStatus
}

type ReliabilityManagerInf interface {
@@ -92,6 +93,10 @@ type StateTrackerInf interface {
GetVirtualEpoch(epoch uint64) uint64
}

type IVerificationsStatus interface {
GetVerificationsStatus() (status []*pairingtypes.Verification)
}

func (rpcps *RPCProviderServer) SetProviderUniqueId(uniqueId string) {
rpcps.providerUniqueId = uniqueId
}
@@ -114,6 +119,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager,
staticProvider bool,
providerLoadManager *ProviderLoadManager,
verificationsStatusGetter IVerificationsStatus,
numberOfRetries int,
) {
rpcps.cache = cache
@@ -138,6 +144,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter, numberOfRetries)
rpcps.providerLoadManager = providerLoadManager
rpcps.verificationsStatusGetter = verificationsStatusGetter

rpcps.initRelaysMonitor(ctx)
}
@@ -1232,12 +1239,19 @@ func (rpcps *RPCProviderServer) GetLatestBlockData(ctx context.Context, blockDis

func (rpcps *RPCProviderServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) {
latestB, _ := rpcps.reliabilityManager.GetLatestBlockNum()
verificationsStatus := []*pairingtypes.Verification{}
if probeReq.WithVerifications {
if rpcps.verificationsStatusGetter != nil {
verificationsStatus = rpcps.verificationsStatusGetter.GetVerificationsStatus()
}
}
probeReply := &pairingtypes.ProbeReply{
Guid: probeReq.GetGuid(),
LatestBlock: latestB,
FinalizedBlocksHashes: []byte{},
LavaEpoch: rpcps.providerSessionManager.GetCurrentEpochAtomic(),
LavaLatestBlock: uint64(rpcps.stateTracker.LatestBlock()),
Verifications: verificationsStatus,
}
trailer := metadata.Pairs(common.VersionMetadataKey, upgrade.GetCurrentVersion().ProviderVersion)
trailer.Append(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId)
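Probe consults the injected IVerificationsStatus getter only when the request asks for verifications, and tolerates a nil getter, which is what the updated protocol_test.go passes. A hypothetical test double for the new interface, not part of this PR, that could be handed to ServeRPCRequests in tests:

package rpcprovider_test

import (
	pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
)

// stubVerificationsGetter satisfies IVerificationsStatus structurally;
// in production the RPCProvider itself is the getter.
type stubVerificationsGetter struct {
	verifications []*pairingtypes.Verification
}

func (s *stubVerificationsGetter) GetVerificationsStatus() []*pairingtypes.Verification {
	return s.verifications
}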