diff --git a/lib/backend/k8s/syncer.go b/lib/backend/k8s/syncer.go
index 59f99f661..68dfe5e00 100644
--- a/lib/backend/k8s/syncer.go
+++ b/lib/backend/k8s/syncer.go
@@ -171,10 +171,31 @@ func (k *realKubeAPI) getReadyStatus(key model.ReadyFlagKey) (*model.KVPair, err
 func newSyncer(kubeAPI kubeAPI, converter converter, callbacks api.SyncerCallbacks, disableNodePoll bool) *kubeSyncer {
 	syn := &kubeSyncer{
-		kubeAPI:         kubeAPI,
-		converter:       converter,
-		callbacks:       callbacks,
-		tracker:         map[string]model.Key{},
+		kubeAPI:   kubeAPI,
+		converter: converter,
+		callbacks: callbacks,
+		trackers: map[string]map[string]model.Key{
+			KEY_NS:  map[string]model.Key{},
+			KEY_PO:  map[string]model.Key{},
+			KEY_NP:  map[string]model.Key{},
+			KEY_SNP: map[string]model.Key{},
+			KEY_GC:  map[string]model.Key{},
+			KEY_HC:  map[string]model.Key{},
+			KEY_IP:  map[string]model.Key{},
+			KEY_NO:  map[string]model.Key{},
+			KEY_RS:  map[string]model.Key{},
+		},
+		needsResync: map[string]bool{
+			KEY_NS:  true,
+			KEY_PO:  true,
+			KEY_NP:  true,
+			KEY_SNP: true,
+			KEY_GC:  true,
+			KEY_HC:  true,
+			KEY_IP:  true,
+			KEY_NO:  true,
+			KEY_RS:  true,
+		},
 		disableNodePoll: disableNodePoll,
 		stopChan:        make(chan int),
 		openWatchers:    map[string]watch.Interface{},
@@ -187,10 +208,16 @@ type kubeSyncer struct {
 	converter       converter
 	callbacks       api.SyncerCallbacks
 	OneShot         bool
-	tracker         map[string]model.Key
 	disableNodePoll bool
 	stopChan        chan int
 	openWatchers    map[string]watch.Interface
+
+	// Map of resource key to whether or not that resource needs a resync.
+	needsResync map[string]bool
+
+	// Trackers is a map from individual resource keys (KEY_*) to the tracking map
+	// used to track sync state for that resource.
+	trackers map[string]map[string]model.Key
 }
 
 // Holds resource version information.
@@ -215,53 +242,54 @@ func (syn *kubeSyncer) Stop() {
 }
 
 // sendUpdates sends updates to the callback and updates the resource
-// tracker.
-func (syn *kubeSyncer) sendUpdates(kvps []model.KVPair) {
-	updates := syn.convertKVPairsToUpdates(kvps)
+// tracker for the given resourceType (e.g. KEY_NS).
+func (syn *kubeSyncer) sendUpdates(kvps []model.KVPair, resourceType string) {
+	updates := syn.convertKVPairsToUpdates(kvps, resourceType)
 
 	// Send to the callback and update the tracker.
 	syn.callbacks.OnUpdates(updates)
-	syn.updateTracker(updates)
+	syn.updateTracker(updates, resourceType)
 }
 
 // convertKVPairsToUpdates converts a list of KVPairs to the list
 // of api.Update objects which should be sent to OnUpdates. It filters out
 // deletes for any KVPairs which we don't know about.
-func (syn *kubeSyncer) convertKVPairsToUpdates(kvps []model.KVPair) []api.Update {
+func (syn *kubeSyncer) convertKVPairsToUpdates(kvps []model.KVPair, resourceType string) []api.Update {
 	updates := []api.Update{}
 	for _, kvp := range kvps {
-		if _, ok := syn.tracker[kvp.Key.String()]; !ok && kvp.Value == nil {
+		if _, ok := syn.trackers[resourceType][kvp.Key.String()]; !ok && kvp.Value == nil {
 			// The given KVPair is not in the tracker, and is a delete, so no need to
 			// send a delete update.
 			continue
 		}
-		updates = append(updates, api.Update{KVPair: kvp, UpdateType: syn.getUpdateType(kvp)})
+		updates = append(updates, api.Update{KVPair: kvp, UpdateType: syn.getUpdateType(kvp, resourceType)})
 	}
 	return updates
 }
 
-// updateTracker updates the global object tracker with the given update.
+// updateTracker updates the per-resource object tracker with the given update.
 // updateTracker should be called after sending a update to the OnUpdates callback.
-func (syn *kubeSyncer) updateTracker(updates []api.Update) {
+func (syn *kubeSyncer) updateTracker(updates []api.Update, resourceType string) {
 	for _, upd := range updates {
+		// Update that particular resource type's tracker.
 		if upd.UpdateType == api.UpdateTypeKVDeleted {
 			log.Debugf("Delete from tracker: %+v", upd.KVPair.Key)
-			delete(syn.tracker, upd.KVPair.Key.String())
+			delete(syn.trackers[resourceType], upd.KVPair.Key.String())
 		} else {
 			log.Debugf("Update tracker: %+v: %+v", upd.KVPair.Key, upd.KVPair.Revision)
-			syn.tracker[upd.KVPair.Key.String()] = upd.KVPair.Key
+			syn.trackers[resourceType][upd.KVPair.Key.String()] = upd.KVPair.Key
 		}
 	}
 }
 
-func (syn *kubeSyncer) getUpdateType(kvp model.KVPair) api.UpdateType {
+func (syn *kubeSyncer) getUpdateType(kvp model.KVPair, resourceType string) api.UpdateType {
 	if kvp.Value == nil {
 		// If the value is nil, then this is a delete.
 		return api.UpdateTypeKVDeleted
 	}
 
 	// Not a delete.
-	if _, ok := syn.tracker[kvp.Key.String()]; !ok {
+	if _, ok := syn.trackers[resourceType][kvp.Key.String()]; !ok {
 		// If not a delete and it does not exist in the tracker, this is an add.
 		return api.UpdateTypeKVNew
 	} else {
@@ -270,15 +298,17 @@ func (syn *kubeSyncer) getUpdateType(kvp model.KVPair) api.UpdateType {
 	}
 }
 
-// Watcher names.
+// Keys used to identify various bits of state stored on a per-resource basis.
 const (
 	KEY_NS  = "Namespace"
 	KEY_PO  = "Pod"
 	KEY_NP  = "NetworkPolicy"
 	KEY_SNP = "SystemNetworkPolicy"
 	KEY_GC  = "GlobalConfig"
+	KEY_HC  = "HostConfig"
 	KEY_IP  = "IPPool"
 	KEY_NO  = "Node"
+	KEY_RS  = "CalicoReadyState"
 )
 
 func (syn *kubeSyncer) readFromKubernetesAPI() {
@@ -293,13 +323,10 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 	var kvp *model.KVPair
 	var opts metav1.ListOptions
 
-	// Always perform an initial snapshot.
-	needsResync := true
-
 	log.Info("Starting Kubernetes API read loop")
 	for {
 		// If we need to resync, do so.
-		if needsResync {
+		if len(syn.needsResync) != 0 {
 			// Set status to ResyncInProgress.
 			log.Debugf("Resync required - latest versions: %+v", latestVersions)
 			syn.callbacks.OnStatusUpdated(api.ResyncInProgress)
@@ -311,8 +338,11 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			// Go through and delete anything that existed before, but doesn't anymore.
 			syn.performSnapshotDeletes(existingKeys)
 
-			// Send the snapshot through.
-			syn.sendUpdates(snap)
+			// Send the snapshot through for each resource type that went through
+			// a resync.
+			for resourceType, s := range snap {
+				syn.sendUpdates(s, resourceType)
+			}
 
 			log.Debugf("Snapshot complete - start watch from %+v", latestVersions)
 			syn.callbacks.OnStatusUpdated(api.InSync)
@@ -427,7 +457,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 
 		// We resynced if we needed to, and have a complete set of watchers, so reset the
 		// needsResync flag.
-		needsResync = false
+		syn.needsResync = map[string]bool{}
 
 		// Select on the various watch channels.
 		select {
@@ -438,7 +468,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 		case event = <-nsChan:
 			log.Debugf("Incoming Namespace watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_NS] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_NS) {
 				syn.closeWatcher(KEY_NS)
@@ -447,12 +477,12 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			// Event is OK - parse it.
 			kvps := syn.parseNamespaceEvent(event)
 			latestVersions.namespaceVersion = kvps[0].Revision.(string)
-			syn.sendUpdates(kvps)
+			syn.sendUpdates(kvps, KEY_NS)
 			continue
 		case event = <-poChan:
 			log.Debugf("Incoming Pod watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_PO] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_PO) {
 				syn.closeWatcher(KEY_PO)
@@ -463,12 +493,12 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 				// Only send the update if we care about it. We filter
 				// out a number of events that aren't useful for us.
 				latestVersions.podVersion = kvp.Revision.(string)
-				syn.sendUpdates([]model.KVPair{*kvp})
+				syn.sendUpdates([]model.KVPair{*kvp}, KEY_PO)
 			}
 		case event = <-npChan:
 			log.Debugf("Incoming NetworkPolicy watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_NP] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_NP) {
 				syn.closeWatcher(KEY_NP)
@@ -477,233 +507,287 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
 			// Event is OK - parse it and send it over the channel.
 			kvp = syn.parseNetworkPolicyEvent(event)
 			latestVersions.networkPolicyVersion = kvp.Revision.(string)
-			syn.sendUpdates([]model.KVPair{*kvp})
+			syn.sendUpdates([]model.KVPair{*kvp}, KEY_NP)
 		case event = <-snpChan:
 			log.Debugf("Incoming SystemNetworkPolicy watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_SNP] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_SNP) {
+				// Resources backed by TPRs need to be resynced on empty events.
+				syn.needsResync[KEY_SNP] = true
 				syn.closeWatcher(KEY_SNP)
 				continue
 			}
 			// Event is OK - parse it and send it over the channel.
 			kvp = syn.parseSystemNetworkPolicyEvent(event)
 			latestVersions.systemNetworkPolicyVersion = kvp.Revision.(string)
-			syn.sendUpdates([]model.KVPair{*kvp})
+			syn.sendUpdates([]model.KVPair{*kvp}, KEY_SNP)
 		case event = <-gcChan:
 			log.Debugf("Incoming GlobalConfig watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_GC] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_GC) {
+				// Resources backed by TPRs need to be resynced on empty events.
+				syn.needsResync[KEY_GC] = true
 				syn.closeWatcher(KEY_GC)
 				continue
 			}
 			// Event is OK - parse it and send it over the channel.
 			kvp = syn.parseGlobalConfigEvent(event)
 			latestVersions.globalConfigVersion = kvp.Revision.(string)
-			syn.sendUpdates([]model.KVPair{*kvp})
+			syn.sendUpdates([]model.KVPair{*kvp}, KEY_GC)
 		case event = <-poolChan:
 			log.Debugf("Incoming IPPool watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_IP] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_IP) {
+				// Resources backed by TPRs need to be resynced on empty events.
+				syn.needsResync[KEY_IP] = true
 				syn.closeWatcher(KEY_IP)
 				continue
 			}
 			// Event is OK - parse it and send it over the channel.
 			kvp = syn.parseIPPoolEvent(event)
 			latestVersions.poolVersion = kvp.Revision.(string)
-			syn.sendUpdates([]model.KVPair{*kvp})
+			syn.sendUpdates([]model.KVPair{*kvp}, KEY_IP)
 		case event = <-noChan:
 			log.Debugf("Incoming Node watch event. Type=%s", event.Type)
 			if syn.eventNeedsResync(event) {
-				needsResync = true
+				syn.needsResync[KEY_NO] = true
 				continue
 			} else if syn.eventRestartsWatch(event, KEY_NO) {
+				syn.needsResync[KEY_NO] = true
 				syn.closeWatcher(KEY_NO)
 				continue
 			}
 			// Event is OK - parse it and send it over the channel.
 			kvp = syn.parseNodeEvent(event)
 			latestVersions.nodeVersion = kvp.Revision.(string)
-			syn.sendUpdates([]model.KVPair{*kvp})
+			syn.sendUpdates([]model.KVPair{*kvp}, KEY_NO)
 		}
 	}
 }
 
-func (syn *kubeSyncer) performSnapshotDeletes(exists map[string]bool) {
+func (syn *kubeSyncer) performSnapshotDeletes(existsMap map[string]map[string]bool) {
 	log.Info("Checking for any deletes for snapshot")
-	deletes := []model.KVPair{}
-	log.Debugf("Keys in snapshot: %+v", exists)
-	for cachedKey, k := range syn.tracker {
-		// Check each cached key to see if it exists in the snapshot. If it doesn't,
-		// we need to send a delete for it.
-		if _, stillExists := exists[cachedKey]; !stillExists {
-			log.Debugf("Cached key not in snapshot: %+v", cachedKey)
-			deletes = append(deletes, model.KVPair{Key: k, Value: nil})
+	for resourceType, exists := range existsMap {
+		log.Debugf("%s keys in snapshot: %+v", resourceType, exists)
+		deletes := []model.KVPair{}
+		for cachedKey, k := range syn.trackers[resourceType] {
+			// Check each cached key to see if it exists in the snapshot. If it doesn't,
+			// we need to send a delete for it.
+			if _, stillExists := exists[cachedKey]; !stillExists {
+				log.Debugf("Cached %s key not in snapshot: %+v", resourceType, cachedKey)
+				deletes = append(deletes, model.KVPair{Key: k, Value: nil})
+			}
 		}
+		log.Infof("Sending %s snapshot deletes: %+v", resourceType, deletes)
+		syn.sendUpdates(deletes, resourceType)
 	}
-	log.Infof("Sending snapshot deletes: %+v", deletes)
-	syn.sendUpdates(deletes)
 }
 
 // performSnapshot returns a list of existing objects in the datastore,
 // a mapping of model.Key objects representing the objects which exist in the datastore, and
 // populates the provided resourceVersions with the latest k8s resource version
 // for each.
-func (syn *kubeSyncer) performSnapshot(versions *resourceVersions) ([]model.KVPair, map[string]bool) {
+func (syn *kubeSyncer) performSnapshot(versions *resourceVersions) (map[string][]model.KVPair, map[string]map[string]bool) {
 	opts := metav1.ListOptions{}
-	var snap []model.KVPair
-	var keys map[string]bool
+	var snap map[string][]model.KVPair
+	var keys map[string]map[string]bool
 
 	// Loop until we successfully are able to accesss the API.
 	for {
 		// Initialize the values to return.
-		snap = []model.KVPair{}
-		keys = map[string]bool{}
+		snap = map[string][]model.KVPair{}
+		keys = map[string]map[string]bool{}
 
-		// Get Namespaces (Profiles)
-		log.Info("Syncing Namespaces")
-		nsList, err := syn.kubeAPI.NamespaceList(opts)
-		if err != nil {
-			log.Warnf("Error syncing Namespaces, retrying: %s", err)
-			time.Sleep(1 * time.Second)
-			continue
-		}
-		log.Info("Received Namespace List() response")
+		log.Infof("Needs resync: %+v", syn.needsResync)
 
-		versions.namespaceVersion = nsList.ListMeta.ResourceVersion
-		for _, ns := range nsList.Items {
-			// The Syncer API expects a profile to be broken into its underlying
-			// components - rules, tags, labels.
-			profile, err := syn.converter.namespaceToProfile(&ns)
+		// Resync Namespaces only if needed.
+		if syn.needsResync[KEY_NS] {
+			log.Info("Syncing Namespaces")
+			nsList, err := syn.kubeAPI.NamespaceList(opts)
 			if err != nil {
-				log.Panicf("%s", err)
+				log.Warnf("Error syncing Namespaces, retrying: %s", err)
+				time.Sleep(1 * time.Second)
+				continue
 			}
-			rules, tags, labels := compat.ToTagsLabelsRules(profile)
-			rules.Revision = profile.Revision
-			tags.Revision = profile.Revision
-			labels.Revision = profile.Revision
+			log.Debug("Received Namespace List() response")
 
-			snap = append(snap, *rules, *tags, *labels)
-			keys[rules.Key.String()] = true
-			keys[tags.Key.String()] = true
-			keys[labels.Key.String()] = true
-		}
+			// Ensure maps are initialized.
+			snap[KEY_NS] = []model.KVPair{}
+			keys[KEY_NS] = map[string]bool{}
 
-		// Get NetworkPolicies (Policies)
-		log.Info("Syncing NetworkPolicy")
-		npList, err := syn.kubeAPI.NetworkPolicyList()
-		if err != nil {
-			log.Warnf("Error querying NetworkPolicies during snapshot, retrying: %s", err)
-			time.Sleep(1 * time.Second)
-			continue
+			versions.namespaceVersion = nsList.ListMeta.ResourceVersion
+			for _, ns := range nsList.Items {
+				// The Syncer API expects a profile to be broken into its underlying
+				// components - rules, tags, labels.
+				profile, err := syn.converter.namespaceToProfile(&ns)
+				if err != nil {
+					log.Panicf("%s", err)
+				}
+				rules, tags, labels := compat.ToTagsLabelsRules(profile)
+				rules.Revision = profile.Revision
+				tags.Revision = profile.Revision
+				labels.Revision = profile.Revision
+
+				snap[KEY_NS] = append(snap[KEY_NS], *rules, *tags, *labels)
+				keys[KEY_NS][rules.Key.String()] = true
+				keys[KEY_NS][tags.Key.String()] = true
+				keys[KEY_NS][labels.Key.String()] = true
+			}
 		}
-		log.Info("Received NetworkPolicy List() response")
 
-		versions.networkPolicyVersion = npList.ListMeta.ResourceVersion
-		for _, np := range npList.Items {
-			pol, _ := syn.converter.networkPolicyToPolicy(&np)
-			snap = append(snap, *pol)
-			keys[pol.Key.String()] = true
-		}
+		// Resync NetworkPolicy only if needed.
+		if syn.needsResync[KEY_NP] {
+			log.Info("Syncing NetworkPolicy")
+			npList, err := syn.kubeAPI.NetworkPolicyList()
+			if err != nil {
+				log.Warnf("Error querying NetworkPolicies during snapshot, retrying: %s", err)
+				time.Sleep(1 * time.Second)
+				continue
+			}
+			log.Debug("Received NetworkPolicy List() response")
 
-		// Get SystemNetworkPolicies (Policies)
-		log.Info("Syncing SystemNetworkPolicy")
-		snpList, err := syn.kubeAPI.SystemNetworkPolicyList()
-		if err != nil {
-			log.Warnf("Error querying SystemNetworkPolicies during snapshot, retrying: %s", err)
-			time.Sleep(1 * time.Second)
-			continue
-		}
-		log.Info("Received NetworkPolicy List() response")
+			// Ensure maps are initialized.
+			snap[KEY_NP] = []model.KVPair{}
+			keys[KEY_NP] = map[string]bool{}
 
-		versions.systemNetworkPolicyVersion = snpList.Metadata.ResourceVersion
-		for _, np := range snpList.Items {
-			pol := resources.ThirdPartyToSystemNetworkPolicy(&np)
-			snap = append(snap, *pol)
-			keys[pol.Key.String()] = true
+			versions.networkPolicyVersion = npList.ListMeta.ResourceVersion
+			for _, np := range npList.Items {
+				pol, _ := syn.converter.networkPolicyToPolicy(&np)
+				snap[KEY_NP] = append(snap[KEY_NP], *pol)
+				keys[KEY_NP][pol.Key.String()] = true
+			}
 		}
 
-		// Get Pods (WorkloadEndpoints)
-		log.Info("Syncing Pods")
-		poList, err := syn.kubeAPI.PodList("", opts)
-		if err != nil {
-			log.Warnf("Error querying Pods during snapshot, retrying: %s", err)
-			time.Sleep(1 * time.Second)
-			continue
+		// Resync SystemNetworkPolicy only if needed.
+		if syn.needsResync[KEY_SNP] {
+			log.Info("Syncing SystemNetworkPolicy")
+			snpList, err := syn.kubeAPI.SystemNetworkPolicyList()
+			if err != nil {
+				log.Warnf("Error querying SystemNetworkPolicies during snapshot, retrying: %s", err)
+				time.Sleep(1 * time.Second)
+				continue
+			}
+			log.Debug("Received SystemNetworkPolicy List() response")
+
+			// Ensure maps are initialized.
+			snap[KEY_SNP] = []model.KVPair{}
+			keys[KEY_SNP] = map[string]bool{}
+
+			versions.systemNetworkPolicyVersion = snpList.Metadata.ResourceVersion
+			for _, np := range snpList.Items {
+				pol := resources.ThirdPartyToSystemNetworkPolicy(&np)
+				snap[KEY_SNP] = append(snap[KEY_SNP], *pol)
+				keys[KEY_SNP][pol.Key.String()] = true
+			}
 		}
-		log.Info("Received Pod List() response")
 
-		versions.podVersion = poList.ListMeta.ResourceVersion
-		for _, po := range poList.Items {
-			// Ignore any updates for pods which are not ready / valid.
-			if !syn.converter.isReadyCalicoPod(&po) {
-				log.Debugf("Skipping pod %s/%s", po.ObjectMeta.Namespace, po.ObjectMeta.Name)
+		// Resync Pods only if needed.
+		if syn.needsResync[KEY_PO] {
+			log.Info("Syncing Pods")
+			poList, err := syn.kubeAPI.PodList("", opts)
+			if err != nil {
+				log.Warnf("Error querying Pods during snapshot, retrying: %s", err)
+				time.Sleep(1 * time.Second)
 				continue
 			}
+			log.Debug("Received Pod List() response")
+
+			// Ensure maps are initialized.
+			snap[KEY_PO] = []model.KVPair{}
+			keys[KEY_PO] = map[string]bool{}
+
+			versions.podVersion = poList.ListMeta.ResourceVersion
+			for _, po := range poList.Items {
+				// Ignore any updates for pods which are not ready / valid.
+				if !syn.converter.isReadyCalicoPod(&po) {
+					log.Debugf("Skipping pod %s/%s", po.ObjectMeta.Namespace, po.ObjectMeta.Name)
+					continue
+				}
+
+				// Convert to a workload endpoint.
+				wep, err := syn.converter.podToWorkloadEndpoint(&po)
+				if err != nil {
+					log.WithError(err).Error("Failed to convert pod to workload endpoint")
+					continue
+				}
+				snap[KEY_PO] = append(snap[KEY_PO], *wep)
+				keys[KEY_PO][wep.Key.String()] = true
+			}
+		}
 
-			// Convert to a workload endpoint.
-			wep, err := syn.converter.podToWorkloadEndpoint(&po)
+		// Resync GlobalConfig only if needed.
+		if syn.needsResync[KEY_GC] {
+			log.Info("Syncing GlobalConfig")
+			confList, resourceVersion, err := syn.kubeAPI.GlobalConfigList(model.GlobalConfigListOptions{})
 			if err != nil {
-				log.WithError(err).Error("Failed to convert pod to workload endpoint")
+				log.Warnf("Error querying GlobalConfig during snapshot, retrying: %s", err)
+				time.Sleep(1 * time.Second)
 				continue
 			}
-			snap = append(snap, *wep)
-			keys[wep.Key.String()] = true
-		}
+			log.Debug("Received GlobalConfig List() response")
 
-		// Sync GlobalConfig.
-		log.Info("Syncing GlobalConfig")
-		confList, resourceVersion, err := syn.kubeAPI.GlobalConfigList(model.GlobalConfigListOptions{})
-		if err != nil {
-			log.Warnf("Error querying GlobalConfig during snapshot, retrying: %s", err)
-			time.Sleep(1 * time.Second)
-			continue
-		}
-		log.Info("Received GlobalConfig List() response")
+			// Ensure maps are initialized.
+			snap[KEY_GC] = []model.KVPair{}
+			keys[KEY_GC] = map[string]bool{}
 
-		versions.globalConfigVersion = resourceVersion
-		for _, c := range confList {
-			snap = append(snap, *c)
-			keys[c.Key.String()] = true
+			versions.globalConfigVersion = resourceVersion
+			for _, c := range confList {
+				snap[KEY_GC] = append(snap[KEY_GC], *c)
+				keys[KEY_GC][c.Key.String()] = true
+			}
 		}
 
-		// Sync Hostconfig.
- log.Info("Syncing HostConfig") - hostConfList, err := syn.kubeAPI.HostConfigList(model.HostConfigListOptions{}) - if err != nil { - log.Warnf("Error querying HostConfig during snapshot, retrying: %s", err) - time.Sleep(1 * time.Second) - continue - } - log.Info("Received HostConfig List() response") + // Resync HostConfig only if needed. + if syn.needsResync[KEY_HC] { + log.Info("Syncing HostConfig") + hostConfList, err := syn.kubeAPI.HostConfigList(model.HostConfigListOptions{}) + if err != nil { + log.Warnf("Error querying HostConfig during snapshot, retrying: %s", err) + time.Sleep(1 * time.Second) + continue + } + log.Debug("Received HostConfig List() response") - for _, h := range hostConfList { - snap = append(snap, *h) - keys[h.Key.String()] = true - } + // Ensure maps are initialized. + snap[KEY_HC] = []model.KVPair{} + keys[KEY_HC] = map[string]bool{} - // Sync IP Pools. - log.Info("Syncing IP Pools") - poolList, resourceVersion, err := syn.kubeAPI.IPPoolList(model.IPPoolListOptions{}) - if err != nil { - log.Warnf("Error querying IP Pools during snapshot, retrying: %s", err) - time.Sleep(1 * time.Second) - continue + for _, h := range hostConfList { + snap[KEY_HC] = append(snap[KEY_HC], *h) + keys[KEY_HC][h.Key.String()] = true + } } - log.Info("Received IP Pools List() response") - versions.poolVersion = resourceVersion - for _, p := range poolList { - snap = append(snap, *p) - keys[p.Key.String()] = true + // Resync IP Pools only if needed. + if syn.needsResync[KEY_IP] { + log.Info("Syncing IP Pools") + poolList, resourceVersion, err := syn.kubeAPI.IPPoolList(model.IPPoolListOptions{}) + if err != nil { + log.Warnf("Error querying IP Pools during snapshot, retrying: %s", err) + time.Sleep(1 * time.Second) + continue + } + log.Debug("Received IP Pools List() response") + + // Ensure maps are initialized. + snap[KEY_IP] = []model.KVPair{} + keys[KEY_IP] = map[string]bool{} + + versions.poolVersion = resourceVersion + for _, p := range poolList { + snap[KEY_IP] = append(snap[KEY_IP], *p) + keys[KEY_IP][p.Key.String()] = true + } } - if !syn.disableNodePoll { + // Resync Nodes only if needed. + if !syn.disableNodePoll && syn.needsResync[KEY_NO] { log.Info("Syncing Nodes") noList, err := syn.kubeAPI.NodeList(opts) if err != nil { @@ -711,30 +795,34 @@ func (syn *kubeSyncer) performSnapshot(versions *resourceVersions) ([]model.KVPa time.Sleep(1 * time.Second) continue } - log.Info("Received Node List() response") + log.Debug("Received Node List() response") + + // Ensure maps are initialized. + snap[KEY_NO] = []model.KVPair{} + keys[KEY_NO] = map[string]bool{} versions.nodeVersion = noList.ListMeta.ResourceVersion for _, no := range noList.Items { node, err := resources.K8sNodeToCalico(&no) if err != nil { - log.Panicf("%s", err) + log.WithError(err).Fatal("Error converting node") } if node != nil { - snap = append(snap, *node) - keys[node.Key.String()] = true + snap[KEY_NO] = append(snap[KEY_NO], *node) + keys[KEY_NO][node.Key.String()] = true } } } - // Include ready state. + // Include ready state always. 
 		ready, err := syn.kubeAPI.getReadyStatus(model.ReadyFlagKey{})
 		if err != nil {
 			log.Warnf("Error querying ready status during snapshot, retrying: %s", err)
 			time.Sleep(1 * time.Second)
 			continue
 		}
-		snap = append(snap, *ready)
-		keys[ready.Key.String()] = true
+		snap[KEY_RS] = []model.KVPair{*ready}
+		keys[KEY_RS] = map[string]bool{ready.Key.String(): true}
 
 		log.Infof("Snapshot resourceVersions: %+v", versions)
 		log.Debugf("Created snapshot: %+v", snap)
diff --git a/lib/backend/k8s/syncer_test.go b/lib/backend/k8s/syncer_test.go
index 7204ccb22..5d84fb006 100644
--- a/lib/backend/k8s/syncer_test.go
+++ b/lib/backend/k8s/syncer_test.go
@@ -53,6 +53,7 @@ func (tw *testWatch) ResultChan() <-chan watch.Event {
 type testClient struct {
 	openWatchers []*testWatch
 	podC         chan watch.Event
+	poolC        chan watch.Event
 	state        map[model.Key]interface{}
 	stateMutex   sync.Mutex
 	listCalls    int
@@ -115,7 +116,7 @@ func (tc *testClient) GlobalConfigWatch(opts metav1.ListOptions) (w watch.Interf
 }
 
 func (tc *testClient) IPPoolWatch(opts metav1.ListOptions) (w watch.Interface, err error) {
-	w = tc.newWatch("IP pool", make(chan watch.Event))
+	w = tc.newWatch("IP pool", tc.poolC)
 	err = nil
 	return
 }
@@ -187,6 +188,24 @@ func (tc *testClient) getReadyStatus(key model.ReadyFlagKey) (*model.KVPair, err
 	return &model.KVPair{Key: key, Value: true}, nil
 }
 
+// getNumListCalls returns the number of List() calls performed by the syncer
+// against the Kubernetes API throughout the test.
+func (tc *testClient) getNumListCalls() int {
+	tc.stateMutex.Lock()
+	defer tc.stateMutex.Unlock()
+	log.WithField("listCalls", tc.listCalls).Info("")
+	return tc.listCalls
+}
+
+// getNumWatchCalls returns the number of Watches performed by the syncer
+// against the Kubernetes API throughout the test.
+func (tc *testClient) getNumWatchCalls() int {
+	tc.stateMutex.Lock()
+	defer tc.stateMutex.Unlock()
+	log.WithField("watchCalls", tc.watchCalls).Info("")
+	return tc.watchCalls
+}
+
 var _ = Describe("Test Syncer", func() {
 	var (
 		tc  *testClient
@@ -196,6 +215,7 @@ var _ = Describe("Test Syncer", func() {
 	BeforeEach(func() {
 		tc = &testClient{
 			podC:  make(chan watch.Event),
+			poolC: make(chan watch.Event),
 			state: map[model.Key]interface{}{},
 		}
 		syn = newSyncer(tc, converter{}, tc, false)
@@ -216,31 +236,49 @@ var _ = Describe("Test Syncer", func() {
 	})
 
 	It("should not resync when one watch times out", func() {
-		getNumListCalls := func() interface{} {
-			tc.stateMutex.Lock()
-			defer tc.stateMutex.Unlock()
-			log.WithField("listCalls", tc.listCalls).Info("")
-			return tc.listCalls
-		}
-		getNumWatchCalls := func() interface{} {
-			tc.stateMutex.Lock()
-			defer tc.stateMutex.Unlock()
-			log.WithField("watchCalls", tc.watchCalls).Info("")
-			return tc.watchCalls
-		}
 		// Initial resync makes 8 list calls and 7 watch calls.
 		const (
 			LIST_CALLS  = 8
 			WATCH_CALLS = 7
 		)
-		Eventually(getNumListCalls).Should(BeNumerically("==", LIST_CALLS))
-		Eventually(getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS))
+		Eventually(tc.getNumListCalls).Should(BeNumerically("==", LIST_CALLS))
+		Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS))
+
 		// Simulate timeout of the pod watch.
 		tc.podC <- watch.Event{Object: nil}
+
 		// Expect a new watch call.
-		Eventually(getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+1))
+		Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+1))
 		// But no new list calls.
-		Expect(getNumListCalls()).To(BeNumerically("==", LIST_CALLS))
+		Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS))
+	})
+
+	It("should resync resources individually", func() {
+		// Initial resync makes 8 list calls and 7 watch calls.
+		const (
+			LIST_CALLS  = 8
+			WATCH_CALLS = 7
+		)
+		Eventually(tc.getNumListCalls).Should(BeNumerically("==", LIST_CALLS))
+		Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS))
+
+		// Simulate error on pod watch.
+		tc.podC <- watch.Event{Type: watch.Error, Object: nil}
+		// Expect a single new list call, but that each watcher is restarted.
+		Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+7))
+		Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS+1))
+
+		// Simulate error on IP Pool watch.
+		tc.poolC <- watch.Event{Type: watch.Error, Object: nil}
+		// Expect a single new list call, but that each watcher is restarted.
+		Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+14))
+		Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS+2))
+
+		// Simulate empty event on IP Pool watch (resourceVersion too old for TPRs)
+		tc.poolC <- watch.Event{Object: nil}
+		// Expect a single new list call, but that each watcher is restarted.
+		Eventually(tc.getNumWatchCalls).Should(BeNumerically("==", WATCH_CALLS+21))
+		Expect(tc.getNumListCalls()).To(BeNumerically("==", LIST_CALLS+3))
 	})
 
 	It("should correctly handle pod being deleted in resync", func() {