Asynchronous synchronization of Beyla cache #1358

Merged: 3 commits, Nov 14, 2024
43 changes: 10 additions & 33 deletions pkg/internal/kube/informer_provider.go
@@ -21,8 +21,6 @@ import (

const (
kubeConfigEnvVariable = "KUBECONFIG"
defaultResyncTime = 30 * time.Minute
defaultSyncTimeout = 60 * time.Second
)

func klog() *slog.Logger {
@@ -48,14 +46,7 @@ type MetadataProvider struct {
}

func NewMetadataProvider(config MetadataConfig) *MetadataProvider {
if config.SyncTimeout == 0 {
config.SyncTimeout = defaultSyncTimeout
}
if config.ResyncPeriod == 0 {
config.ResyncPeriod = defaultResyncTime
}
mp := &MetadataProvider{cfg: &config}
return mp
return &MetadataProvider{cfg: &config}
}

func (mp *MetadataProvider) IsKubeEnabled() bool {
@@ -167,29 +158,15 @@ func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error)
// initLocalInformers initializes an informer client that directly connects to the Node Kube API
// for getting informer data
func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Informers, error) {
done := make(chan error)
var informers *meta.Informers
go func() {
var err error
opts := append(disabledInformerOpts(mp.cfg.DisabledInformers),
meta.WithResyncPeriod(mp.cfg.ResyncPeriod),
meta.WithKubeConfigPath(mp.cfg.KubeConfigPath))
if informers, err = meta.InitInformers(ctx, opts...); err != nil {
done <- err
}
close(done)
}()

select {
case <-time.After(mp.cfg.SyncTimeout):
klog().Warn("kubernetes cache has not been synced after timeout. The kubernetes attributes might be incomplete."+
" Consider increasing the BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT value", "timeout", mp.cfg.SyncTimeout)
case err, ok := <-done:
if ok {
return nil, fmt.Errorf("failed to initialize Kubernetes informers: %w", err)
}
Comment on lines -183 to -190
Contributor Author:

We didn't realize there was a bug here.

If the timeout fired, the "informers" variable returned later would be nil, because it is only assigned when InitInformers returns successfully from the other goroutine.

}
return informers, nil
opts := append(disabledInformerOpts(mp.cfg.DisabledInformers),
meta.WithResyncPeriod(mp.cfg.ResyncPeriod),
meta.WithKubeConfigPath(mp.cfg.KubeConfigPath),
// we don't want the informer to start decorating spans and flows
// before getting all the existing K8s metadata
meta.WaitForCacheSync(),
meta.WithCacheSyncTimeout(mp.cfg.SyncTimeout),
)
return meta.InitInformers(ctx, opts...)
}

// initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that
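To make the reviewer's point concrete, here is a minimal, self-contained sketch of the pattern the removed initLocalInformers code followed (all names here are hypothetical, not Beyla's actual code): if the timeout fires before the initialization goroutine finishes, the function falls through and returns a nil value with no error. The new option-based code avoids this by delegating the wait to the informers themselves via meta.WaitForCacheSync() and meta.WithCacheSyncTimeout().

```go
// Hypothetical stand-in for the removed select-on-timeout pattern: when the
// timeout wins the select, "inf" is still nil, yet the function returns it
// together with a nil error. (It also races on inf, just like the original.)
package main

import (
	"fmt"
	"time"
)

type informers struct{ synced bool }

func initWithTimeout(timeout, initDuration time.Duration) (*informers, error) {
	done := make(chan error)
	var inf *informers
	go func() {
		time.Sleep(initDuration) // stands in for a slow InitInformers call
		inf = &informers{synced: true}
		close(done)
	}()

	select {
	case <-time.After(timeout):
		// old behavior: log a warning and fall through; inf is still nil here
	case err, ok := <-done:
		if ok && err != nil {
			return nil, fmt.Errorf("failed to initialize informers: %w", err)
		}
	}
	return inf, nil
}

func main() {
	inf, err := initWithTimeout(10*time.Millisecond, 50*time.Millisecond)
	fmt.Println(inf, err) // prints "<nil> <nil>": no error, but no informers either
}
```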
191 changes: 119 additions & 72 deletions pkg/kubecache/envtest/integration_test.go
@@ -32,6 +32,8 @@ var (
ctx context.Context
k8sClient client.Client
testEnv *envtest.Environment

kubeAPIIface kubernetes.Interface
)

const timeout = 10 * time.Second
@@ -58,7 +60,7 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
config := k8sManager.GetConfig()
theClient, err := kubernetes.NewForConfig(config)
kubeAPIIface, err = kubernetes.NewForConfig(config)
if err != nil {
slog.Error("creating kube API client", "error", err)
os.Exit(1)
@@ -93,7 +95,7 @@ func TestMain(m *testing.M) {
go func() {
if err := svc.Run(ctx,
meta.WithResyncPeriod(iConfig.InformerResyncPeriod),
meta.WithKubeClient(theClient),
meta.WithKubeClient(kubeAPIIface),
); err != nil {
slog.Error("running service", "error", err)
os.Exit(1)
@@ -104,11 +106,13 @@
}

func TestAPIs(t *testing.T) {
svcClient := serviceClient{ID: "first-pod", Address: fmt.Sprintf("127.0.0.1:%d", freePort)}
// client
require.Eventually(t, func() bool {
return svcClient.Start(ctx) == nil
}, timeout, 100*time.Millisecond)
svcClient := serviceClient{
Address: fmt.Sprintf("127.0.0.1:%d", freePort),
Messages: make(chan *informer.Event, 10),
}
test.Eventually(t, timeout, func(t require.TestingT) {
svcClient.Start(ctx, t)
})

// wait for the service to have sent the initial snapshot of entities
// (at the end, will send the "SYNC_FINISHED" event)
@@ -145,23 +149,24 @@ func TestAPIs(t *testing.T) {
}

func TestBlockedClients(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// a varied number of cache clients connect concurrently. Some of them are blocked
// after a while, and they don't release the connection
never1 := &countingStallingClient{stallAfterMessages: 1000000}
never2 := &countingStallingClient{stallAfterMessages: 1000000}
never3 := &countingStallingClient{stallAfterMessages: 1000000}
stall5 := &countingStallingClient{stallAfterMessages: 5}
stall10 := &countingStallingClient{stallAfterMessages: 10}
stall15 := &countingStallingClient{stallAfterMessages: 15}
go stall15.Start(ctx, t, freePort)
go never1.Start(ctx, t, freePort)
go stall5.Start(ctx, t, freePort)
go never2.Start(ctx, t, freePort)
go stall10.Start(ctx, t, freePort)
go never3.Start(ctx, t, freePort)
addr := fmt.Sprintf("127.0.0.1:%d", freePort)
never1 := &serviceClient{Address: addr, stallAfterMessages: 1000000}
never2 := &serviceClient{Address: addr, stallAfterMessages: 1000000}
never3 := &serviceClient{Address: addr, stallAfterMessages: 1000000}
stall5 := &serviceClient{Address: addr, stallAfterMessages: 5}
stall10 := &serviceClient{Address: addr, stallAfterMessages: 10}
stall15 := &serviceClient{Address: addr, stallAfterMessages: 15}
go stall15.Start(ctx, t)
go never1.Start(ctx, t)
go stall5.Start(ctx, t)
go never2.Start(ctx, t)
go stall10.Start(ctx, t)
go never3.Start(ctx, t)

// generating a large number of notifications until the gRPC buffer of the
// server-to-client connections is full, so the "Send" operation is blocked
@@ -187,9 +192,9 @@ func TestBlockedClients(t *testing.T) {
test.Eventually(t, timeout, func(t require.TestingT) {
// the clients that got stalled, just received the expected number of messages
// before they got blocked
require.EqualValues(t, 5, stall5.readMessages.Load())
require.EqualValues(t, 10, stall10.readMessages.Load())
require.EqualValues(t, 15, stall15.readMessages.Load())
require.EqualValues(t, int32(5), stall5.readMessages.Load())
require.EqualValues(t, int32(10), stall10.readMessages.Load())
require.EqualValues(t, int32(15), stall15.readMessages.Load())

// but that did not block the rest of clients, which got all the expected messages
require.GreaterOrEqual(t, never1.readMessages.Load(), int32(createdPods))
@@ -203,6 +208,67 @@ func TestBlockedClients(t *testing.T) {
ReadChannel(t, allSent, timeout)
}

// makes sure that a new cache server won't forward the sync data to the clients until
// it effectively has synced everything
func TestAsynchronousStartup(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// generating some contents to force a new Beyla Cache service to take a while
// to synchronize during initialization
const createdPods = 20
for n := 0; n < createdPods; n++ {
require.NoError(t, k8sClient.Create(ctx, &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("async-pod-%02d", n),
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "test-container", Image: "nginx"},
},
},
}))
}

// creating a new Beyla cache service instance that will start synchronizing with
// the previously generated amount of data (also from previous tests)
newFreePort, err := test.FreeTCPPort()
require.NoError(t, err)

// create a few clients that start trying to connect and sync
// even before the new cache service starts
addr := fmt.Sprintf("127.0.0.1:%d", newFreePort)
cl1 := serviceClient{Address: addr}
cl2 := serviceClient{Address: addr}
cl3 := serviceClient{Address: addr}
// passing the test-wide testing.T instance in case clients fail asynchronously, after Eventually has already succeeded
go func() { test.Eventually(t, timeout, func(_ require.TestingT) { cl1.Start(ctx, t) }) }()
go func() { test.Eventually(t, timeout, func(_ require.TestingT) { cl2.Start(ctx, t) }) }()
go func() { test.Eventually(t, timeout, func(_ require.TestingT) { cl3.Start(ctx, t) }) }()

iConfig := kubecache.DefaultConfig
iConfig.Port = newFreePort
svc := service.InformersCache{Config: &iConfig, SendTimeout: time.Second}
go func() {
require.NoError(t, svc.Run(ctx,
meta.WithResyncPeriod(iConfig.InformerResyncPeriod),
meta.WithKubeClient(kubeAPIIface),
))
}()

// The clients should have received the Sync complete signal even if they
// connected to the cache service before it was fully synchronized
test.Eventually(t, timeout, func(t require.TestingT) {
require.NotZero(t, cl1.syncSignalOnMessage.Load())
require.NotZero(t, cl2.syncSignalOnMessage.Load())
require.NotZero(t, cl3.syncSignalOnMessage.Load())
})
assert.LessOrEqual(t, int32(createdPods), cl1.syncSignalOnMessage.Load())
assert.LessOrEqual(t, int32(createdPods), cl2.syncSignalOnMessage.Load())
assert.LessOrEqual(t, int32(createdPods), cl3.syncSignalOnMessage.Load())
}

func ReadChannel[T any](t require.TestingT, inCh <-chan T, timeout time.Duration) T {
var item T
select {
@@ -216,76 +282,57 @@ func ReadChannel[T any](t require.TestingT, inCh <-chan T, timeout time.Duration
}

type serviceClient struct {
ID string
Address string
// Address of the cache service
Address string
// Messages to be forwarded on read. If nil, the client won't forward anything
Messages chan *informer.Event
// counter of read messages
readMessages atomic.Int32
// if != 0, the client will be blocked when the count of read messages reaches stallAfterMessages
stallAfterMessages int32
// stores at which message number the sync signal was received
syncSignalOnMessage atomic.Int32
}

func (sc *serviceClient) Start(ctx context.Context) error {
sc.Messages = make(chan *informer.Event, 10)

func (sc *serviceClient) Start(ctx context.Context, t require.TestingT) {
conn, err := grpc.NewClient(sc.Address,
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("can't connect client: %w", err)
}
require.NoError(t, err)

eventsClient := informer.NewEventStreamServiceClient(conn)

// Subscribe to the event stream.
stream, err := eventsClient.Subscribe(ctx, &informer.SubscribeMessage{})
if err != nil {
return fmt.Errorf("subscribing: %w", err)
}
require.NoError(t, err)

// Receive and forward messages.
go func() {
defer conn.Close()
for {
if sc.stallAfterMessages != 0 && sc.stallAfterMessages == sc.readMessages.Load() {
// just block without doing any connection activity
// nor closing/releasing the connection
<-stream.Context().Done()
return
}
event, err := stream.Recv()
if err != nil {
slog.Error("receiving message at client side", "error", err)
break
}
sc.Messages <- event
sc.readMessages.Add(1)
if sc.Messages != nil {
sc.Messages <- event
}
if event.Type == informer.EventType_SYNC_FINISHED {
if sc.syncSignalOnMessage.Load() != 0 {
t.Errorf("client %s: can't receive two signal sync messages! (received at %d and %d)",
conn.GetState().String(), sc.syncSignalOnMessage.Load(), sc.readMessages.Load())
t.FailNow()
}
sc.syncSignalOnMessage.Store(sc.readMessages.Load())
}
}

}()
return nil
}

// a fake client that counts the received messages and gets blocked (without closing the connection)
// after a defined number of messages
type countingStallingClient struct {
readMessages atomic.Int32
stallAfterMessages int32
}

func (csc *countingStallingClient) Start(ctx context.Context, t *testing.T, port int) {
// Set up a connection to the server.
address := fmt.Sprintf("127.0.0.1:%d", port)
conn, err := grpc.NewClient(address,
grpc.WithTransportCredentials(insecure.NewCredentials()))
// nolint:staticcheck
defer conn.Close()
require.NoError(t, err)
client := informer.NewEventStreamServiceClient(conn)

// Subscribe to the event stream.
stream, err := client.Subscribe(ctx, &informer.SubscribeMessage{})
require.NoError(t, err)

// Receive messages
for {
if csc.stallAfterMessages == csc.readMessages.Load() {
// just block without doing any connection activity
// nor closing/releasing the connection
<-stream.Context().Done()
return
}
if _, err := stream.Recv(); err == nil {
csc.readMessages.Add(1)
}
// discarding event
}
}
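For context on what these stalling clients exercise on the server side: the cache service must keep broadcasting to healthy subscribers even when one subscriber stops reading and its connection buffer fills up. The sketch below uses hypothetical types, not the actual InformersCache implementation (which presumably relies on its SendTimeout setting and gRPC buffering), and only shows the general shape of such a fan-out.

```go
// Hypothetical fan-out with a per-client send timeout: a subscriber that stops
// reading only loses its own updates; the broadcast to everyone else continues.
package main

import (
	"fmt"
	"time"
)

type event struct{ seq int }

type broadcaster struct {
	clients     map[string]chan event
	sendTimeout time.Duration
}

func (b *broadcaster) notify(e event) {
	for id, ch := range b.clients {
		select {
		case ch <- e:
			// delivered (or buffered) normally
		case <-time.After(b.sendTimeout):
			// the client stopped reading and its buffer is full:
			// drop it so the remaining clients keep receiving updates
			fmt.Println("dropping stalled client", id)
			delete(b.clients, id)
		}
	}
}

func main() {
	b := &broadcaster{
		clients: map[string]chan event{
			"healthy": make(chan event, 100),
			"stalled": make(chan event, 1), // nobody ever reads this one
		},
		sendTimeout: 50 * time.Millisecond,
	}
	healthy := b.clients["healthy"]
	go func() { // the healthy client keeps draining its channel
		for range healthy {
		}
	}()
	for i := 0; i < 10; i++ {
		b.notify(event{seq: i})
	}
	fmt.Println("remaining clients:", len(b.clients)) // only "healthy" is left
}
```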
33 changes: 24 additions & 9 deletions pkg/kubecache/meta/informers.go
@@ -18,6 +18,8 @@ type Informers struct {
pods cache.SharedIndexInformer
nodes cache.SharedIndexInformer
services cache.SharedIndexInformer

waitForSync chan struct{}
}

func (inf *Informers) Subscribe(observer Observer) {
@@ -57,13 +59,26 @@ func (inf *Informers) Subscribe(observer Observer) {
}
}
}
// notify the end of synchronization, so the client knows that already has a snapshot
// of all the existing resources
if err := observer.On(&informer.Event{
Type: informer.EventType_SYNC_FINISHED,
}); err != nil {
inf.log.Debug("error notifying observer. Unsubscribing", "observerID", observer.ID(), "error", err)
inf.BaseNotifier.Unsubscribe(observer)
return
}

// until the informers signal waitForSync, we won't send the sync_finished event to remote Beyla clients
// TODO: in some very slowed-down environments (e.g. tests run with -race), this last message might
// be sent and processed before the rest of the previous updates have been processed and submitted.
// In production, it might mean that a few initialization updates arrive right after the "sync_finished" signal.
// To fix that, we should rearchitect this to not invoke the notifications directly but enqueue them
// in a synchronized list.
// Given the amount of work and complexity, we can afford this small delay, as the data eventually
// reaches the client right after the sync_finished signal.
go func() {
<-inf.waitForSync

// notify the end of synchronization, so the client knows that it already has a snapshot
// of all the existing resources
if err := observer.On(&informer.Event{
Type: informer.EventType_SYNC_FINISHED,
}); err != nil {
inf.log.Debug("error notifying observer. Unsubscribing", "observerID", observer.ID(), "error", err)
inf.BaseNotifier.Unsubscribe(observer)
return
}
}()
}
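The essence of the informers.go change is gating SYNC_FINISHED on the waitForSync channel. Below is a compact sketch of that pattern, with simplified stand-in types (Observer, Event) rather than the real informer package, and assuming waitForSync is closed once the Kubernetes caches report their initial sync; the actual wiring lives elsewhere in this PR.

```go
// Simplified stand-ins for the real types: the point is that SYNC_FINISHED is
// only delivered after waitForSync is closed, no matter how early a client
// subscribed.
package main

import (
	"fmt"
	"sync"
	"time"
)

type Event struct{ Type string }

type Observer interface {
	ID() string
	On(*Event) error
}

type Informers struct {
	waitForSync chan struct{}
	closeOnce   sync.Once
}

func NewInformers() *Informers {
	return &Informers{waitForSync: make(chan struct{})}
}

// markSynced would be called once the informer caches report they are synced
// (hypothetical helper, named for this sketch only).
func (inf *Informers) markSynced() {
	inf.closeOnce.Do(func() { close(inf.waitForSync) })
}

func (inf *Informers) Subscribe(o Observer) {
	// ... the snapshot of currently known entities would be sent here ...
	go func() {
		// block until the caches are synced: clients that connected early
		// simply wait instead of receiving a premature SYNC_FINISHED
		<-inf.waitForSync
		if err := o.On(&Event{Type: "SYNC_FINISHED"}); err != nil {
			fmt.Println("error notifying observer, unsubscribing", o.ID(), err)
		}
	}()
}

type printObserver struct{ id string }

func (p printObserver) ID() string        { return p.id }
func (p printObserver) On(e *Event) error { fmt.Println(p.id, "got", e.Type); return nil }

func main() {
	inf := NewInformers()
	inf.Subscribe(printObserver{id: "early-client"}) // subscribes before the sync completes
	time.Sleep(50 * time.Millisecond)                // the caches are still syncing
	inf.markSynced()                                 // now every pending subscriber gets SYNC_FINISHED
	time.Sleep(50 * time.Millisecond)                // give the goroutine time to deliver
}
```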