Merge pull request kubernetes#20148 from davidopp/flake3
Manually revert kubernetes#19580 and kubernetes#17754.
brendandburns committed Jan 26, 2016
2 parents 23b4bfb + 85f88b8 commit ca51446
Showing 1 changed file with 21 additions and 96 deletions.
117 changes: 21 additions & 96 deletions pkg/controller/node/nodecontroller.go
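For context, the revert drops the stateful availableCIDRs index set and restores the original stateless scheme: on each sync, regenerate candidate /24 pod CIDRs from the cluster CIDR, remove the ones nodes already hold, and hand the remainder to nodes without a CIDR. A minimal standalone sketch of that scheme, assuming a 10.244.0.0/16 cluster CIDR and a plain map in place of the sets.String type (illustrative only, not the upstream code):

package main

import (
	"fmt"
	"net"
)

// generatePodCIDRs carves num /24 subnets out of clusterCIDR, mirroring the
// restored generateCIDRs helper: the high byte of the index advances the
// second octet, the low byte advances the third.
func generatePodCIDRs(clusterCIDR *net.IPNet, num int) map[string]bool {
	res := map[string]bool{}
	ip := clusterCIDR.IP.To4()
	for i := 0; i < num; i++ {
		res[fmt.Sprintf("%d.%d.%d.0/24", ip[0], ip[1]+byte(i>>8), ip[2]+byte(i%256))] = true
	}
	return res
}

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16") // assumed example cluster CIDR
	assigned := []string{"10.244.0.0/24"}               // CIDRs nodes already hold
	nodesNeedingCIDR := 2                               // nodes with an empty PodCIDR

	available := generatePodCIDRs(clusterCIDR, len(assigned)+nodesNeedingCIDR)
	for _, cidr := range assigned {
		delete(available, cidr) // subtract CIDRs that are already in use
	}
	for cidr := range available {
		fmt.Println("assignable:", cidr) // prints 10.244.1.0/24 and 10.244.2.0/24
	}
}

In the real controller, reconcileNodeCIDRs performs the same subtraction against each node's Spec.PodCIDR and pops an entry for every node that still needs one.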
@@ -19,9 +19,7 @@ package node
import (
"errors"
"fmt"
"math"
"net"
"strconv"
"strings"
"sync"
"time"
@@ -115,12 +113,6 @@ type NodeController struct {
nodeStore cache.StoreToNodeLister

forcefullyDeletePod func(*api.Pod)
availableCIDRs sets.Int
// Calculate the maximum number of CIDRs we could give out based on nc.clusterCIDR.
// The flag denoting whether the node controller is newly started or restarted (after a crash).
needSync bool
maxCIDRs int
generatedCIDR bool
}

// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -168,10 +160,6 @@ func NewNodeController(
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) { forcefullyDeletePod(kubeClient, p) },
availableCIDRs: make(sets.Int),
needSync: true,
maxCIDRs: 0,
generatedCIDR: false,
}

nc.podStore.Store, nc.podController = framework.NewInformer(
@@ -206,27 +194,10 @@ func NewNodeController(
return nc
}

// generateAvailableCIDRs generates the indices of all CIDRs available within
// the cluster CIDR and inserts them into the availableCIDRs set.
func (nc *NodeController) generateAvailableCIDRs() {
nc.generatedCIDR = true
// Generate all available CIDRs here, since there will not be many of them;
// the set will be small and use less than 1MB of memory.

cidrSize, _ := nc.clusterCIDR.Mask.Size()
nc.maxCIDRs = int(math.Pow(2, (float64)(24-cidrSize)))

for i := 0; i <= nc.maxCIDRs; i++ {
nc.availableCIDRs.Insert(i)
}
}

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) {
go nc.nodeController.Run(util.NeverStop)
go nc.podController.Run(util.NeverStop)

// Incorporate the results of node status pushed from kubelet to master.
go util.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
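For reference, the removed generateAvailableCIDRs above sizes its index set from the cluster mask: 24 minus the mask length gives the free bits, so a /16 cluster CIDR yields 2^8 = 256 candidate /24 pod CIDRs. A tiny sketch of that sizing, assuming the same /16 example (illustrative only, not the upstream code):

package main

import (
	"fmt"
	"net"
)

func main() {
	// Mirrors the maxCIDRs calculation in the removed generateAvailableCIDRs:
	// the number of /24 subnets inside the cluster CIDR is 2^(24-maskSize).
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16") // assumed example cluster CIDR
	cidrSize, _ := clusterCIDR.Mask.Size()
	maxCIDRs := 1 << uint(24-cidrSize)
	fmt.Println(maxCIDRs) // 256
}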
@@ -291,34 +262,19 @@ func (nc *NodeController) Run(period time.Duration) {
}, nodeEvictionPeriod, util.NeverStop)
}

// translateCIDRs translates a pod CIDR index into the CIDR that can be
// assigned to a node. It also checks for overflow to make sure the CIDR is valid.
func translateCIDRs(clusterCIDR *net.IPNet, num int) string {
// Generates num pod CIDRs that could be assigned to nodes.
func generateCIDRs(clusterCIDR *net.IPNet, num int) sets.String {
res := sets.NewString()
cidrIP := clusterCIDR.IP.To4()
// TODO: Make the CIDRs configurable.
b1 := (num / 256) + int(cidrIP[1])
b2 := (num % 256) + int(cidrIP[2])
if b2 > 255 {
b2 = b2 % 256
b1 = b1 + 1
}
res := fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], b1, b2)
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
b1 := byte(i >> 8)
b2 := byte(i % 256)
res.Insert(fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], cidrIP[1]+b1, cidrIP[2]+b2))
}
return res
}

// translateCIDRtoIndex translates a CIDR string into its CIDR index.
func (nc *NodeController) translateCIDRtoIndex(freeCIDR string) int {
CIDRsArray := strings.Split(freeCIDR, ".")
if len(CIDRsArray) < 3 {
return -1
}
cidrIP := nc.clusterCIDR.IP.To4()
CIDRsIndexOne, _ := strconv.Atoi(CIDRsArray[1])
CIDRsIndexTwo, _ := strconv.Atoi(CIDRsArray[2])
release := (CIDRsIndexOne-int(cidrIP[1]))*256 + CIDRsIndexTwo - int(cidrIP[2])
return release
}

// getCondition returns a condition object for the specific condition
// type, nil if the condition is not set.
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
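To make the byte arithmetic shared by the removed translateCIDRs and the restored generateCIDRs concrete: the CIDR index is split into a high byte added to the second octet and a low byte added to the third. A small standalone example, assuming a hypothetical 10.244.0.0/14 cluster CIDR so that an index above 255 stays in range (illustrative only, not the upstream code):

package main

import (
	"fmt"
	"net"
)

// podCIDRForIndex maps a CIDR index to its /24 subnet the way the node
// controller does: index>>8 advances the second octet, index%256 the third.
func podCIDRForIndex(clusterCIDR *net.IPNet, i int) string {
	ip := clusterCIDR.IP.To4()
	return fmt.Sprintf("%d.%d.%d.0/24", ip[0], ip[1]+byte(i>>8), ip[2]+byte(i%256))
}

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/14") // hypothetical cluster CIDR
	fmt.Println(podCIDRForIndex(clusterCIDR, 0))   // 10.244.0.0/24
	fmt.Println(podCIDRForIndex(clusterCIDR, 255)) // 10.244.255.0/24
	fmt.Println(podCIDRForIndex(clusterCIDR, 300)) // 10.245.44.0/24
}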
@@ -396,17 +352,6 @@ func forcefullyDeletePod(c client.Interface, pod *api.Pod) {
}
}

// releaseCIDR translates a CIDR back into its CIDR index and inserts that
// index back into the availableCIDRs set.
func (nc *NodeController) releaseCIDR(freeCIDR string) {
release := nc.translateCIDRtoIndex(freeCIDR)
if release >= 0 && release <= nc.maxCIDRs {
nc.availableCIDRs.Insert(release)
} else {
glog.V(4).Info("CIDR %s is invalid", freeCIDR)
}
}

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
@@ -520,14 +465,11 @@ func (nc *NodeController) monitorNodeStatus() error {
nc.evictPods(node.Name)
continue
}
assignedCIDR := node.Spec.PodCIDR
if err = nc.kubeClient.Nodes().Delete(node.Name); err != nil {

if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
}
if assignedCIDR != "" {
nc.releaseCIDR(assignedCIDR)
}
}
}
}
@@ -537,48 +479,31 @@ func (nc *NodeController) monitorNodeStatus() error {

// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
// if it doesn't currently have one.
// It examines the availableCIDRs set first; if no CIDRs remain in it, it generates more.
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
glog.V(4).Infof("Reconciling cidrs for %d nodes", len(nodes.Items))
// Check whether this node controller was restarted after a crash.
// This runs only once, when the controller is newly started or restarted after a crash.
if nc.needSync {
// If it crashed, restore availableCIDRs by generating the CIDR indices,
// inserting them into the availableCIDRs set, and deleting already-assigned
// CIDRs from that set.
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
if nc.availableCIDRs.Has(nc.translateCIDRtoIndex(node.Spec.PodCIDR)) {
nc.availableCIDRs.Delete(nc.translateCIDRtoIndex(node.Spec.PodCIDR))
} else {
glog.V(4).Info("Node %s CIDR error, its CIDR is invalid, will reassign CIDR", node.Name)
node.Spec.PodCIDR = ""
if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
}
break
}
}
// TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs
// on each sync period?
availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items))
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name)
availableCIDRs.Delete(node.Spec.PodCIDR)
}
nc.needSync = false
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR == "" {
CIDRsNum, found := nc.availableCIDRs.PopAny()
if !found && !nc.generatedCIDR {
nc.generateAvailableCIDRs()
CIDRsNum, found = nc.availableCIDRs.PopAny()
}
podCIDR, found := availableCIDRs.PopAny()
if !found {
nc.recordNodeStatusChange(&node, "CIDRNotAvailable")
continue
}
podCIDR := translateCIDRs(nc.clusterCIDR, CIDRsNum)
glog.V(4).Info("Assigning node %s CIDR %s", node.Name, podCIDR)
glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
node.Spec.PodCIDR = podCIDR
if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
}
}

}
}

