Asynchronous synchronization of Beyla cache (#1358)
* Asynchronous synchronization of Beyla cache

* Fixed unit tests for slow -race environments

* Fixed race condition on error during informers initialization
mariomac authored Nov 14, 2024
1 parent c4244b1 commit 3a57830
Showing 4 changed files with 217 additions and 129 deletions.
43 changes: 10 additions & 33 deletions pkg/internal/kube/informer_provider.go
@@ -21,8 +21,6 @@ import (

const (
kubeConfigEnvVariable = "KUBECONFIG"
defaultResyncTime = 30 * time.Minute
defaultSyncTimeout = 60 * time.Second
)

func klog() *slog.Logger {
@@ -48,14 +46,7 @@ type MetadataProvider struct {
}

func NewMetadataProvider(config MetadataConfig) *MetadataProvider {
if config.SyncTimeout == 0 {
config.SyncTimeout = defaultSyncTimeout
}
if config.ResyncPeriod == 0 {
config.ResyncPeriod = defaultResyncTime
}
mp := &MetadataProvider{cfg: &config}
return mp
return &MetadataProvider{cfg: &config}
}

func (mp *MetadataProvider) IsKubeEnabled() bool {
@@ -167,29 +158,15 @@ func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error)
// initLocalInformers initializes an informer client that directly connects to the Node Kube API
// for getting informer data
func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Informers, error) {
done := make(chan error)
var informers *meta.Informers
go func() {
var err error
opts := append(disabledInformerOpts(mp.cfg.DisabledInformers),
meta.WithResyncPeriod(mp.cfg.ResyncPeriod),
meta.WithKubeConfigPath(mp.cfg.KubeConfigPath))
if informers, err = meta.InitInformers(ctx, opts...); err != nil {
done <- err
}
close(done)
}()

select {
case <-time.After(mp.cfg.SyncTimeout):
klog().Warn("kubernetes cache has not been synced after timeout. The kubernetes attributes might be incomplete."+
" Consider increasing the BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT value", "timeout", mp.cfg.SyncTimeout)
case err, ok := <-done:
if ok {
return nil, fmt.Errorf("failed to initialize Kubernetes informers: %w", err)
}
}
return informers, nil
opts := append(disabledInformerOpts(mp.cfg.DisabledInformers),
meta.WithResyncPeriod(mp.cfg.ResyncPeriod),
meta.WithKubeConfigPath(mp.cfg.KubeConfigPath),
// we don't want the informer to start decorating spans and flows
// before it has all the existing K8s metadata
meta.WaitForCacheSync(),
meta.WithCacheSyncTimeout(mp.cfg.SyncTimeout),
)
return meta.InitInformers(ctx, opts...)
}

// initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that
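The net effect of this file's change: the hand-rolled goroutine-plus-select timeout is replaced by options passed to meta.InitInformers, which now owns both the watcher startup and the wait for the initial cache sync. Below is a minimal sketch of that functional-options pattern; all names and the warn-and-continue timeout behaviour are illustrative assumptions, not Beyla's actual meta package.

package main

import (
	"fmt"
	"log/slog"
	"time"
)

// initOptions mirrors the shape of a functional-options API; the names here
// are illustrative only.
type initOptions struct {
	waitForSync bool
	syncTimeout time.Duration
}

type Option func(*initOptions)

// WaitForCacheSync makes initialization block until the informer cache is synced.
func WaitForCacheSync() Option { return func(o *initOptions) { o.waitForSync = true } }

// WithCacheSyncTimeout bounds how long initialization may wait for that sync.
func WithCacheSyncTimeout(d time.Duration) Option {
	return func(o *initOptions) { o.syncTimeout = d }
}

func initInformers(opts ...Option) error {
	cfg := initOptions{syncTimeout: 10 * time.Minute}
	for _, o := range opts {
		o(&cfg)
	}
	synced := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // stand-in for starting watchers and filling the cache
		close(synced)
	}()
	if !cfg.waitForSync {
		return nil // caller accepts working with a possibly incomplete cache
	}
	select {
	case <-synced:
		return nil
	case <-time.After(cfg.syncTimeout):
		// the sketch treats a timeout as warn-and-continue, like the removed code did
		slog.Warn("kubernetes cache has not been synced after timeout; metadata might be incomplete",
			"timeout", cfg.syncTimeout)
		return nil
	}
}

func main() {
	if err := initInformers(WaitForCacheSync(), WithCacheSyncTimeout(time.Second)); err != nil {
		fmt.Println("init error:", err)
		return
	}
	fmt.Println("informers ready")
}

Keeping the wait inside InitInformers means callers such as initLocalInformers no longer juggle channels and timers themselves, and the sync/timeout policy lives in one place.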
191 changes: 119 additions & 72 deletions pkg/kubecache/envtest/integration_test.go
@@ -32,6 +32,8 @@ var (
ctx context.Context
k8sClient client.Client
testEnv *envtest.Environment

kubeAPIIface kubernetes.Interface
)

const timeout = 10 * time.Second
@@ -58,7 +60,7 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
config := k8sManager.GetConfig()
theClient, err := kubernetes.NewForConfig(config)
kubeAPIIface, err = kubernetes.NewForConfig(config)
if err != nil {
slog.Error("creating kube API client", "error", err)
os.Exit(1)
@@ -93,7 +95,7 @@ func TestMain(m *testing.M) {
go func() {
if err := svc.Run(ctx,
meta.WithResyncPeriod(iConfig.InformerResyncPeriod),
meta.WithKubeClient(theClient),
meta.WithKubeClient(kubeAPIIface),
); err != nil {
slog.Error("running service", "error", err)
os.Exit(1)
@@ -104,11 +106,13 @@
}

func TestAPIs(t *testing.T) {
svcClient := serviceClient{ID: "first-pod", Address: fmt.Sprintf("127.0.0.1:%d", freePort)}
// client
require.Eventually(t, func() bool {
return svcClient.Start(ctx) == nil
}, timeout, 100*time.Millisecond)
svcClient := serviceClient{
Address: fmt.Sprintf("127.0.0.1:%d", freePort),
Messages: make(chan *informer.Event, 10),
}
test.Eventually(t, timeout, func(t require.TestingT) {
svcClient.Start(ctx, t)
})

// wait for the service to have sent the initial snapshot of entities
// (at the end, will send the "SYNC_FINISHED" event)
@@ -145,23 +149,24 @@
}

func TestBlockedClients(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// a varied number of cache clients connect concurrently. Some of them are blocked
// after a while, and they don't release the connection
never1 := &countingStallingClient{stallAfterMessages: 1000000}
never2 := &countingStallingClient{stallAfterMessages: 1000000}
never3 := &countingStallingClient{stallAfterMessages: 1000000}
stall5 := &countingStallingClient{stallAfterMessages: 5}
stall10 := &countingStallingClient{stallAfterMessages: 10}
stall15 := &countingStallingClient{stallAfterMessages: 15}
go stall15.Start(ctx, t, freePort)
go never1.Start(ctx, t, freePort)
go stall5.Start(ctx, t, freePort)
go never2.Start(ctx, t, freePort)
go stall10.Start(ctx, t, freePort)
go never3.Start(ctx, t, freePort)
addr := fmt.Sprintf("127.0.0.1:%d", freePort)
never1 := &serviceClient{Address: addr, stallAfterMessages: 1000000}
never2 := &serviceClient{Address: addr, stallAfterMessages: 1000000}
never3 := &serviceClient{Address: addr, stallAfterMessages: 1000000}
stall5 := &serviceClient{Address: addr, stallAfterMessages: 5}
stall10 := &serviceClient{Address: addr, stallAfterMessages: 10}
stall15 := &serviceClient{Address: addr, stallAfterMessages: 15}
go stall15.Start(ctx, t)
go never1.Start(ctx, t)
go stall5.Start(ctx, t)
go never2.Start(ctx, t)
go stall10.Start(ctx, t)
go never3.Start(ctx, t)

// generating a large number of notifications until the gRPC buffer of the
// server-to-client connections is full, so the "Send" operation is blocked
@@ -187,9 +192,9 @@ func TestBlockedClients(t *testing.T) {
test.Eventually(t, timeout, func(t require.TestingT) {
// the clients that got stalled, just received the expected number of messages
// before they got blocked
require.EqualValues(t, 5, stall5.readMessages.Load())
require.EqualValues(t, 10, stall10.readMessages.Load())
require.EqualValues(t, 15, stall15.readMessages.Load())
require.EqualValues(t, int32(5), stall5.readMessages.Load())
require.EqualValues(t, int32(10), stall10.readMessages.Load())
require.EqualValues(t, int32(15), stall15.readMessages.Load())

// but that did not block the rest of clients, which got all the expected messages
require.GreaterOrEqual(t, never1.readMessages.Load(), int32(createdPods))
@@ -203,6 +208,67 @@ func TestBlockedClients(t *testing.T) {
ReadChannel(t, allSent, timeout)
}

// makes sure that a new cache server won't forward the sync data to the clients until
// it effectively has synced everything
func TestAsynchronousStartup(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// generating some contents to force a new Beyla Cache service to take a while
// to synchronize during initialization
const createdPods = 20
for n := 0; n < createdPods; n++ {
require.NoError(t, k8sClient.Create(ctx, &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("async-pod-%02d", n),
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "test-container", Image: "nginx"},
},
},
}))
}

// creating a new Beyla cache service instance that will start synchronizing with
// the previously generated amount of data (also from previous tests)
newFreePort, err := test.FreeTCPPort()
require.NoError(t, err)

// create a few clients that start trying to connect and sync
// even before the new cache service starts
addr := fmt.Sprintf("127.0.0.1:%d", newFreePort)
cl1 := serviceClient{Address: addr}
cl2 := serviceClient{Address: addr}
cl3 := serviceClient{Address: addr}
// passing the test-wide testing.T instance in case clients fail asynchronously, after Eventually has succeeded
go func() { test.Eventually(t, timeout, func(_ require.TestingT) { cl1.Start(ctx, t) }) }()
go func() { test.Eventually(t, timeout, func(_ require.TestingT) { cl2.Start(ctx, t) }) }()
go func() { test.Eventually(t, timeout, func(_ require.TestingT) { cl3.Start(ctx, t) }) }()

iConfig := kubecache.DefaultConfig
iConfig.Port = newFreePort
svc := service.InformersCache{Config: &iConfig, SendTimeout: time.Second}
go func() {
require.NoError(t, svc.Run(ctx,
meta.WithResyncPeriod(iConfig.InformerResyncPeriod),
meta.WithKubeClient(kubeAPIIface),
))
}()

// The clients should have received the Sync complete signal even if they
// connected to the cache service before it was fully synchronized
test.Eventually(t, timeout, func(t require.TestingT) {
require.NotZero(t, cl1.syncSignalOnMessage.Load())
require.NotZero(t, cl2.syncSignalOnMessage.Load())
require.NotZero(t, cl3.syncSignalOnMessage.Load())
})
assert.LessOrEqual(t, int32(createdPods), cl1.syncSignalOnMessage.Load())
assert.LessOrEqual(t, int32(createdPods), cl2.syncSignalOnMessage.Load())
assert.LessOrEqual(t, int32(createdPods), cl3.syncSignalOnMessage.Load())
}

func ReadChannel[T any](t require.TestingT, inCh <-chan T, timeout time.Duration) T {
var item T
select {
@@ -216,76 +282,57 @@ func ReadChannel[T any](t require.TestingT, inCh <-chan T, timeout time.Duration
}

type serviceClient struct {
ID string
Address string
// Address of the cache service
Address string
// Messages to be forwarded on read. If nil, the client won't forward anything
Messages chan *informer.Event
// counter of read messages
readMessages atomic.Int32
// if != 0, the client will be blocked once the count of read messages reaches stallAfterMessages
stallAfterMessages int32
// records the message number at which the sync signal was received
syncSignalOnMessage atomic.Int32
}

func (sc *serviceClient) Start(ctx context.Context) error {
sc.Messages = make(chan *informer.Event, 10)

func (sc *serviceClient) Start(ctx context.Context, t require.TestingT) {
conn, err := grpc.NewClient(sc.Address,
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("can't connect client: %w", err)
}
require.NoError(t, err)

eventsClient := informer.NewEventStreamServiceClient(conn)

// Subscribe to the event stream.
stream, err := eventsClient.Subscribe(ctx, &informer.SubscribeMessage{})
if err != nil {
return fmt.Errorf("subscribing: %w", err)
}
require.NoError(t, err)

// Receive and print messages.
go func() {
defer conn.Close()
for {
if sc.stallAfterMessages != 0 && sc.stallAfterMessages == sc.readMessages.Load() {
// just block without doing any connection activity
// nor closing/releasing the connection
<-stream.Context().Done()
return
}
event, err := stream.Recv()
if err != nil {
slog.Error("receiving message at client side", "error", err)
break
}
sc.Messages <- event
sc.readMessages.Add(1)
if sc.Messages != nil {
sc.Messages <- event
}
if event.Type == informer.EventType_SYNC_FINISHED {
if sc.syncSignalOnMessage.Load() != 0 {
t.Errorf("client %s: can't receive two signal sync messages! (received at %d and %d)",
conn.GetState().String(), sc.syncSignalOnMessage.Load(), sc.readMessages.Load())
t.FailNow()
}
sc.syncSignalOnMessage.Store(sc.readMessages.Load())
}
}

}()
return nil
}

// a fake client that counts the received messages and gets blocked (without closing the connection)
// after a defined number of messages
type countingStallingClient struct {
readMessages atomic.Int32
stallAfterMessages int32
}

func (csc *countingStallingClient) Start(ctx context.Context, t *testing.T, port int) {
// Set up a connection to the server.
address := fmt.Sprintf("127.0.0.1:%d", port)
conn, err := grpc.NewClient(address,
grpc.WithTransportCredentials(insecure.NewCredentials()))
// nolint:staticcheck
defer conn.Close()
require.NoError(t, err)
client := informer.NewEventStreamServiceClient(conn)

// Subscribe to the event stream.
stream, err := client.Subscribe(ctx, &informer.SubscribeMessage{})
require.NoError(t, err)

// Receive messages
for {
if csc.stallAfterMessages == csc.readMessages.Load() {
// just block without doing any connection activity
// nor closing/releasing the connection
<-stream.Context().Done()
return
}
if _, err := stream.Recv(); err == nil {
csc.readMessages.Add(1)
}
// discarding event
}
}
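TestBlockedClients relies on the cache service not letting a stalled subscriber hold up the others (note the SendTimeout field set on service.InformersCache above). The sketch below shows the per-client send-timeout idea with plain channels; the broadcaster type and its methods are our own illustration, not the kubecache API, and the real mechanism lives in the service package outside this diff.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// broadcaster fans events out to subscribers, but never waits longer than
// sendTimeout on any single one, so a stalled client cannot block the rest.
type broadcaster struct {
	clients     []chan int
	sendTimeout time.Duration
}

func (b *broadcaster) subscribe(buffer int) <-chan int {
	ch := make(chan int, buffer)
	b.clients = append(b.clients, ch)
	return ch
}

func (b *broadcaster) publish(ev int) {
	for _, ch := range b.clients {
		select {
		case ch <- ev:
		case <-time.After(b.sendTimeout):
			// this subscriber is stalled; skip it and keep serving the others
		}
	}
}

func main() {
	b := &broadcaster{sendTimeout: 10 * time.Millisecond}
	fast := b.subscribe(100)
	slow := b.subscribe(1) // never drained: stalls after a single event

	var fastRead atomic.Int32
	go func() { // healthy consumer drains everything it is sent
		for range fast {
			fastRead.Add(1)
		}
	}()

	for i := 0; i < 20; i++ {
		b.publish(i)
	}
	time.Sleep(50 * time.Millisecond) // let the fast consumer catch up
	fmt.Println("fast client read:", fastRead.Load(), "slow client buffered:", len(slow))
}

Running it prints that the fast subscriber read all 20 events while the slow one only buffered 1, which is the same property the test asserts for the never*/stall* clients.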
33 changes: 24 additions & 9 deletions pkg/kubecache/meta/informers.go
@@ -18,6 +18,8 @@ type Informers struct {
pods cache.SharedIndexInformer
nodes cache.SharedIndexInformer
services cache.SharedIndexInformer

waitForSync chan struct{}
}

func (inf *Informers) Subscribe(observer Observer) {
@@ -57,13 +59,26 @@ func (inf *Informers) Subscribe(observer Observer) {
}
}
}
// notify the end of synchronization, so the client knows that already has a snapshot
// of all the existing resources
if err := observer.On(&informer.Event{
Type: informer.EventType_SYNC_FINISHED,
}); err != nil {
inf.log.Debug("error notifying observer. Unsubscribing", "observerID", observer.ID(), "error", err)
inf.BaseNotifier.Unsubscribe(observer)
return
}

// until the informer cache has synced (waitForSync is closed), we won't send the sync_finished event to remote beyla clients
// TODO: in some very slowed-down environments (e.g. tests run with -race), this last message might
// be sent and executed before the rest of the previous updates have been processed and submitted.
// In production, this might mean that a few initialization updates reach the client only after the "sync_finished" signal.
// To fix that we should re-architect this to enqueue notifications in a synchronized list instead of
// invoking them directly.
// Given the amount of work and complexity, we can afford this small delay, as the data eventually
// reaches the client right after the sync_finished signal.
go func() {
<-inf.waitForSync

// notify the end of synchronization, so the client knows that it already has a snapshot
// of all the existing resources
if err := observer.On(&informer.Event{
Type: informer.EventType_SYNC_FINISHED,
}); err != nil {
inf.log.Debug("error notifying observer. Unsubscribing", "observerID", observer.ID(), "error", err)
inf.BaseNotifier.Unsubscribe(observer)
return
}
}()
}
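The core of the change in this file: SYNC_FINISHED is no longer emitted inline at the end of Subscribe. Each subscriber instead gets it from a goroutine that parks on the shared waitForSync channel, so a client that connects before the cache has finished its initial sync (the situation TestAsynchronousStartup exercises) still receives the signal exactly once, after the snapshot. A minimal sketch of that gating pattern, with illustrative names rather than the actual meta.Informers/BaseNotifier types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// notifier hands every subscriber a "sync finished" marker, but only after the
// shared waitForSync channel has been closed.
type notifier struct {
	mu          sync.Mutex
	waitForSync chan struct{}
	subscribers []chan string
}

func (n *notifier) Subscribe() <-chan string {
	n.mu.Lock()
	defer n.mu.Unlock()
	out := make(chan string, 16)
	n.subscribers = append(n.subscribers, out)
	// deliver the end-of-sync marker asynchronously, once the cache is ready,
	// even if this subscriber registered before the initial sync completed
	go func() {
		<-n.waitForSync
		out <- "SYNC_FINISHED"
	}()
	return out
}

// markSynced is what the informer machinery would call once the initial
// snapshot of pods/nodes/services has been loaded.
func (n *notifier) markSynced() { close(n.waitForSync) }

func main() {
	n := &notifier{waitForSync: make(chan struct{})}
	early := n.Subscribe() // connects before the cache has synced

	go func() {
		time.Sleep(100 * time.Millisecond) // stand-in for the initial cache sync
		n.markSynced()
	}()

	fmt.Println(<-early) // blocks until markSynced, then prints SYNC_FINISHED
}

The TODO kept in the diff names the remaining gap: because the signal is delivered from its own goroutine, a handful of resource updates may still reach a client after it; the proposed fix is to funnel all notifications through one synchronized queue instead of invoking observers directly.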