Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor & data provider payouts #464

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ func NewApp(
runtime.NewKVStoreService(keys[tallytypes.StoreKey]),
app.WasmStorageKeeper,
app.BatchingKeeper,
app.DataProxyKeeper,
contractKeeper,
app.WasmKeeper,
authtypes.NewModuleAddress(govtypes.ModuleName).String(),
Expand Down
19 changes: 16 additions & 3 deletions proto/sedachain/tally/v1/tally.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
syntax = "proto3";
package sedachain.tally.v1;

import "cosmos_proto/cosmos.proto";
import "gogoproto/gogo.proto";
import "amino/amino.proto";

option go_package = "github.com/sedaprotocol/seda-chain/x/tally/types";

// Params defines the parameters for the tally module.
Expand All @@ -15,7 +19,16 @@ message Params {
// FilterGasCostMultiplierStdDev is the gas cost multiplier for a filter type
// stddev.
uint64 filter_gas_cost_multiplier_std_dev = 4;
// GasCostCommitment is the gas cost for a commitment corresponding to an
// expired data request.
uint64 gas_cost_commitment = 5;
// GasCostBase is the base gas cost for a data request.
uint64 gas_cost_base = 5;
// GasCostCommit is the gas cost for a commit charged under certain scenarios.
uint64 gas_cost_commit = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We charge the base gas cost for every data request.

When the request times out or when there is no basic consensus, we charge the commit cost for each executor that did commit (CalculateCommitterPayouts())

// BurnRatio is the ratio of the gas cost to be burned in case of reduced
// payout scenarios.
string burn_ratio = 7 [
(cosmos_proto.scalar) = "cosmos.Dec",
(gogoproto.customtype) = "cosmossdk.io/math.LegacyDec",
(amino.dont_omitempty) = true,
(gogoproto.nullable) = false
];
}
9 changes: 9 additions & 0 deletions x/batching/keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"github.com/sedaprotocol/seda-chain/x/batching"
batchingkeeper "github.com/sedaprotocol/seda-chain/x/batching/keeper"
"github.com/sedaprotocol/seda-chain/x/batching/types"
dataproxykeeper "github.com/sedaprotocol/seda-chain/x/data-proxy/keeper"
dataproxytypes "github.com/sedaprotocol/seda-chain/x/data-proxy/types"
"github.com/sedaprotocol/seda-chain/x/pubkey"
pubkeykeeper "github.com/sedaprotocol/seda-chain/x/pubkey/keeper"
pubkeytypes "github.com/sedaprotocol/seda-chain/x/pubkey/types"
Expand Down Expand Up @@ -221,6 +223,12 @@ func initFixture(tb testing.TB) *fixture {
)
stakingKeeper.SetPubKeyKeeper(pubKeyKeeper)

dataProxyKeeper := dataproxykeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[dataproxytypes.StoreKey]),
authtypes.NewModuleAddress("gov").String(),
)

batchingKeeper := batchingkeeper.NewKeeper(
cdc,
runtime.NewKVStoreService(keys[types.StoreKey]),
Expand All @@ -238,6 +246,7 @@ func initFixture(tb testing.TB) *fixture {
runtime.NewKVStoreService(keys[tallytypes.StoreKey]),
wasmStorageKeeper,
batchingKeeper,
dataProxyKeeper,
contractKeeper,
viewKeeper,
authority.String(),
Expand Down
117 changes: 86 additions & 31 deletions x/tally/keeper/endblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"cosmossdk.io/math"

sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/sedaprotocol/seda-wasm-vm/tallyvm/v2"
Expand Down Expand Up @@ -81,7 +82,7 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err

// Loop through the list to apply filter, execute tally, and post
// execution result.
processedReqs := make(map[string]types.DistributionMessages)
processedReqs := make(map[string][]types.Distribution)
tallyResults := make([]TallyResult, len(tallyList))
dataResults := make([]batchingtypes.DataResult, len(tallyList))
for i, req := range tallyList {
Expand All @@ -97,23 +98,31 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
SedaPayload: req.SedaPayload,
}

gasPriceInt, ok := math.NewIntFromString(req.GasPrice)
gasPrice, ok := math.NewIntFromString(req.GasPrice)
if !ok {
return fmt.Errorf("invalid gas price: %s", req.GasPrice) // TODO improve error handling
return fmt.Errorf("invalid gas price: %s", req.GasPrice)
}

var distMsgs types.DistributionMessages
if len(req.Commits) < int(req.ReplicationFactor) {
var gasCalc GasCalculation
switch {
case len(req.Commits) < int(req.ReplicationFactor):
dataResults[i].Result = []byte(fmt.Sprintf("need %d commits; received %d", req.ReplicationFactor, len(req.Commits)))
dataResults[i].ExitCode = TallyExitCodeNotEnoughCommits
k.Logger(ctx).Info("data request's number of commits did not meet replication factor", "request_id", req.ID)

distMsgs, err = k.CalculateCommitterPayouts(ctx, req, gasPriceInt)
if err != nil {
return err
}
} else {
_, tallyResults[i], distMsgs = k.FilterAndTally(ctx, req, params)
gasCalc.Executors, dataResults[i].GasUsed = CommitGasUsed(req, params.GasCostCommit, req.ExecGasLimit)
dataResults[i].GasUsed += params.GasCostBase
gasCalc.Burn += params.GasCostBase
case len(req.Reveals) == 0:
dataResults[i].Result = []byte("no reveals")
dataResults[i].ExitCode = TallyExitCodeNoReveals
k.Logger(ctx).Info("data request has no reveals", "request_id", req.ID)

gasCalc.Executors, dataResults[i].GasUsed = CommitGasUsed(req, params.GasCostCommit, req.ExecGasLimit)
dataResults[i].GasUsed += params.GasCostBase
gasCalc.Burn += params.GasCostBase
default:
_, tallyResults[i], gasCalc = k.FilterAndTally(ctx, req, params, gasPrice)
dataResults[i].Result = tallyResults[i].Result
//nolint:gosec // G115: We shouldn't get negative exit code anyway.
dataResults[i].ExitCode = uint32(tallyResults[i].ExitInfo.ExitCode)
Expand All @@ -124,7 +133,10 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
k.Logger(ctx).Debug("tally result", "request_id", req.ID, "tally_result", tallyResults[i])
}

processedReqs[req.ID] = distMsgs
processedReqs[req.ID] = k.DistributionsFromGasCalculation(ctx, req.ID, gasCalc, gasPrice, params.BurnRatio)

// Total gas used should not exceed the gas limit specified by the request.
dataResults[i].GasUsed = min(req.TallyGasLimit+req.ExecGasLimit, dataResults[i].GasUsed)
dataResults[i].Id, err = dataResults[i].TryHash()
if err != nil {
return err
Expand Down Expand Up @@ -181,10 +193,8 @@ type TallyResult struct {

// FilterAndTally builds and applies filter, executes tally program,
// and calculates payouts.
func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.Params) (FilterResult, TallyResult, types.DistributionMessages) {
var result TallyResult

// Sort the reveals by their keys.
func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.Params, gasPrice math.Int) (FilterResult, TallyResult, GasCalculation) {
// Sort the reveals by their keys (executors).
keys := make([]string, len(req.Reveals))
i := 0
for k := range req.Reveals {
Expand All @@ -199,17 +209,26 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.
sort.Strings(reveals[i].ProxyPubKeys)
}

// Phase I: Filtering
// Phase 1: Filtering
filterResult, filterErr := ExecuteFilter(reveals, req.ConsensusFilter, req.ReplicationFactor, params)
result.Consensus = filterResult.Consensus
result.ProxyPubKeys = filterResult.ProxyPubKeys
result.TallyGasUsed += filterResult.GasUsed
filterResult.GasUsed += params.GasCostBase

// Begin tracking tally and gas calculation results.
result := TallyResult{
Consensus: filterResult.Consensus,
ProxyPubKeys: filterResult.ProxyPubKeys,
TallyGasUsed: filterResult.GasUsed,
}
var gasCalc GasCalculation
gasCalc.Burn += filterResult.GasUsed

// Phase II: Tally Program Execution
// Phase 2: Tally Program Execution
var vmRes tallyvm.VmResult
var tallyErr error
if filterErr == nil {
vmRes, err := k.ExecuteTallyProgram(ctx, req, filterResult, reveals)
if err != nil {
result.Result = []byte(err.Error())
vmRes, tallyErr = k.ExecuteTallyProgram(ctx, req, filterResult, reveals)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The execution needs to be aware of the base gas fee that was already consumed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the base gas be taken from the execution gas limit while the tally program uses the tally gas limit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the base gas is charged from the tally gas limit. If we were to take it from the execution gas limit all the overlay nodes would also have to consider the base gas cost when calculating their individual limits. And we said it is to cover some of the computational load a DR causes for a validator that are not charged directly like the filter and tally VM; think basic consensus check, median gas price calculation, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK added. Perhaps BuildFilter() should check if the filter gas cost is within the tally limit?

if tallyErr != nil {
result.Result = []byte(tallyErr.Error())
result.ExitInfo.ExitCode = TallyExitCodeExecError
} else {
result.Result = vmRes.Result
Expand All @@ -218,6 +237,7 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.
result.StdErr = vmRes.Stderr
}
result.TallyGasUsed += vmRes.GasUsed
gasCalc.Burn += vmRes.GasUsed
} else {
result.Result = []byte(filterErr.Error())
if errors.Is(filterErr, types.ErrInvalidFilterInput) {
Expand All @@ -227,14 +247,49 @@ func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.
}
}

// Phase III: Calculate Payouts
// TODO: Calculate gas used & payouts
result.ExecGasUsed = calculateExecGasUsed(reveals)
distMsgs := types.DistributionMessages{
Messages: []types.DistributionMessage{},
RefundType: types.DistributionTypeNoConsensus,
// Phase 3: Calculate data proxy and executor gas consumption.
remainingGasLimit := req.ExecGasLimit // track execution gas limit

// Calculate data proxy gas consumption if basic consensus was reached.
var proxyGasUsedPerExec uint64
if filterErr == nil || !errors.Is(filterErr, types.ErrNoBasicConsensus) {
var proxyGasUsed uint64
gasCalc.Proxies, proxyGasUsed = k.ProxyGasUsed(ctx, result.ProxyPubKeys, gasPrice, req.ExecGasLimit, req.ReplicationFactor)

remainingGasLimit -= proxyGasUsed
proxyGasUsedPerExec = proxyGasUsed / uint64(req.ReplicationFactor)
result.ExecGasUsed += proxyGasUsed
}
return filterResult, result, distMsgs

// Calculate executor gas consumption.
switch {
case errors.Is(filterErr, types.ErrNoBasicConsensus):
var commitGasUsed uint64
gasCalc.Executors, commitGasUsed = CommitGasUsed(req, params.GasCostCommit, remainingGasLimit)
result.ExecGasUsed += commitGasUsed
case errors.Is(filterErr, types.ErrInvalidFilterInput) || errors.Is(filterErr, types.ErrNoConsensus) || tallyErr != nil:
gasCalc.ReducedPayout = true
fallthrough
default: // filterErr == ErrConsensusInError || filterErr == nil
gasReports := make([]uint64, len(reveals))
for i, reveal := range reveals {
if proxyGasUsedPerExec > reveal.GasUsed {
gasReports[i] = 0
} else {
gasReports[i] = reveal.GasUsed - proxyGasUsedPerExec
}
}

var execGasUsed uint64
if areGasReportsUniform(gasReports) {
gasCalc.Executors, execGasUsed = ExecutorGasUsedUniform(keys, gasReports[0], remainingGasLimit, req.ReplicationFactor)
} else {
gasCalc.Executors, execGasUsed = ExecutorGasUsedDivergent(keys, gasReports, remainingGasLimit, req.ReplicationFactor)
}
result.ExecGasUsed += execGasUsed
}

return filterResult, result, gasCalc
}

// logErrAndRet logs the base error along with the request ID for
Expand Down
13 changes: 10 additions & 3 deletions x/tally/keeper/endblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ func TestEndBlock(t *testing.T) {
expExitCode: keeper.TallyExitCodeNotEnoughCommits,
},
{
name: "reveal timeout",
name: "reveal timeout with no reveals",
memo: "cmV2ZWFsIHRpbWVvdXQ=",
replicationFactor: 2,
numCommits: 2,
numReveals: 0,
timeout: true,
expExitCode: keeper.TallyExitCodeFilterError,
expExitCode: keeper.TallyExitCodeNoReveals,
},
{
name: "reveal timeout with 2 reveals",
Expand All @@ -93,11 +93,18 @@ func TestEndBlock(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
drID := f.commitRevealDataRequest(t, tt.memo, "Ghkvq84TmIuEmU1ClubNxBjVXi8df5QhiNQEC5T8V6w=", tt.replicationFactor, tt.numCommits, tt.numReveals, tt.timeout)
drID, stakers := f.commitRevealDataRequest(t, tt.memo, "Ghkvq84TmIuEmU1ClubNxBjVXi8df5QhiNQEC5T8V6w=", tt.replicationFactor, tt.numCommits, tt.numReveals, tt.timeout)

beforeBalance := f.bankKeeper.GetBalance(f.Context(), stakers[0].address, bondDenom)

err := f.tallyKeeper.EndBlock(f.Context())
require.NoError(t, err)

// TODO query get_staker pending_withdrawal and check diff
afterBalance := f.bankKeeper.GetBalance(f.Context(), stakers[0].address, bondDenom)
diff := afterBalance.Sub(beforeBalance)
require.Equal(t, "0aseda", diff.String())

dataResult, err := f.batchingKeeper.GetLatestDataResult(f.Context(), drID)
require.NoError(t, err)
require.Equal(t, tt.expExitCode, dataResult.ExitCode)
Expand Down
12 changes: 6 additions & 6 deletions x/tally/keeper/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,9 @@ func invertErrors(errors []bool) []bool {
// outliers. It assumes that the reveals are sorted by their keys and that their
// proxy public keys are sorted.
func ExecuteFilter(reveals []types.RevealBody, filterInput string, replicationFactor uint16, params types.Params) (FilterResult, error) {
filter, err := BuildFilter(filterInput, replicationFactor, params)
if err != nil {
return FilterResult{}, types.ErrInvalidFilterInput.Wrap(err.Error())
}

var result FilterResult
result.Errors = make([]bool, len(reveals))
result.Outliers = make([]bool, len(reveals))
result.GasUsed = filter.GasCost()

// Determine basic consensus on tuple of (exit_code_success, proxy_pub_keys)
var maxFreq int
Expand All @@ -75,6 +69,12 @@ func ExecuteFilter(reveals []types.RevealBody, filterInput string, replicationFa
return result, types.ErrNoBasicConsensus
}

filter, err := BuildFilter(filterInput, replicationFactor, params)
if err != nil {
return result, types.ErrInvalidFilterInput.Wrap(err.Error())
}
result.GasUsed = filter.GasCost()

outliers, consensus := filter.ApplyFilter(reveals, result.Errors)
switch {
case countErrors(result.Errors)*3 > len(reveals)*2:
Expand Down
Loading
Loading