Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fix HA replicas checks #2804

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions pkg/gateway/concurrent/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package concurrent

import (
"context"
"fmt"
"net"

corev1 "k8s.io/api/core/v1"
Expand All @@ -31,16 +32,16 @@ var _ manager.Runnable = &RunnableGateway{}
// RunnableGateway registers a gateway pod replica with the controller
// manager. It carries the replica's identity (pod, owning gateway, and
// namespace) together with the IPC endpoints used to communicate with
// the guest containers.
type RunnableGateway struct {
	Client client.Client

	// Identity of this replica within its gateway.
	PodName     string
	GatewayName string
	Namespace   string

	// Socket is the listen socket shared with the guest containers;
	// GuestConnections tracks the per-container IPC channels.
	Socket           net.Listener
	GuestConnections ipc.GuestConnections
}

// NewRunnableGateway creates a new Runnable.
func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace string, containerNames []string) (*RunnableGateway, error) {
func NewRunnableGateway(cl client.Client, podName, gatewayName, namespace string, containerNames []string) (*RunnableGateway, error) {
guestConnections := ipc.NewGuestConnections(containerNames)

socket, err := ipc.CreateListenSocket(unixSocketPath)
Expand All @@ -56,7 +57,7 @@ func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace str
return &RunnableGateway{
Client: cl,
PodName: podName,
DeploymentName: deploymentName,
GatewayName: gatewayName,
Namespace: namespace,
Socket: socket,
GuestConnections: guestConnections,
Expand All @@ -67,7 +68,7 @@ func NewRunnableGateway(cl client.Client, podName, deploymentName, namespace str
func (rg *RunnableGateway) Start(ctx context.Context) error {
defer rg.Close()

pods, err := ListAllGatewaysReplicas(ctx, rg.Client, rg.Namespace, rg.DeploymentName)
pods, err := ListAllGatewaysReplicas(ctx, rg.Client, rg.Namespace, rg.GatewayName)
if err != nil {
return err
}
Expand All @@ -84,6 +85,10 @@ func (rg *RunnableGateway) Start(ctx context.Context) error {
}
}

if activePod == nil {
return fmt.Errorf("active gateway pod not found")
}

if err := AddActiveGatewayLabel(ctx, rg.Client, client.ObjectKeyFromObject(activePod)); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/gateway/concurrent/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func RemoveActiveGatewayLabel(ctx context.Context, cl client.Client, key client.
return nil
}

// ListAllGatewaysReplicas returns the list of all the gateways replicas of the same deployment.
func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, deploymentName string) ([]corev1.Pod, error) {
// ListAllGatewaysReplicas returns the list of all the gateways replicas of the same gateway.
func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, gatewayName string) ([]corev1.Pod, error) {
podList := &corev1.PodList{}
if err := cl.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels{
consts.K8sAppNameKey: deploymentName,
consts.K8sAppNameKey: gatewayName,
}); err != nil {
return nil, err
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/utils/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package maps
import (
"fmt"
"maps"
"sort"
"slices"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -179,16 +179,14 @@ func UpdateCache(annots, template map[string]string, cacheKey string) map[string
// SerializeMap converts a map into a string of its keys, each followed by a
// comma (e.g. {"a": "1", "b": "2"} -> "a,b,"). Keys are sorted so the result
// is deterministic regardless of map iteration order. An empty or nil map
// yields the empty string.
func SerializeMap(m map[string]string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	slices.Sort(keys)

	// strings.Builder avoids the quadratic cost of repeated string
	// concatenation in the loop.
	var sb strings.Builder
	for _, k := range keys {
		sb.WriteString(k)
		sb.WriteByte(',')
	}
	return sb.String()
}
Expand Down
54 changes: 49 additions & 5 deletions test/e2e/cruise/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

const (
// clustersRequired is the number of clusters required in this E2E test.
clustersRequired = 2
clustersRequired = 3
// testName is the name of this E2E test.
testName = "NETWORK"
// StressMax is the maximum number of stress iterations.
Expand Down Expand Up @@ -126,16 +126,29 @@ var _ = Describe("Liqo E2E", func() {
When("\"liqoctl test network\" runs", func() {
It("should succeed both before and after gateway pods restart", func() {
// Run the tests.
Eventually(runLiqoctlNetworkTests(defaultArgs), timeout, interval).Should(Succeed())
Eventually(func() error {
return runLiqoctlNetworkTests(defaultArgs)
}, timeout, interval).Should(Succeed())

// Restart the gateway pods.
for i := range testContext.Clusters {
RestartPods(testContext.Clusters[i].ControllerClient)
}

// Check if there is only one active gateway pod per remote cluster.
for i := range testContext.Clusters {
numActiveGateway := testContext.Clusters[i].NumPeeredConsumers + testContext.Clusters[i].NumPeeredProviders
Eventually(func() error {
return checkUniqueActiveGatewayPod(testContext.Clusters[i].ControllerClient, numActiveGateway)
}, timeout, interval).Should(Succeed())
}

// Run the tests again.
Eventually(runLiqoctlNetworkTests(defaultArgs), timeout, interval).Should(Succeed())
Eventually(func() error {
return runLiqoctlNetworkTests(defaultArgs)
}, timeout, interval).Should(Succeed())
})

It("should succeed both before and after gateway pods restart (stress gateway deletion and run basic tests)", func() {
args := defaultArgs
args.basic = true
Expand All @@ -146,12 +159,22 @@ var _ = Describe("Liqo E2E", func() {
RestartPods(testContext.Clusters[j].ControllerClient)
}

// Check if there is only one active gateway pod per remote cluster.
for j := range testContext.Clusters {
numActiveGateway := testContext.Clusters[j].NumPeeredConsumers + testContext.Clusters[j].NumPeeredProviders
Eventually(func() error {
return checkUniqueActiveGatewayPod(testContext.Clusters[j].ControllerClient, numActiveGateway)
}, timeout, interval).Should(Succeed())
}

if i == stressMax-1 {
args.remove = true
}

// Run the tests.
Eventually(runLiqoctlNetworkTests(args), timeout, interval).Should(Succeed())
Eventually(func() error {
return runLiqoctlNetworkTests(args)
}, timeout, interval).Should(Succeed())
}
})
})
Expand Down Expand Up @@ -299,7 +322,6 @@ func RestartPods(cl client.Client) {
if err := cl.List(ctx, deploymentList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
gateway.GatewayComponentKey: gateway.GatewayComponentGateway,
concurrent.ActiveGatewayKey: concurrent.ActiveGatewayValue,
}),
}); err != nil {
return err
Expand All @@ -314,3 +336,25 @@ func RestartPods(cl client.Client) {
return nil
}, timeout, interval).Should(Succeed())
}

// checkUniqueActiveGatewayPod verifies that the number of gateway pods carrying
// the active-gateway label matches numActiveGateway (one active replica per
// peering), returning an error on any mismatch or listing failure.
func checkUniqueActiveGatewayPod(cl client.Client, numActiveGateway int) error {
	// Sleep a few seconds to be sure that the new leader is elected.
	time.Sleep(2 * time.Second)

	podList := &corev1.PodList{}
	if err := cl.List(ctx, podList, &client.ListOptions{
		LabelSelector: labels.SelectorFromSet(labels.Set{
			gateway.GatewayComponentKey: gateway.GatewayComponentGateway,
			concurrent.ActiveGatewayKey: concurrent.ActiveGatewayValue,
		}),
	}); err != nil {
		return fmt.Errorf("unable to list active gateway pods: %w", err)
	}

	if len(podList.Items) != numActiveGateway {
		return fmt.Errorf("expected %d active gateway pods, got %d", numActiveGateway, len(podList.Items))
	}

	return nil
}
4 changes: 2 additions & 2 deletions test/e2e/pipeline/infra/cluster-api/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ WORKDIR=$(dirname "$FILEPATH")
source "$WORKDIR/../../utils.sh"

# shellcheck disable=SC1091
# shellcheck source=./cni.sh
source "$WORKDIR/cni.sh"
# shellcheck source=../cni.sh
source "$WORKDIR/../cni.sh"

export K8S_VERSION=${K8S_VERSION:-"1.29.7"}
K8S_VERSION=$(echo -n "$K8S_VERSION" | sed 's/v//g') # remove the leading v
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pipeline/infra/kind/pre-requirements.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ install_kubectl "${OS}" "${ARCH}" "${K8S_VERSION}"

install_helm "${OS}" "${ARCH}"

KIND_VERSION="v0.23.0"
KIND_VERSION="v0.24.0"

echo "Downloading Kind ${KIND_VERSION}"

Expand Down
9 changes: 9 additions & 0 deletions test/e2e/pipeline/infra/kind/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ WORKDIR=$(dirname "$FILEPATH")
# shellcheck source=../../utils.sh
source "$WORKDIR/../../utils.sh"

# shellcheck disable=SC1091
# shellcheck source=../cni.sh
source "$WORKDIR/../cni.sh"

KIND="${BINDIR}/kind"

CLUSTER_NAME=cluster
Expand All @@ -59,6 +63,11 @@ do
echo "Creating cluster ${CLUSTER_NAME}${i}..."
${KIND} create cluster --name "${CLUSTER_NAME}${i}" --kubeconfig "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}" --config "${TMPDIR}/liqo-cluster-${CLUSTER_NAME}${i}.yaml" --wait 2m

# Install CNI if kindnet disabled
if [[ ${DISABLE_KINDNET} == "true" ]]; then
"install_${CNI}" "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}"
fi

# Install metrics-server
install_metrics_server "${TMPDIR}/kubeconfigs/liqo_kubeconf_${i}"
done
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pipeline/installer/liqoctl/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ LIQO_VERSION="${LIQO_VERSION:-$(git rev-parse HEAD)}"
export SERVICE_CIDR=10.100.0.0/16
export POD_CIDR=10.200.0.0/16
export POD_CIDR_OVERLAPPING=${POD_CIDR_OVERLAPPING:-"false"}
export HA_REPLICAS=3
export HA_REPLICAS=2

for i in $(seq 1 "${CLUSTER_NUMBER}");
do
Expand Down
25 changes: 24 additions & 1 deletion test/e2e/postinstall/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"github.com/liqotech/liqo/pkg/consts"
gwconsts "github.com/liqotech/liqo/pkg/gateway"
fcutils "github.com/liqotech/liqo/pkg/utils/foreigncluster"
"github.com/liqotech/liqo/pkg/vkMachinery"
"github.com/liqotech/liqo/test/e2e/testutils/tester"
"github.com/liqotech/liqo/test/e2e/testutils/util"
)
Expand Down Expand Up @@ -92,8 +95,28 @@ var _ = Describe("Liqo E2E", func() {
for _, tenantNs := range tenantNsList.Items {
Eventually(func() bool {
readyPods, notReadyPods, err := util.ArePodsUp(ctx, cluster.NativeClient, tenantNs.Name)
Expect(err).ToNot(HaveOccurred())
klog.Infof("Tenant pods status: %d ready, %d not ready", len(readyPods), len(notReadyPods))
return err == nil && len(notReadyPods) == 0 && len(readyPods) == util.NumPodsInTenantNs(true, cluster.Role)
// Get deployment gateway
gwDeployments, err := cluster.NativeClient.AppsV1().Deployments(tenantNs.Name).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", gwconsts.GatewayComponentKey, gwconsts.GatewayComponentGateway),
})
Expect(err).ToNot(HaveOccurred())
Expect(gwDeployments.Items).To(HaveLen(1))
gwReplicas := int(ptr.Deref(gwDeployments.Items[0].Spec.Replicas, 1))

// Get deployment virtual-kubelet if role is consumer
vkReplicas := 0
if fcutils.IsConsumer(cluster.Role) {
vkDeployments, err := cluster.NativeClient.AppsV1().Deployments(tenantNs.Name).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(vkMachinery.KubeletBaseLabels).String(),
})
Expect(err).ToNot(HaveOccurred())
Expect(vkDeployments.Items).To(HaveLen(1))
vkReplicas = int(ptr.Deref(vkDeployments.Items[0].Spec.Replicas, 1))
}
return len(notReadyPods) == 0 &&
len(readyPods) == util.NumPodsInTenantNs(true, cluster.Role, gwReplicas, vkReplicas)
}, timeout, interval).Should(BeTrue())
}
},
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/testutils/util/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func ArePodsUp(ctx context.Context, clientset kubernetes.Interface, namespace st
}

// NumPodsInTenantNs returns the number of pods that should be present in a tenant namespace.
func NumPodsInTenantNs(networkingEnabled bool, role liqov1beta1.RoleType) int {
func NumPodsInTenantNs(networkingEnabled bool, role liqov1beta1.RoleType, gwReplicas, vkReplicas int) int {
count := 0
// If the network is enabled, it should have the gateway pod.
if networkingEnabled {
count += 3
count += gwReplicas
}
// If the cluster is a consumer, it should have the virtual-kubelet pod.
if fcutils.IsConsumer(role) {
count++
count += vkReplicas
}
return count
}
Expand Down