Skip to content

Commit

Permalink
Yolgun/revert wait for first consumer (#42)
Browse files Browse the repository at this point in the history
* Revert "fix scheduler panic issue (#39)"

This reverts commit 176de8f.

* Revert "fix waitForFirstConsumer support (#35)"

This reverts commit ef7ea7a

Co-authored-by: Yunus Olgun <[email protected]>
  • Loading branch information
yolgun and yolgun authored Oct 24, 2022
1 parent e3b4238 commit 380f81e
Show file tree
Hide file tree
Showing 25 changed files with 106 additions and 1,305 deletions.
25 changes: 0 additions & 25 deletions LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE

This file was deleted.

1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module volcano.sh/volcano
go 1.17

require (
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/fsnotify/fsnotify v1.4.9
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/go-multierror v1.0.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh3uCNQ7Hc=
github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
251 changes: 68 additions & 183 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package cache
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -56,7 +53,6 @@ import (
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/metrics"
)

func init() {
Expand Down Expand Up @@ -115,44 +111,27 @@ type SchedulerCache struct {
deletedJobs workqueue.RateLimitingInterface

informerFactory informers.SharedInformerFactory

BindFlowChannel chan *schedulingapi.TaskInfo
bindCache []*schedulingapi.TaskInfo
batchNum int
}

type defaultBinder struct {
kubeclient *kubernetes.Clientset
}

//Bind will send bind request to api server
func (db *defaultBinder) Bind(kubeClient *kubernetes.Clientset, tasks []*schedulingapi.TaskInfo) (error, []*schedulingapi.TaskInfo) {
var errTasks []*schedulingapi.TaskInfo
for _, task := range tasks {
p := task.Pod
if err := kubeClient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations},
Target: v1.ObjectReference{
Kind: "Node",
Name: task.NodeName,
},
func (db *defaultBinder) Bind(p *v1.Pod, hostname string) error {
if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations},
Target: v1.ObjectReference{
Kind: "Node",
Name: hostname,
},
metav1.CreateOptions{}); err != nil {
klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err)
errTasks = append(errTasks, task)
}
}

if len(errTasks) > 0 {
return fmt.Errorf("failed to bind pods"), errTasks
},
metav1.CreateOptions{}); err != nil {
klog.Errorf("Failed to bind pod <%v/%v>: %#v", p.Namespace, p.Name, err)
return err
}

return nil, nil
}

func NewBinder() *defaultBinder {
return &defaultBinder{}
return nil
}

type defaultEvictor struct {
Expand Down Expand Up @@ -268,38 +247,15 @@ func (dvb *defaultVolumeBinder) AllocateVolumes(task *schedulingapi.TaskInfo, ho
return err
}

// RevertVolumes clean cache generated by AllocateVolumes
func (dvb *defaultVolumeBinder) RevertVolumes(task *schedulingapi.TaskInfo, podVolumes *volumescheduling.PodVolumes) {
if podVolumes != nil {
klog.Infof("Revert assumed volumes for task %v/%v on node %s", task.Namespace, task.Name, task.NodeName)
dvb.volumeBinder.RevertAssumedPodVolumes(podVolumes)
task.VolumeReady = false
task.PodVolumes = nil
}
}

// GetPodVolumes get pod volume on the host
func (dvb *defaultVolumeBinder) GetPodVolumes(task *schedulingapi.TaskInfo,
node *v1.Node) (podVolumes *volumescheduling.PodVolumes, err error) {
boundClaims, claimsToBind, unboundClaimsImmediate, err := dvb.volumeBinder.GetPodVolumes(task.Pod)
boundClaims, claimsToBind, _, err := dvb.volumeBinder.GetPodVolumes(task.Pod)
if err != nil {
return nil, err
}
if len(unboundClaimsImmediate) > 0 {
return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}

podVolumes, reasons, err := dvb.volumeBinder.FindPodVolumes(task.Pod, boundClaims, claimsToBind, node)
if err != nil {
return nil, err
} else if len(reasons) > 0 {
var errors []string
for _, reason := range reasons {
errors = append(errors, string(reason))
}
return nil, fmt.Errorf(strings.Join(errors, ","))
}

podVolumes, _, err = dvb.volumeBinder.FindPodVolumes(task.Pod, boundClaims, claimsToBind, node)
return podVolumes, err
}

Expand Down Expand Up @@ -362,15 +318,8 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})

sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
sc.Binder = GetBindMethod()

var batchNum int
batchNum, err = strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
if err == nil && batchNum > 0 {
sc.batchNum = batchNum
} else {
sc.batchNum = 1
sc.Binder = &defaultBinder{
kubeclient: sc.kubeClient,
}

sc.Evictor = &defaultEvictor{
Expand Down Expand Up @@ -499,8 +448,6 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {

// Cleanup jobs.
go wait.Until(sc.processCleanupJob, 0, stopCh)

go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)
}

// WaitForCacheSync sync the cache with the api server
Expand Down Expand Up @@ -598,24 +545,60 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string)
}

// Bind binds task to the target host.
func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) error {
go func(taskArray []*schedulingapi.TaskInfo) {
tmp := time.Now()
err, errTasks := sc.Binder.Bind(sc.kubeClient, taskArray)
if err == nil {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
for _, task := range tasks {
sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v",
task.Namespace, task.Name, task.NodeName)
}
func (sc *SchedulerCache) Bind(taskInfo *schedulingapi.TaskInfo, hostname string) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

job, task, err := sc.findJobAndTask(taskInfo)

if err != nil {
return err
}

node, found := sc.Nodes[hostname]
if !found {
return fmt.Errorf("failed to bind Task %v to host %v, host does not exist",
task.UID, hostname)
}

originalStatus := task.Status
if err := job.UpdateTaskStatus(task, schedulingapi.Binding); err != nil {
return err
}

// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}

p := task.Pod
go func() {
taskID := schedulingapi.PodKey(p)

sc.Lock()
node.AddBindingTask(taskID)
sc.Unlock()

defer func() {
sc.Lock()
node.RemoveBindingTask(taskID)
sc.Unlock()
}()

if err := sc.Binder.Bind(p, hostname); err != nil {
sc.resyncTask(task)
} else {
for _, task := range errTasks {
klog.V(2).Infof("resyncTask task %s", task.Name)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
sc.resyncTask(task)
}
sc.Recorder.Eventf(p, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", p.Namespace, p.Name, hostname)
}
}(tasks)
}()

return nil
}
Expand All @@ -635,11 +618,6 @@ func (sc *SchedulerCache) BindVolumes(task *schedulingapi.TaskInfo, podVolumes *
return sc.VolumeBinder.BindVolumes(task, podVolumes)
}

// RevertVolumes clean cache generated by AllocateVolumes
func (sc *SchedulerCache) RevertVolumes(task *schedulingapi.TaskInfo, podVolumes *volumescheduling.PodVolumes) {
sc.VolumeBinder.RevertVolumes(task, podVolumes)
}

// Client returns the kubernetes clientSet
func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
Expand Down Expand Up @@ -759,99 +737,6 @@ func (sc *SchedulerCache) processResyncTask() {
}
}

func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(5).Infof("add bind task %v/%v", taskInfo.Namespace, taskInfo.Name)
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
job, task, err := sc.findJobAndTask(taskInfo)
if err != nil {
return err
}

node, found := sc.Nodes[taskInfo.NodeName]
if !found {
return fmt.Errorf("failed to bind Task %v to host %v, host does not exist",
task.UID, taskInfo.NodeName)
}

originalStatus := task.Status
if err := job.UpdateTaskStatus(task, schedulingapi.Binding); err != nil {
return err
}

// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}

sc.BindFlowChannel <- taskInfo

return nil
}

func (sc *SchedulerCache) processBindTask() {
for {
select {
case taskInfo, ok := <-sc.BindFlowChannel:
if !ok {
return
}

sc.bindCache = append(sc.bindCache, taskInfo)
if len(sc.bindCache) == sc.batchNum {
sc.BindTask()
}
}

if len(sc.BindFlowChannel) == 0 {
break
}
}

if len(sc.bindCache) == 0 {
return
}

sc.BindTask()
}

func (sc *SchedulerCache) BindTask() {
klog.V(5).Infof("batch bind task count %d", len(sc.bindCache))
successfulTasks := make([]*schedulingapi.TaskInfo, 0)
for _, task := range sc.bindCache {
if err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil {
klog.Errorf("task %s/%s bind Volumes failed: %#v", task.Namespace, task.Name, err)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
sc.resyncTask(task)
} else {
successfulTasks = append(successfulTasks, task)
klog.V(5).Infof("task %s/%s bind Volumes done", task.Namespace, task.Name)
}
}

bindTasks := make([]*schedulingapi.TaskInfo, len(successfulTasks))
copy(bindTasks, successfulTasks)
if err := sc.Bind(bindTasks); err != nil {
klog.Errorf("failed to bind task count %d: %#v", len(bindTasks), err)
return
}

for _, task := range successfulTasks {
metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
}

sc.bindCache = sc.bindCache[0:0]
return
}

// Snapshot returns the complete snapshot of the cluster from cache
func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
sc.Mutex.Lock()
Expand Down
Loading

0 comments on commit 380f81e

Please sign in to comment.