Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(coordinator): assign static prover first and avoid reassigning failed task to same prover #1584

Merged
merged 33 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
791fcaf
feat(coordinator): assign static prover first and avoid reassigning f…
yiweichi Dec 30, 2024
71acdb3
fix: lint
yiweichi Dec 30, 2024
8ce5121
fix: GetUnassignedBatchCount
yiweichi Dec 30, 2024
3e0589e
Merge branch 'develop' into feat-coordinator-assign-logic
yiweichi Dec 30, 2024
df92616
chore: remove extra files
yiweichi Dec 30, 2024
a75075d
fix: err log
yiweichi Dec 30, 2024
1c5d88d
fix: orm GetTaskOfProver
yiweichi Dec 30, 2024
e4c0779
fix: comments
yiweichi Dec 30, 2024
0d1c303
chore: auto version bump [bot]
yiweichi Jan 6, 2025
25796df
fix: comments
yiweichi Jan 8, 2025
e72523d
Merge branch 'feat-coordinator-assign-logic' of https://github.com/sc…
yiweichi Jan 8, 2025
e51e95d
chore: auto version bump [bot]
yiweichi Jan 8, 2025
e0aa5be
fix: lint
yiweichi Jan 8, 2025
c2075c8
Merge branch 'develop' into feat-coordinator-assign-logic
yiweichi Jan 8, 2025
77d3a75
Update coordinator/internal/logic/provertask/bundle_prover_task.go
yiweichi Jan 9, 2025
5ee2388
Update coordinator/internal/logic/provertask/batch_prover_task.go
yiweichi Jan 9, 2025
816d5ce
Update coordinator/internal/logic/provertask/batch_prover_task.go
yiweichi Jan 9, 2025
8f0ab50
Update coordinator/internal/logic/provertask/bundle_prover_task.go
yiweichi Jan 9, 2025
d8ad10f
Update coordinator/internal/logic/provertask/chunk_prover_task.go
yiweichi Jan 9, 2025
64e88ce
Update coordinator/internal/orm/prover_task.go
yiweichi Jan 9, 2025
dd3c9e1
Update coordinator/internal/logic/provertask/chunk_prover_task.go
yiweichi Jan 9, 2025
b11f010
feat: handle could prover name when assign task
yiweichi Jan 12, 2025
5ad00e7
feat: handle could prover name when assign task
yiweichi Jan 12, 2025
37deef1
fix: test
yiweichi Jan 12, 2025
078ac88
add jwt IdentityHandler
yiweichi Jan 12, 2025
27ae174
fix: test
yiweichi Jan 12, 2025
37ac215
fix: test
yiweichi Jan 13, 2025
cc57700
fix: typos
yiweichi Jan 13, 2025
f725c2e
fix: comments
yiweichi Jan 13, 2025
aae19ab
fix: lint
yiweichi Jan 13, 2025
35f22b4
fix: login verify
yiweichi Jan 13, 2025
1f05c6e
Apply suggestions from code review
yiweichi Jan 14, 2025
1e63677
add comments for IsExternalProverNameMatch
yiweichi Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime/debug"
)

var tag = "v4.4.85"
var tag = "v4.4.86"

var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
Expand Down
1 change: 1 addition & 0 deletions coordinator/conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"prover_manager": {
"provers_per_session": 1,
"session_attempts": 5,
"external_prover_threshold": 32,
"bundle_collection_time_sec": 180,
"batch_collection_time_sec": 180,
"chunk_collection_time_sec": 180,
Expand Down
2 changes: 2 additions & 0 deletions coordinator/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type ProverManager struct {
// Number of attempts that a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts"`
// Threshold for activating the external prover based on unassigned task count.
ExternalProverThreshold int64 `json:"external_prover_threshold"`
// Zk verifier config.
Verifier *VerifierConfig `json:"verifier"`
// BatchCollectionTimeSec batch Proof collection time (in seconds).
Expand Down
24 changes: 24 additions & 0 deletions coordinator/internal/logic/provertask/batch_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -63,6 +64,18 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getCountError != nil {
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrCoordinatorInternalFailure
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedBatchCount < bp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}

var batchTask *orm.Batch
for i := 0; i < 5; i++ {
var getTaskError error
Expand All @@ -88,6 +101,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil
}

// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrCoordinatorInternalFailure
}
if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid {
log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
}

rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
Expand Down
24 changes: 24 additions & 0 deletions coordinator/internal/logic/provertask/bundle_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -63,6 +64,18 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getCountError != nil {
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}

var bundleTask *orm.Bundle
for i := 0; i < 5; i++ {
var getTaskError error
Expand All @@ -88,6 +101,17 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
return nil, nil
}

// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrCoordinatorInternalFailure
}
if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid {
log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
}

rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
Expand Down
23 changes: 23 additions & 0 deletions coordinator/internal/logic/provertask/chunk_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -61,6 +62,18 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
if getCountError != nil {
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrCoordinatorInternalFailure
}
// Assign external prover if unassigned task number exceeds threshold
if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold {
return nil, nil
}
}

var chunkTask *orm.Chunk
for i := 0; i < 5; i++ {
var getTaskError error
Expand All @@ -86,6 +99,16 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
return nil, nil
}

// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
return nil, ErrCoordinatorInternalFailure
}
if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid {
log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
}
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
Expand Down
5 changes: 5 additions & 0 deletions coordinator/internal/logic/provertask/prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ var (
getTaskCounterVec *prometheus.CounterVec = nil
)

var (
// ExternalProverNamePrefix prefix of prover name
ExternalProverNamePrefix = "external"
)

// ProverTask the interface of a collector who send data to prover
type ProverTask interface {
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)
Expand Down
16 changes: 16 additions & 0 deletions coordinator/internal/orm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo
return &batch, nil
}

// GetUnassignedBatchCount retrieves unassigned batch count based on the specified limit.
func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
var count int64
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
db = db.Where("batch.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err)
}
return count, nil
}

// GetAssignedBatch retrieves assigned batch based on the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
Expand Down
15 changes: 15 additions & 0 deletions coordinator/internal/orm/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ func (o *Bundle) GetUnassignedBundle(ctx context.Context, maxActiveAttempts, max
return &bundle, nil
}

// GetUnassignedBundleCount retrieves unassigned bundle count based on the specified limit.
func (o *Bundle) GetUnassignedBundleCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
var count int64
db := o.db.WithContext(ctx)
db = db.Model(&Bundle{})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("bundle.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Bundle.GetUnassignedBundleCount error: %w", err)
}
return count, nil
}

// GetAssignedBundle retrieves assigned bundle based on the specified limit.
// The returned bundle sorts in ascending order by their index.
func (o *Bundle) GetAssignedBundle(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Bundle, error) {
Expand Down
16 changes: 16 additions & 0 deletions coordinator/internal/orm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ func (o *Chunk) GetUnassignedChunk(ctx context.Context, maxActiveAttempts, maxTo
return &chunk, nil
}

// GetUnassignedChunkCount retrieves unassigned chunk count based on the specified limit.
func (o *Chunk) GetUnassignedChunkCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (int64, error) {
var count int64
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)
db = db.Where("chunk.deleted_at IS NULL")
if err := db.Count(&count).Error; err != nil {
return 0, fmt.Errorf("Chunk.GetUnassignedChunkCount error: %w", err)
}
return count, nil
}

// GetAssignedChunk retrieves assigned chunk based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetAssignedChunk(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, height uint64) (*Chunk, error) {
Expand Down
18 changes: 18 additions & 0 deletions coordinator/internal/orm/prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,24 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType
return proverTasks, nil
}

// GetTaskOfOtherProvers get the chunk/batch task of prover
func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type", int(taskType))
db = db.Where("task_id", taskID)
db = db.Where("prover_public_key", proverPublicKey)
db = db.Where("prover_version", proverVersion)
db = db.Limit(1)

var proverTask ProverTask
err := db.Find(&proverTask).Error
if err != nil {
return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey)
yiweichi marked this conversation as resolved.
Show resolved Hide resolved
}
return &proverTask, nil
}

// GetProvingStatusByTaskID retrieves the proving status of a prover task
func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) {
db := o.db.WithContext(ctx)
Expand Down
Loading