Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Update different nodes share a thread
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 11, 2022
1 parent 4ae7a77 commit cf20d83
Showing 1 changed file with 49 additions and 42 deletions.
91 changes: 49 additions & 42 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,6 @@ func (c *Controller) lockPod(ctx context.Context, pod *corev1.Pod) error {
}

func (c *Controller) LockPodStatus(ctx context.Context) error {
for _, node := range c.nodes {
err := c.lockPodStatus(ctx, node)
if err != nil {
return err
}
}
return nil
}

func (c *Controller) lockPodStatus(ctx context.Context, nodeName string) error {
lockCh := make(chan *corev1.Pod)
go func() {
for {
Expand All @@ -166,10 +156,20 @@ func (c *Controller) lockPodStatus(ctx context.Context, nodeName string) error {
}
}()

for _, node := range c.nodes {
err := c.lockPodStatus(ctx, lockCh, node)
if err != nil {
return err
}
}
return nil
}

func (c *Controller) lockPodStatus(ctx context.Context, ch chan<- *corev1.Pod, nodeName string) error {
lockPendingOpt := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
}
err := c.lockPodStatusWithNode(ctx, lockCh, lockPendingOpt)
err := c.lockPodStatusWithNode(ctx, ch, lockPendingOpt)
if err != nil {
return err
}
Expand Down Expand Up @@ -245,20 +245,50 @@ func (c *Controller) heartbeatNode(ctx context.Context, node *corev1.Node) error
}

func (c *Controller) LockNodeStatus(ctx context.Context) error {
nodes := make([]*corev1.Node, 0, len(c.nodes))
for _, node := range c.nodes {
err := c.lockNodeStatus(ctx, node)
n, err := c.lockNodeStatus(ctx, node)
if err != nil {
return err
}
nodes = append(nodes, n)
}

for _, node := range nodes {
err := c.heartbeatNode(ctx, node)
if err != nil {
return err
}
}

heartbeatInterval := 30 * time.Second
th := time.NewTimer(heartbeatInterval)
go func() {
for {
select {
case <-th.C:
th.Reset(heartbeatInterval)
for _, node := range nodes {
err := c.heartbeatNode(ctx, node)
if err != nil {
log.Printf("Error update heartbeat %s", err)
}
}

case <-ctx.Done():
log.Printf("Stop locking nodes %s status", c.nodes)
return
}
}
}()
return nil
}

func (c *Controller) lockNodeStatus(ctx context.Context, nodeName string) error {
func (c *Controller) lockNodeStatus(ctx context.Context, nodeName string) (*corev1.Node, error) {
node, err := c.clientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return err
return nil, err
}
node = &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -267,46 +297,23 @@ func (c *Controller) lockNodeStatus(ctx context.Context, nodeName string) error
}
sum, err := toTemplateJson(c.nodeTemplate, node, c.funcMap)
if err != nil {
return err
return nil, err
}
err = json.Unmarshal(sum, &node)
if err != nil {
return err
return nil, err
}
node, err = c.clientSet.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
if err != nil {
return err
return nil, err
}
}

err = c.lockNode(ctx, node)
if err != nil {
return err
}

err = c.heartbeatNode(ctx, node)
if err != nil {
return err
return nil, err
}

heartbeatInterval := 30 * time.Second
th := time.NewTimer(heartbeatInterval)
go func() {
for {
select {
case <-th.C:
th.Reset(heartbeatInterval)
err = c.heartbeatNode(ctx, node)
if err != nil {
log.Printf("Error update heartbeat %s", err)
}
case <-ctx.Done():
log.Printf("Stop locking nodes %s status", c.nodes)
return
}
}
}()
return nil
return node, nil
}

func (c *Controller) configurePod(pod *corev1.Pod) (bool, error) {
Expand Down

0 comments on commit cf20d83

Please sign in to comment.