diff --git a/pkg/gateway/concurrent/gateway.go b/pkg/gateway/concurrent/gateway.go index 873e615ad6..9e834628c0 100644 --- a/pkg/gateway/concurrent/gateway.go +++ b/pkg/gateway/concurrent/gateway.go @@ -16,6 +16,7 @@ package concurrent import ( "context" + "fmt" "net" corev1 "k8s.io/api/core/v1" @@ -31,16 +32,16 @@ var _ manager.Runnable = &RunnableGateway{} type RunnableGateway struct { Client client.Client - PodName string - DeploymentName string - Namespace string + PodName string + GatewayName string + Namespace string Socket net.Listener GuestConnections ipc.GuestConnections } // NewRunnableGateway creates a new Runnable. -func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace string, containerNames []string) (*RunnableGateway, error) { +func NewRunnableGateway(cl client.Client, podName, gatewayName, namespace string, containerNames []string) (*RunnableGateway, error) { guestConnections := ipc.NewGuestConnections(containerNames) socket, err := ipc.CreateListenSocket(unixSocketPath) @@ -56,7 +57,7 @@ func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace str return &RunnableGateway{ Client: cl, PodName: podName, - DeploymentName: deploymentName, + GatewayName: gatewayName, Namespace: namespace, Socket: socket, GuestConnections: guestConnections, @@ -67,7 +68,7 @@ func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace str func (rg *RunnableGateway) Start(ctx context.Context) error { defer rg.Close() - pods, err := ListAllGatewaysReplicas(ctx, rg.Client, rg.Namespace, rg.DeploymentName) + pods, err := ListAllGatewaysReplicas(ctx, rg.Client, rg.Namespace, rg.GatewayName) if err != nil { return err } @@ -84,6 +85,10 @@ func (rg *RunnableGateway) Start(ctx context.Context) error { } } + if activePod == nil { + return fmt.Errorf("active gateway pod not found") + } + if err := AddActiveGatewayLabel(ctx, rg.Client, client.ObjectKeyFromObject(activePod)); err != nil { return err } diff --git 
a/pkg/gateway/concurrent/k8s.go b/pkg/gateway/concurrent/k8s.go index 17350fb5d0..75ee108ce8 100644 --- a/pkg/gateway/concurrent/k8s.go +++ b/pkg/gateway/concurrent/k8s.go @@ -77,11 +77,11 @@ func RemoveActiveGatewayLabel(ctx context.Context, cl client.Client, key client. return nil } -// ListAllGatewaysReplicas returns the list of all the gateways replicas of the same deployment. -func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, deploymentName string) ([]corev1.Pod, error) { +// ListAllGatewaysReplicas returns the list of all the gateways replicas of the same gateway. +func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, gatewayName string) ([]corev1.Pod, error) { podList := &corev1.PodList{} if err := cl.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels{ - consts.K8sAppNameKey: deploymentName, + consts.K8sAppNameKey: gatewayName, }); err != nil { return nil, err } diff --git a/pkg/utils/maps/maps.go b/pkg/utils/maps/maps.go index 0d356369e6..55a01b6853 100644 --- a/pkg/utils/maps/maps.go +++ b/pkg/utils/maps/maps.go @@ -17,7 +17,7 @@ package maps import ( "fmt" "maps" - "sort" + "slices" "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -179,16 +179,14 @@ func UpdateCache(annots, template map[string]string, cacheKey string) map[string // SerializeMap convert a map in a string of concatenated keys seprated by commas. 
func SerializeMap(m map[string]string) string { serialized := "" - keys := make([]string, len(m)) - i := 0 + keys := make([]string, 0, len(m)) for k := range m { - keys[i] = k - i++ + keys = append(keys, k) } - sort.Strings(keys) + slices.Sort(keys) for _, k := range keys { - serialized += fmt.Sprintf("%s,", m[k]) + serialized += fmt.Sprintf("%s,", k) } return serialized } diff --git a/test/e2e/cruise/network/network_test.go b/test/e2e/cruise/network/network_test.go index 61c8488fbf..fd5f96faaa 100644 --- a/test/e2e/cruise/network/network_test.go +++ b/test/e2e/cruise/network/network_test.go @@ -44,7 +44,7 @@ import ( const ( // clustersRequired is the number of clusters required in this E2E test. - clustersRequired = 2 + clustersRequired = 3 // testName is the name of this E2E test. testName = "NETWORK" // StressMax is the maximum number of stress iterations. @@ -126,16 +126,29 @@ var _ = Describe("Liqo E2E", func() { When("\"liqoctl test network\" runs", func() { It("should succeed both before and after gateway pods restart", func() { // Run the tests. - Eventually(runLiqoctlNetworkTests(defaultArgs), timeout, interval).Should(Succeed()) + Eventually(func() error { + return runLiqoctlNetworkTests(defaultArgs) + }, timeout, interval).Should(Succeed()) // Restart the gateway pods. for i := range testContext.Clusters { RestartPods(testContext.Clusters[i].ControllerClient) } + // Check if there is only one active gateway pod per remote cluster. + for i := range testContext.Clusters { + numActiveGateway := testContext.Clusters[i].NumPeeredConsumers + testContext.Clusters[i].NumPeeredProviders + Eventually(func() error { + return checkUniqueActiveGatewayPod(testContext.Clusters[i].ControllerClient, numActiveGateway) + }, timeout, interval).Should(Succeed()) + } + // Run the tests again. 
- Eventually(runLiqoctlNetworkTests(defaultArgs), timeout, interval).Should(Succeed()) + Eventually(func() error { + return runLiqoctlNetworkTests(defaultArgs) + }, timeout, interval).Should(Succeed()) }) + It("should succeed both before and after gateway pods restart (stress gateway deletion and run basic tests)", func() { args := defaultArgs args.basic = true @@ -146,12 +159,22 @@ var _ = Describe("Liqo E2E", func() { RestartPods(testContext.Clusters[j].ControllerClient) } + // Check if there is only one active gateway pod per remote cluster. + for j := range testContext.Clusters { + numActiveGateway := testContext.Clusters[j].NumPeeredConsumers + testContext.Clusters[j].NumPeeredProviders + Eventually(func() error { + return checkUniqueActiveGatewayPod(testContext.Clusters[j].ControllerClient, numActiveGateway) + }, timeout, interval).Should(Succeed()) + } + if i == stressMax-1 { args.remove = true } // Run the tests. - Eventually(runLiqoctlNetworkTests(args), timeout, interval).Should(Succeed()) + Eventually(func() error { + return runLiqoctlNetworkTests(args) + }, timeout, interval).Should(Succeed()) } }) }) @@ -299,7 +322,6 @@ func RestartPods(cl client.Client) { if err := cl.List(ctx, deploymentList, &client.ListOptions{ LabelSelector: labels.SelectorFromSet(labels.Set{ gateway.GatewayComponentKey: gateway.GatewayComponentGateway, - concurrent.ActiveGatewayKey: concurrent.ActiveGatewayValue, }), }); err != nil { return err } @@ -314,3 +336,25 @@ func RestartPods(cl client.Client) { return nil }, timeout, interval).Should(Succeed()) } + +// checkUniqueActiveGatewayPod checks that the number of active gateway pods matches the expected numActiveGateway (one per remote cluster). +func checkUniqueActiveGatewayPod(cl client.Client, numActiveGateway int) error { + // Sleep a few seconds to be sure that the new leader is elected.
+ time.Sleep(2 * time.Second) + + podList := &corev1.PodList{} + if err := cl.List(ctx, podList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + gateway.GatewayComponentKey: gateway.GatewayComponentGateway, + concurrent.ActiveGatewayKey: concurrent.ActiveGatewayValue, + }), + }); err != nil { + return fmt.Errorf("unable to list active gateway pods: %w", err) + } + + if len(podList.Items) != numActiveGateway { + return fmt.Errorf("expected %d active gateway pod, got %d", numActiveGateway, len(podList.Items)) + } + + return nil +} diff --git a/test/e2e/pipeline/infra/cluster-api/setup.sh b/test/e2e/pipeline/infra/cluster-api/setup.sh index 15a4f0edc3..7c84ad75f3 100755 --- a/test/e2e/pipeline/infra/cluster-api/setup.sh +++ b/test/e2e/pipeline/infra/cluster-api/setup.sh @@ -38,8 +38,8 @@ WORKDIR=$(dirname "$FILEPATH") source "$WORKDIR/../../utils.sh" # shellcheck disable=SC1091 -# shellcheck source=./cni.sh -source "$WORKDIR/cni.sh" +# shellcheck source=../cni.sh +source "$WORKDIR/../cni.sh" export K8S_VERSION=${K8S_VERSION:-"1.29.7"} K8S_VERSION=$(echo -n "$K8S_VERSION" | sed 's/v//g') # remove the leading v diff --git a/test/e2e/pipeline/infra/cluster-api/cni.sh b/test/e2e/pipeline/infra/cni.sh similarity index 100% rename from test/e2e/pipeline/infra/cluster-api/cni.sh rename to test/e2e/pipeline/infra/cni.sh diff --git a/test/e2e/pipeline/infra/kind/pre-requirements.sh b/test/e2e/pipeline/infra/kind/pre-requirements.sh index 83f49014c3..cad72e8095 100755 --- a/test/e2e/pipeline/infra/kind/pre-requirements.sh +++ b/test/e2e/pipeline/infra/kind/pre-requirements.sh @@ -47,7 +47,7 @@ install_kubectl "${OS}" "${ARCH}" "${K8S_VERSION}" install_helm "${OS}" "${ARCH}" -KIND_VERSION="v0.23.0" +KIND_VERSION="v0.24.0" echo "Downloading Kind ${KIND_VERSION}" diff --git a/test/e2e/pipeline/infra/kind/setup.sh b/test/e2e/pipeline/infra/kind/setup.sh index fbb3a3e066..ed6b63eaee 100755 --- a/test/e2e/pipeline/infra/kind/setup.sh +++ 
b/test/e2e/pipeline/infra/kind/setup.sh @@ -34,6 +34,10 @@ WORKDIR=$(dirname "$FILEPATH") # shellcheck source=../../utils.sh source "$WORKDIR/../../utils.sh" +# shellcheck disable=SC1091 +# shellcheck source=../cni.sh +source "$WORKDIR/../cni.sh" + KIND="${BINDIR}/kind" CLUSTER_NAME=cluster @@ -59,6 +63,11 @@ do echo "Creating cluster ${CLUSTER_NAME}${i}..." ${KIND} create cluster --name "${CLUSTER_NAME}${i}" --kubeconfig "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}" --config "${TMPDIR}/liqo-cluster-${CLUSTER_NAME}${i}.yaml" --wait 2m + # Install CNI if kindnet disabled + if [[ ${DISABLE_KINDNET} == "true" ]]; then + "install_${CNI}" "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}" + fi + # Install metrics-server install_metrics_server "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}" done diff --git a/test/e2e/pipeline/installer/liqoctl/setup.sh b/test/e2e/pipeline/installer/liqoctl/setup.sh index f47ddd4a18..e8fcb368b8 100755 --- a/test/e2e/pipeline/installer/liqoctl/setup.sh +++ b/test/e2e/pipeline/installer/liqoctl/setup.sh @@ -63,7 +63,7 @@ LIQO_VERSION="${LIQO_VERSION:-$(git rev-parse HEAD)}" export SERVICE_CIDR=10.100.0.0/16 export POD_CIDR=10.200.0.0/16 export POD_CIDR_OVERLAPPING=${POD_CIDR_OVERLAPPING:-"false"} -export HA_REPLICAS=3 +export HA_REPLICAS=2 for i in $(seq 1 "${CLUSTER_NUMBER}"); do diff --git a/test/e2e/postinstall/basic_test.go b/test/e2e/postinstall/basic_test.go index acbdbb8f15..0c551cceb6 100644 --- a/test/e2e/postinstall/basic_test.go +++ b/test/e2e/postinstall/basic_test.go @@ -26,9 +26,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" + "k8s.io/utils/ptr" "github.com/liqotech/liqo/pkg/consts" + gwconsts "github.com/liqotech/liqo/pkg/gateway" fcutils "github.com/liqotech/liqo/pkg/utils/foreigncluster" + "github.com/liqotech/liqo/pkg/vkMachinery" "github.com/liqotech/liqo/test/e2e/testutils/tester" "github.com/liqotech/liqo/test/e2e/testutils/util" ) @@ -92,8 +95,28 @@ var _ = Describe("Liqo 
E2E", func() { for _, tenantNs := range tenantNsList.Items { Eventually(func() bool { readyPods, notReadyPods, err := util.ArePodsUp(ctx, cluster.NativeClient, tenantNs.Name) + Expect(err).ToNot(HaveOccurred()) klog.Infof("Tenant pods status: %d ready, %d not ready", len(readyPods), len(notReadyPods)) - return err == nil && len(notReadyPods) == 0 && len(readyPods) == util.NumPodsInTenantNs(true, cluster.Role) + // Get deployment gateway + gwDeployments, err := cluster.NativeClient.AppsV1().Deployments(tenantNs.Name).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", gwconsts.GatewayComponentKey, gwconsts.GatewayComponentGateway), + }) + Expect(err).ToNot(HaveOccurred()) + Expect(gwDeployments.Items).To(HaveLen(1)) + gwReplicas := int(ptr.Deref(gwDeployments.Items[0].Spec.Replicas, 1)) + + // Get deployment virtual-kubelet if role is consumer + vkReplicas := 0 + if fcutils.IsConsumer(cluster.Role) { + vkDeployments, err := cluster.NativeClient.AppsV1().Deployments(tenantNs.Name).List(ctx, metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(vkMachinery.KubeletBaseLabels).String(), + }) + Expect(err).ToNot(HaveOccurred()) + Expect(vkDeployments.Items).To(HaveLen(1)) + vkReplicas = int(ptr.Deref(vkDeployments.Items[0].Spec.Replicas, 1)) + } + return len(notReadyPods) == 0 && + len(readyPods) == util.NumPodsInTenantNs(true, cluster.Role, gwReplicas, vkReplicas) }, timeout, interval).Should(BeTrue()) } }, diff --git a/test/e2e/testutils/util/pod.go b/test/e2e/testutils/util/pod.go index 0c700511b9..b08bb7371b 100644 --- a/test/e2e/testutils/util/pod.go +++ b/test/e2e/testutils/util/pod.go @@ -79,15 +79,15 @@ func ArePodsUp(ctx context.Context, clientset kubernetes.Interface, namespace st } // NumPodsInTenantNs returns the number of pods that should be present in a tenant namespace. 
-func NumPodsInTenantNs(networkingEnabled bool, role liqov1beta1.RoleType) int { +func NumPodsInTenantNs(networkingEnabled bool, role liqov1beta1.RoleType, gwReplicas, vkReplicas int) int { count := 0 // If the network is enabled, it should have the gateway pod. if networkingEnabled { - count += 3 + count += gwReplicas } // If the cluster is a consumer, it should have the virtual-kubelet pod. if fcutils.IsConsumer(role) { - count++ + count += vkReplicas } return count }