Skip to content

Commit

Permalink
feat(x/tally): data provider payout
Browse files Browse the repository at this point in the history
  • Loading branch information
hacheigriega committed Jan 15, 2025
1 parent 28f0a6b commit bc9e427
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 52 deletions.
2 changes: 2 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ import (
"github.com/sedaprotocol/seda-chain/app/keepers"
appparams "github.com/sedaprotocol/seda-chain/app/params"
"github.com/sedaprotocol/seda-chain/app/utils"

// Used in cosmos-sdk when registering the route for swagger docs.
_ "github.com/sedaprotocol/seda-chain/client/docs/statik"
"github.com/sedaprotocol/seda-chain/cmd/sedad/gentx"
Expand Down Expand Up @@ -685,6 +686,7 @@ func NewApp(
runtime.NewKVStoreService(keys[tallytypes.StoreKey]),
app.WasmStorageKeeper,
app.BatchingKeeper,
app.DataProxyKeeper,
contractKeeper,
app.WasmKeeper,
authtypes.NewModuleAddress(govtypes.ModuleName).String(),
Expand Down
9 changes: 9 additions & 0 deletions x/batching/keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"github.com/sedaprotocol/seda-chain/x/batching"
batchingkeeper "github.com/sedaprotocol/seda-chain/x/batching/keeper"
"github.com/sedaprotocol/seda-chain/x/batching/types"
dataproxykeeper "github.com/sedaprotocol/seda-chain/x/data-proxy/keeper"
dataproxytypes "github.com/sedaprotocol/seda-chain/x/data-proxy/types"
"github.com/sedaprotocol/seda-chain/x/pubkey"
pubkeykeeper "github.com/sedaprotocol/seda-chain/x/pubkey/keeper"
pubkeytypes "github.com/sedaprotocol/seda-chain/x/pubkey/types"
Expand Down Expand Up @@ -221,6 +223,12 @@ func initFixture(tb testing.TB) *fixture {
)
stakingKeeper.SetPubKeyKeeper(pubKeyKeeper)

dataProxyKeeper := dataproxykeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[dataproxytypes.StoreKey]),
authtypes.NewModuleAddress("gov").String(),
)

batchingKeeper := batchingkeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[types.StoreKey]),
Expand All @@ -238,6 +246,7 @@ func initFixture(tb testing.TB) *fixture {
runtime.NewKVStoreService(keys[tallytypes.StoreKey]),
wasmStorageKeeper,
batchingKeeper,
dataProxyKeeper,
contractKeeper,
viewKeeper,
authority.String(),
Expand Down
52 changes: 38 additions & 14 deletions x/tally/keeper/endblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
dataResults[i].ExitCode = TallyExitCodeNoReveals
k.Logger(ctx).Info("data request has no reveals", "request_id", req.ID)

distMsgs, err = k.CalculateCommitterPayouts(ctx, req)
distMsgs, err = k.CalculateCommitterPayouts(ctx, req, gasPriceInt)
if err != nil {
return err
}
Expand All @@ -127,6 +127,7 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
if !ok {
return fmt.Errorf("invalid gas price: %s", req.GasPrice) // TODO improve error handling
}
// TODO also make sure gas price is not 0

_, tallyResults[i], distMsgs = k.FilterAndTally(ctx, req, params, gasPriceInt)
dataResults[i].Result = tallyResults[i].Result
Expand Down Expand Up @@ -214,13 +215,13 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.
sort.Strings(reveals[i].ProxyPubKeys)
}

// Phase I: Filtering
// Phase 1: Filtering
filterResult, filterErr := ExecuteFilter(reveals, req.ConsensusFilter, req.ReplicationFactor, params)
result.Consensus = filterResult.Consensus
result.ProxyPubKeys = filterResult.ProxyPubKeys
result.TallyGasUsed += filterResult.GasUsed

// Phase II: Tally Program Execution
// Phase 2: Tally Program Execution
if filterErr == nil {
vmRes, err := k.ExecuteTallyProgram(ctx, req, filterResult, reveals)
if err != nil {
Expand All @@ -242,21 +243,44 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.
}
}

// Phase III: Calculate Payouts
var distMsgs types.DistributionMessages
if filterErr == nil || errors.Is(filterErr, types.ErrConsensusInError) {
var gasUsed uint64
if req.ReplicationFactor == 1 || areGasReportsUniform(reveals) {
distMsgs.Messages, gasUsed = CalculateUniformPayouts(keys, reveals[0].GasUsed, req.ExecGasLimit, req.ReplicationFactor, gasPrice)
// Phase 3: Calculate Payouts
// Calculate data proxy payouts if basic consensus was reached.
var proxyDistMsgs types.DistributionMessages
var proxyGasUsedPerExec uint64
if filterErr == nil || !errors.Is(filterErr, types.ErrNoBasicConsensus) {
var err error
proxyDistMsgs, proxyGasUsedPerExec, err = k.CalculateDataProxyPayouts(ctx, result.ProxyPubKeys, gasPrice)
if err != nil {

Check failure on line 253 in x/tally/keeper/endblock.go

View workflow job for this annotation

GitHub Actions / golangci

empty-block: this block is empty, you can remove it (revive)
// TODO error handling
}
}

// Calculate executor payouts.
var execDistMsgs types.DistributionMessages
if filterErr == nil || !errors.Is(filterErr, types.ErrNoBasicConsensus) {
gasReports := make([]uint64, len(reveals))
for i, reveal := range reveals {
gasReports[i] = max(0, reveal.GasUsed-proxyGasUsedPerExec)
}

var execGasUsed uint64
if req.ReplicationFactor == 1 || areGasReportsUniform(gasReports) {
execDistMsgs.Messages, execGasUsed = CalculateUniformPayouts(keys, gasReports[0], req.ExecGasLimit, req.ReplicationFactor, gasPrice)
} else {
distMsgs.Messages, gasUsed = CalculateDivergentPayouts(keys, reveals, req.ExecGasLimit, req.ReplicationFactor, gasPrice)
execDistMsgs.Messages, execGasUsed = CalculateDivergentPayouts(keys, gasReports, req.ExecGasLimit, req.ReplicationFactor, gasPrice)
}
result.ExecGasUsed = execGasUsed
} else {
var err error
execDistMsgs, err = k.CalculateCommitterPayouts(ctx, req, gasPrice)
if err != nil {

Check failure on line 276 in x/tally/keeper/endblock.go

View workflow job for this annotation

GitHub Actions / golangci

empty-block: this block is empty, you can remove it (revive)
// TODO error handling
}
distMsgs.RefundType = types.DistributionTypeExecutorReward // TODO double check
result.ExecGasUsed = gasUsed
}
// TODO: else pay committers?

return filterResult, result, distMsgs
return filterResult, result, types.DistributionMessages{
Messages: append(proxyDistMsgs.Messages, execDistMsgs.Messages...),
}
}

// logErrAndRet logs the base error along with the request ID for
Expand Down
10 changes: 10 additions & 0 deletions x/tally/keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"github.com/sedaprotocol/seda-chain/integration"
batchingkeeper "github.com/sedaprotocol/seda-chain/x/batching/keeper"
batchingtypes "github.com/sedaprotocol/seda-chain/x/batching/types"
dataproxykeeper "github.com/sedaprotocol/seda-chain/x/data-proxy/keeper"
dataproxytypes "github.com/sedaprotocol/seda-chain/x/data-proxy/types"
pubkeykeeper "github.com/sedaprotocol/seda-chain/x/pubkey/keeper"
pubkeytypes "github.com/sedaprotocol/seda-chain/x/pubkey/types"
"github.com/sedaprotocol/seda-chain/x/staking"
Expand Down Expand Up @@ -220,6 +222,12 @@ func initFixture(t testing.TB) *fixture {
)
stakingKeeper.SetPubKeyKeeper(pubKeyKeeper)

dataProxyKeeper := dataproxykeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[dataproxytypes.StoreKey]),
authtypes.NewModuleAddress("gov").String(),
)

batchingKeeper := batchingkeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[batchingtypes.StoreKey]),
Expand All @@ -231,11 +239,13 @@ func initFixture(t testing.TB) *fixture {
wasmKeeper,
addresscodec.NewBech32Codec(params.Bech32PrefixValAddr),
)

tallyKeeper := keeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[types.StoreKey]),
wasmStorageKeeper,
batchingKeeper,
dataProxyKeeper,
contractKeeper,
wasmKeeper,
authority.String(),
Expand Down
4 changes: 3 additions & 1 deletion x/tally/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type Keeper struct {
wasmStorageKeeper types.WasmStorageKeeper
batchingKeeper types.BatchingKeeper
dataProxyKeeper types.DataProxyKeeper
wasmKeeper wasmtypes.ContractOpsKeeper
wasmViewKeeper wasmtypes.ViewKeeper
authority string
Expand All @@ -26,12 +27,13 @@ type Keeper struct {
params collections.Item[types.Params]
}

func NewKeeper(cdc codec.BinaryCodec, storeService storetypes.KVStoreService, wsk types.WasmStorageKeeper, bk types.BatchingKeeper, wk wasmtypes.ContractOpsKeeper, wvk wasmtypes.ViewKeeper, authority string) Keeper {
func NewKeeper(cdc codec.BinaryCodec, storeService storetypes.KVStoreService, wsk types.WasmStorageKeeper, bk types.BatchingKeeper, dpk types.DataProxyKeeper, wk wasmtypes.ContractOpsKeeper, wvk wasmtypes.ViewKeeper, authority string) Keeper {
sb := collections.NewSchemaBuilder(storeService)

k := Keeper{
wasmStorageKeeper: wsk,
batchingKeeper: bk,
dataProxyKeeper: dpk,
wasmKeeper: wk,
wasmViewKeeper: wvk,
params: collections.NewItem(sb, types.ParamsPrefix, "params", codec.CollValue[types.Params](cdc)),
Expand Down
76 changes: 52 additions & 24 deletions x/tally/keeper/payout.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package keeper

import (
"encoding/hex"
"sort"

"cosmossdk.io/math"
Expand All @@ -10,13 +11,44 @@ import (
"github.com/sedaprotocol/seda-chain/x/tally/types"
)

// CalculateCommitterPayouts constructs distribution messages that
// pay the fixed gas cost for each commiter of a given data request.
func (k Keeper) CalculateCommitterPayouts(ctx sdk.Context, req types.Request, gasPrice math.Int) (types.DistributionMessages, error) {
result := types.DistributionMessages{
Messages: []types.DistributionMessage{},
RefundType: types.DistributionTypeTimedOut,
// CalculateDataProxyPayouts returns payouts for the data proxies and
// returns the the gas used by the data proxies per executor.
func (k Keeper) CalculateDataProxyPayouts(ctx sdk.Context, proxyPubKeys []string, gasPrice math.Int) (types.DistributionMessages, uint64, error) {
var result types.DistributionMessages
if len(proxyPubKeys) == 0 {
return result, 0, nil
}

gasUsed := math.NewInt(0)
distMsgs := make([]types.DistributionMessage, len(proxyPubKeys))
for i, pubKey := range proxyPubKeys {
pubKeyBytes, err := hex.DecodeString(pubKey)
if err != nil {
return types.DistributionMessages{}, 0, err
}
proxyConfig, err := k.dataProxyKeeper.GetDataProxyConfig(ctx, pubKeyBytes)
if err != nil {
return types.DistributionMessages{}, 0, err
}
gasUsed = gasUsed.Add(proxyConfig.Fee.Amount.Quo(gasPrice))

distMsgs[i] = types.DistributionMessage{
Kind: types.DistributionKind{
ExecutorReward: &types.DistributionExecutorReward{
Identity: pubKey,
Amount: proxyConfig.Fee.Amount,
},
},
}
}
result.Messages = distMsgs
return types.DistributionMessages{}, gasUsed.Uint64(), nil // TODO may panic
}

// CalculateCommitterPayouts returns the fixed payouts for the committers of a
// given data request.
func (k Keeper) CalculateCommitterPayouts(ctx sdk.Context, req types.Request, gasPrice math.Int) (types.DistributionMessages, error) {
var result types.DistributionMessages
if len(req.Commits) == 0 {
return result, nil
}
Expand Down Expand Up @@ -44,18 +76,17 @@ func (k Keeper) CalculateCommitterPayouts(ctx sdk.Context, req types.Request, ga
Amount: payout,
},
},
Type: types.DistributionTypeTimedOut,
}
}
result.Messages = distMsgs
return result, nil
}

// CalculateUniformPayouts calculates payouts for the executors when their gas
// reports are uniformly at "gasUsed". It also returns the total execution gas
// reports are uniformly at "gasReport". It also returns the total execution gas
// consumption.
func CalculateUniformPayouts(executors []string, gasUsed, execGasLimit uint64, replicationFactor uint16, gasPrice math.Int) ([]types.DistributionMessage, uint64) {
adjGasUsed := max(gasUsed, execGasLimit/uint64(replicationFactor))
func CalculateUniformPayouts(executors []string, gasReport, execGasLimit uint64, replicationFactor uint16, gasPrice math.Int) ([]types.DistributionMessage, uint64) {
adjGasUsed := max(gasReport, execGasLimit/uint64(replicationFactor))
payout := gasPrice.Mul(math.NewIntFromUint64(adjGasUsed))

distMsgs := make([]types.DistributionMessage, len(executors))
Expand All @@ -67,22 +98,20 @@ func CalculateUniformPayouts(executors []string, gasUsed, execGasLimit uint64, r
Amount: payout,
},
},
Type: types.DistributionTypeExecutorReward,
}
}
return distMsgs, adjGasUsed * uint64(replicationFactor)
}

// CalculateDivergentPayouts calculates payouts for the executors of the given
// reveals when their gas reports are divergent. It also returns the total
// execution gas consumption.
// CalculateDivergentPayouts calculates payouts for the executors given their
// divergent gas reports. It also returns the total execution gas consumption.
// It assumes that the i-th executor is the one who revealed the i-th reveal.
func CalculateDivergentPayouts(executors []string, reveals []types.RevealBody, execGasLimit uint64, replicationFactor uint16, gasPrice math.Int) ([]types.DistributionMessage, uint64) {
adjGasUsed := make([]uint64, len(reveals))
func CalculateDivergentPayouts(executors []string, gasReports []uint64, execGasLimit uint64, replicationFactor uint16, gasPrice math.Int) ([]types.DistributionMessage, uint64) {
adjGasUsed := make([]uint64, len(gasReports))
var lowestGasUsed uint64
var lowestReporterIndex int
for i, reveal := range reveals {
adjGasUsed[i] = min(reveal.GasUsed, execGasLimit/uint64(replicationFactor))
for i, gasReport := range gasReports {
adjGasUsed[i] = min(gasReport, execGasLimit/uint64(replicationFactor))
if i == 0 || adjGasUsed[i] < lowestGasUsed {
lowestReporterIndex = i
lowestGasUsed = adjGasUsed[i]
Expand All @@ -107,7 +136,6 @@ func CalculateDivergentPayouts(executors []string, reveals []types.RevealBody, e
Amount: payout,
},
},
Type: types.DistributionTypeExecutorReward,
}
}
return distMsgs, totalGasUsed
Expand All @@ -122,13 +150,13 @@ func median(arr []uint64) uint64 {

// areGasReportsUniform returns true if the gas reports of the given reveals are
// uniform.
func areGasReportsUniform(reveals []types.RevealBody) bool {
if len(reveals) == 0 {
func areGasReportsUniform(reports []uint64) bool {
if len(reports) == 0 {
return true
}
firstGas := reveals[0].GasUsed
for i := 1; i < len(reveals); i++ {
if reveals[i].GasUsed != firstGas {
firstGas := reports[0]
for i := 1; i < len(reports); i++ {
if reports[i] != firstGas {
return false
}
}
Expand Down
14 changes: 1 addition & 13 deletions x/tally/types/abci_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,11 @@ func (u *RevealBody) TryHash() (string, error) {
}

type DistributionMessages struct {
Messages []DistributionMessage `json:"messages"`
RefundType DistributionType `json:"refund_type"`
Messages []DistributionMessage `json:"messages"`
}

type DistributionMessage struct {
Kind DistributionKind `json:"kind"`
Type DistributionType `json:"type"`
}

type DistributionKind struct {
Expand All @@ -135,16 +133,6 @@ type DistributionExecutorReward struct {
Identity string `json:"identity"`
}

type DistributionType string

const (
DistributionTypeTallyReward DistributionType = "tally_reward"
DistributionTypeExecutorReward DistributionType = "executor_reward"
DistributionTypeTimedOut DistributionType = "timed_out"
DistributionTypeNoConsensus DistributionType = "no_consensus"
DistributionTypeRemainderRefund DistributionType = "remainder_refund"
)

func MarshalSudoRemoveDataRequests(processedReqs map[string]DistributionMessages) ([]byte, error) {
return json.Marshal(struct {
SudoRemoveDataRequests struct {
Expand Down
5 changes: 5 additions & 0 deletions x/tally/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ import (
"context"

batchingtypes "github.com/sedaprotocol/seda-chain/x/batching/types"
dataproxytypes "github.com/sedaprotocol/seda-chain/x/data-proxy/types"
)

type BatchingKeeper interface {
SetDataResultForBatching(ctx context.Context, result batchingtypes.DataResult) error
}

type DataProxyKeeper interface {
GetDataProxyConfig(ctx context.Context, pubKey []byte) (result dataproxytypes.ProxyConfig, err error)
}

0 comments on commit bc9e427

Please sign in to comment.