Skip to content

Commit

Permalink
refactor(x/tally): tally requests with missing reveals and separate f…
Browse files Browse the repository at this point in the history
…ilter from keeper
  • Loading branch information
hacheigriega committed Jan 13, 2025
1 parent c0ac549 commit a269136
Show file tree
Hide file tree
Showing 14 changed files with 369 additions and 182 deletions.
2 changes: 1 addition & 1 deletion proto/sedachain/tally/v1/tally.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ message Params {
uint64 filter_gas_cost_multiplier_mode = 3;
// FilterGasCostMultiplierStdDev is the gas cost multiplier for a filter type
// stddev.
uint64 filter_gas_cost_multiplier_stddev = 4;
uint64 filter_gas_cost_multiplier_std_dev = 4;
// GasCostCommitment is the gas cost for a commitment corresponding to an
// expired data request.
uint64 gas_cost_commitment = 5;
Expand Down
7 changes: 7 additions & 0 deletions scripts/mockgen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ if ! [ -x "$(command -v $mockgen_cmd)" ]; then
exit 1
fi

mockgen_version=$($mockgen_cmd -version 2>&1 | grep -E 'v[0-9]+\.[0-9]+\.[0-9]+' || echo "unknown")
required_version="v0.4.0"

if [ "$mockgen_version" != "$required_version" ]; then
echo "warning: required mockgen version is $required_version, but found $mockgen_version" >&2
fi

# Generate mocks for the given package
$mockgen_cmd -source=$GOPATH/pkg/mod/github.com/\!cosm\!wasm/[email protected]/x/wasm/types/exported_keepers.go -package testutil -destination=x/wasm-storage/keeper/testutil/wasm_keepers_mock.go
$mockgen_cmd -source=x/wasm-storage/types/expected_keepers.go -package testutil -destination=x/wasm-storage/keeper/testutil/expected_keepers_mock.go
Expand Down
125 changes: 58 additions & 67 deletions x/tally/keeper/endblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -55,6 +56,11 @@ func (k Keeper) EndBlock(ctx sdk.Context) (err error) {
// ProcessTallies fetches from the core contract the list of requests
// to be tallied and then goes through it to filter and tally.
func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) error {
params, err := k.GetParams(ctx)
if err != nil {
return err
}

// Fetch tally-ready data requests.
// TODO: Deal with offset and limits. (#313)
queryRes, err := k.wasmViewKeeper.QuerySmart(ctx, coreContract, []byte(`{"get_data_requests_by_status":{"status": "tallying", "offset": 0, "limit": 100}}`))
Expand Down Expand Up @@ -91,9 +97,7 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
}

var distMsgs types.DistributionMessages
var err error
switch {
case len(req.Commits) == 0 || len(req.Commits) < int(req.ReplicationFactor):
if len(req.Commits) < int(req.ReplicationFactor) {
dataResults[i].Result = []byte(fmt.Sprintf("need %d commits; received %d", req.ReplicationFactor, len(req.Commits)))
dataResults[i].ExitCode = TallyExitCodeNotEnoughCommits
k.Logger(ctx).Info("data request's number of commits did not meet replication factor", "request_id", req.ID)
Expand All @@ -102,31 +106,16 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
if err != nil {
return err
}
case len(req.Reveals) == 0 || len(req.Reveals) < int(req.ReplicationFactor):
dataResults[i].Result = []byte(fmt.Sprintf("need %d reveals; received %d", req.ReplicationFactor, len(req.Reveals)))
dataResults[i].ExitCode = TallyExitCodeNotEnoughReveals
k.Logger(ctx).Info("data request's number of reveals did not meet replication factor", "request_id", req.ID)

distMsgs, err = k.CalculateCommitterPayouts(ctx, req)
if err != nil {
return err
}
default:
tallyResults[i] = k.FilterAndTally(ctx, req)
dataResults[i].Result = tallyResults[i].result
} else {
_, tallyResults[i], distMsgs = k.FilterAndTally(ctx, req, params)
dataResults[i].Result = tallyResults[i].Result
//nolint:gosec // G115: We shouldn't get negative exit code anyway.
dataResults[i].ExitCode = uint32(tallyResults[i].exitInfo.ExitCode)
dataResults[i].Consensus = tallyResults[i].consensus
dataResults[i].GasUsed = tallyResults[i].execGasUsed + tallyResults[i].tallyGasUsed
dataResults[i].ExitCode = uint32(tallyResults[i].ExitInfo.ExitCode)
dataResults[i].Consensus = tallyResults[i].Consensus
dataResults[i].GasUsed = tallyResults[i].ExecGasUsed + tallyResults[i].TallyGasUsed

k.Logger(ctx).Info("completed tally", "request_id", req.ID)
k.Logger(ctx).Debug("tally result", "request_id", req.ID, "tally_result", tallyResults[i])

// TODO
distMsgs = types.DistributionMessages{
Messages: []types.DistributionMessage{},
RefundType: types.DistributionTypeNoConsensus,
}
}

processedReqs[req.ID] = distMsgs
Expand Down Expand Up @@ -160,12 +149,12 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
sdk.NewAttribute(types.AttributeDataResultID, dataResults[i].Id),
sdk.NewAttribute(types.AttributeDataRequestID, dataResults[i].DrId),
sdk.NewAttribute(types.AttributeTypeConsensus, strconv.FormatBool(dataResults[i].Consensus)),
sdk.NewAttribute(types.AttributeTallyVMStdOut, strings.Join(tallyResults[i].stdout, "\n")),
sdk.NewAttribute(types.AttributeTallyVMStdErr, strings.Join(tallyResults[i].stderr, "\n")),
sdk.NewAttribute(types.AttributeExecGasUsed, fmt.Sprintf("%v", tallyResults[i].execGasUsed)),
sdk.NewAttribute(types.AttributeTallyGasUsed, fmt.Sprintf("%v", tallyResults[i].tallyGasUsed)),
sdk.NewAttribute(types.AttributeTallyVMStdOut, strings.Join(tallyResults[i].StdOut, "\n")),
sdk.NewAttribute(types.AttributeTallyVMStdErr, strings.Join(tallyResults[i].StdErr, "\n")),
sdk.NewAttribute(types.AttributeExecGasUsed, fmt.Sprintf("%v", tallyResults[i].ExecGasUsed)),
sdk.NewAttribute(types.AttributeTallyGasUsed, fmt.Sprintf("%v", tallyResults[i].TallyGasUsed)),
sdk.NewAttribute(types.AttributeTallyExitCode, fmt.Sprintf("%02x", dataResults[i].ExitCode)),
sdk.NewAttribute(types.AttributeProxyPubKeys, strings.Join(tallyResults[i].proxyPubKeys, "\n")),
sdk.NewAttribute(types.AttributeProxyPubKeys, strings.Join(tallyResults[i].ProxyPubKeys, "\n")),
),
)
}
Expand All @@ -174,70 +163,72 @@ func (k Keeper) ProcessTallies(ctx sdk.Context, coreContract sdk.AccAddress) err
}

type TallyResult struct {
consensus bool
stdout []string
stderr []string
result []byte
exitInfo tallyvm.ExitInfo
execGasUsed uint64
tallyGasUsed uint64
proxyPubKeys []string // data proxy pubkeys in basic consensus
Consensus bool
StdOut []string
StdErr []string
Result []byte
ExitInfo tallyvm.ExitInfo
ExecGasUsed uint64
TallyGasUsed uint64
ProxyPubKeys []string // data proxy pubkeys in basic consensus
}

// FilterAndTally builds and applies filter, executes tally program,
// and calculates payouts.
func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request) TallyResult {
func (k Keeper) FilterAndTally(ctx sdk.Context, req types.Request, params types.Params) (FilterResult, TallyResult, types.DistributionMessages) {
var result TallyResult

// Sort reveals and proxy public keys.
// Sort the reveals by their keys.
keys := make([]string, len(req.Reveals))
i := 0
for k := range req.Reveals {
keys[i] = k
i++
}
sort.Strings(keys)

reveals := make([]types.RevealBody, len(req.Reveals))
for i, k := range keys {
reveals[i] = req.Reveals[k]
sort.Strings(reveals[i].ProxyPubKeys)
}

// Phase I: Filtering
var filterResult FilterResult
filter, err := k.BuildFilter(ctx, req.ConsensusFilter, req.ReplicationFactor)
if err != nil {
result.result = []byte(err.Error())
result.exitInfo.ExitCode = TallyExitCodeInvalidFilterInput
} else {
filterResult, err = ApplyFilter(filter, reveals)
result.consensus = filterResult.Consensus
result.proxyPubKeys = filterResult.ProxyPubKeys
result.tallyGasUsed += filterResult.GasUsed

// Phase II: Tally Program Execution
filterResult, filterErr := ExecuteFilter(reveals, req.ConsensusFilter, req.ReplicationFactor, params)
result.Consensus = filterResult.Consensus
result.ProxyPubKeys = filterResult.ProxyPubKeys
result.TallyGasUsed += filterResult.GasUsed

// Phase II: Tally Program Execution
if filterErr == nil {
vmRes, err := k.ExecuteTallyProgram(ctx, req, filterResult, reveals)
if err != nil {
result.result = []byte(err.Error())
result.exitInfo.ExitCode = TallyExitCodeFilterError
result.Result = []byte(err.Error())
result.ExitInfo.ExitCode = TallyExitCodeExecError
} else {
vmRes, err := k.ExecuteTallyProgram(ctx, req, filterResult, reveals)
if err != nil {
result.result = []byte(err.Error())
result.exitInfo.ExitCode = TallyExitCodeExecError
} else {
result.result = vmRes.Result
result.exitInfo = vmRes.ExitInfo
result.stdout = vmRes.Stdout
result.stderr = vmRes.Stderr
}
result.tallyGasUsed += vmRes.GasUsed
result.Result = vmRes.Result
result.ExitInfo = vmRes.ExitInfo
result.StdOut = vmRes.Stdout
result.StdErr = vmRes.Stderr
}
result.TallyGasUsed += vmRes.GasUsed
} else {
result.Result = []byte(filterErr.Error())
if errors.Is(filterErr, types.ErrInvalidFilterInput) {
result.ExitInfo.ExitCode = TallyExitCodeInvalidFilterInput
} else {
result.ExitInfo.ExitCode = TallyExitCodeFilterError
}
}

// Phase III: Calculate Payouts
result.execGasUsed = calculateExecGasUsed(reveals)

return result
// TODO: Calculate gas used & payouts
result.ExecGasUsed = calculateExecGasUsed(reveals)
distMsgs := types.DistributionMessages{
Messages: []types.DistributionMessage{},
RefundType: types.DistributionTypeNoConsensus,
}
return filterResult, result, distMsgs
}

// logErrAndRet logs the base error along with the request ID for
Expand Down
6 changes: 3 additions & 3 deletions x/tally/keeper/endblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestEndBlock(t *testing.T) {
numCommits: 2,
numReveals: 0,
timeout: true,
expExitCode: keeper.TallyExitCodeNotEnoughReveals,
expExitCode: keeper.TallyExitCodeFilterError,
},
{
name: "reveal timeout with 2 reveals",
Expand All @@ -88,12 +88,12 @@ func TestEndBlock(t *testing.T) {
numCommits: 3,
numReveals: 2,
timeout: true,
expExitCode: keeper.TallyExitCodeNotEnoughReveals,
expExitCode: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
drID := f.commitRevealDataRequest(t, tt.memo, tt.replicationFactor, tt.numCommits, tt.numReveals, tt.timeout)
drID := f.commitRevealDataRequest(t, tt.memo, "Ghkvq84TmIuEmU1ClubNxBjVXi8df5QhiNQEC5T8V6w=", tt.replicationFactor, tt.numCommits, tt.numReveals, tt.timeout)

err := f.tallyKeeper.EndBlock(f.Context())
require.NoError(t, err)
Expand Down
75 changes: 35 additions & 40 deletions x/tally/keeper/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"encoding/base64"
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/sedaprotocol/seda-chain/x/tally/types"
)

Expand Down Expand Up @@ -43,45 +41,16 @@ func invertErrors(errors []bool) []bool {
return inverted
}

// BuildFilter builds a filter based on the requestor-provided input.
func (k Keeper) BuildFilter(ctx sdk.Context, filterInput string, replicationFactor uint16) (types.Filter, error) {
input, err := base64.StdEncoding.DecodeString(filterInput)
if err != nil {
return nil, err
}
if len(input) == 0 {
return nil, types.ErrInvalidFilterType
}

params, err := k.GetParams(ctx)
if err != nil {
return nil, err
}

var filter types.Filter
switch input[0] {
case filterTypeNone:
filter = types.NewFilterNone(params.FilterGasCostNone)
case filterTypeMode:
filter, err = types.NewFilterMode(input, params.FilterGasCostMultiplierMode, replicationFactor)
case filterTypeStdDev:
filter, err = types.NewFilterStdDev(input, params.FilterGasCostMultiplierStddev, replicationFactor)
default:
return nil, types.ErrInvalidFilterType
}
// ExecuteFilter builds a filter using the given filter input and applies it to
// the given reveals to determine consensus, proxy public keys in consensus, and
// outliers. It assumes that the reveals are sorted by their keys and that their
// proxy public keys are sorted.
func ExecuteFilter(reveals []types.RevealBody, filterInput string, replicationFactor uint16, params types.Params) (FilterResult, error) {
filter, err := BuildFilter(filterInput, replicationFactor, params)
if err != nil {
return nil, err
return FilterResult{}, types.ErrInvalidFilterInput.Wrap(err.Error())
}
return filter, nil
}

// ApplyFilter processes filter of the type specified in the first
// byte of consensus filter. It returns an outlier list, which is
// a boolean list where true at index i means that the reveal at
// index i is an outlier, consensus boolean, consensus data proxy
// public keys, and error. It assumes that the reveals and their
// proxy public keys are sorted.
func ApplyFilter(filter types.Filter, reveals []types.RevealBody) (FilterResult, error) {
var result FilterResult
result.Errors = make([]bool, len(reveals))
result.Outliers = make([]bool, len(reveals))
Expand All @@ -101,13 +70,12 @@ func ApplyFilter(filter types.Filter, reveals []types.RevealBody) (FilterResult,
maxFreq = freq[tuple]
}
}
if maxFreq*3 < len(reveals)*2 {
if maxFreq*3 < int(replicationFactor)*2 {
result.Consensus = false
return result, types.ErrNoBasicConsensus
}

outliers, consensus := filter.ApplyFilter(reveals, result.Errors)

switch {
case countErrors(result.Errors)*3 > len(reveals)*2:
result.Consensus = true
Expand All @@ -122,3 +90,30 @@ func ApplyFilter(filter types.Filter, reveals []types.RevealBody) (FilterResult,
return result, nil
}
}

// BuildFilter builds a filter based on the requestor-provided input.
func BuildFilter(filterInput string, replicationFactor uint16, params types.Params) (types.Filter, error) {
input, err := base64.StdEncoding.DecodeString(filterInput)
if err != nil {
return nil, err
}
if len(input) == 0 {
return nil, types.ErrInvalidFilterType
}

var filter types.Filter
switch input[0] {
case filterTypeNone:
filter = types.NewFilterNone(params.FilterGasCostNone)
case filterTypeMode:
filter, err = types.NewFilterMode(input, params.FilterGasCostMultiplierMode, replicationFactor)
case filterTypeStdDev:
filter, err = types.NewFilterStdDev(input, params.FilterGasCostMultiplierStdDev, replicationFactor)
default:
return nil, types.ErrInvalidFilterType
}
if err != nil {
return nil, err
}
return filter, nil
}
Loading

0 comments on commit a269136

Please sign in to comment.