Skip to content

Commit

Permalink
create resource pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammadVatandoost committed Oct 25, 2023
1 parent c93d2bf commit dadd833
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 13 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# XDS Control Plane
A proxy-less service mesh for grpc services in kubernetes.

### Running in K8S

Use argoCD yaml files or Helm charts to deploy on K8s
Expand Down
32 changes: 32 additions & 0 deletions internal/app/control-plane/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,50 @@ package controlplane
import (
"log/slog"

"github.com/mohammadVatandoost/xds-conrol-plane/internal/resource"
v1 "k8s.io/api/core/v1"
)

func (a *App) OnAddSerivce(key string, serviceObj *v1.Service) {
a.muResource.Lock()
defer a.muResource.Unlock()
slog.Info("OnAddSerivce", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
resourceInstance, ok := a.resources[key]
if !ok {
resourceInstance = resource.NewResource(serviceObj.Name, serviceObj.APIVersion, "", "service", key)
}
resourceInstance.Name = serviceObj.Name
resourceInstance.Version = serviceObj.APIVersion
a.resources[key] = resourceInstance
}

func (a *App) OnDeleteService(key string, serviceObj *v1.Service) {
a.muResource.Lock()
defer a.muResource.Unlock()
slog.Info("OnDeleteService", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)

_, ok := a.resources[key]
if !ok {
slog.Error("OnDeleteService resource doesn't exist in DB", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
return
}
delete(a.resources, key)

}

func (a *App) OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service) {
slog.Info("OnUpdateService", "newKey", newKey, "newServiceName", newServiceObj.Name, "newServiceNamespace", newServiceObj.Namespace,
"oldKey", oldKey, "oleServiceName", oldServiceObj.Name, "oldServiceNamespace", oldServiceObj.Namespace)
a.muResource.Lock()
defer a.muResource.Unlock()
resourceInstance, ok := a.resources[oldKey]
if !ok {
slog.Error("OnUpdateService resource doesn't exist in DB", "key", oldKey, "name", oldServiceObj.Name,
"Namespace", oldServiceObj.Namespace, "Labels", oldServiceObj.Labels)
resourceInstance = resource.NewResource(newServiceObj.Name, newServiceObj.APIVersion, "", "service", newKey)
}
delete(a.resources, oldKey)
resourceInstance.Name = newServiceObj.Name
resourceInstance.Version = newServiceObj.APIVersion
a.resources[newKey] = resourceInstance
}
9 changes: 7 additions & 2 deletions internal/app/control-plane/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ import (
"sync"

"github.com/mohammadVatandoost/xds-conrol-plane/internal/node"
"github.com/mohammadVatandoost/xds-conrol-plane/internal/resource"
"github.com/mohammadVatandoost/xds-conrol-plane/pkg/config/app/controlplane"
)

type App struct {
conf *controlplane.ControlPlaneConfig
nodes map[string]*node.Node
mu sync.RWMutex
resources map[string]map[string]struct{} // A resource is watched by which nodes
resources map[string]*resource.Resource
muResource sync.RWMutex
}


func NewApp(conf *controlplane.ControlPlaneConfig) *App {
return &App{
conf: conf,
conf: conf,
nodes: make(map[string]*node.Node),
mu: sync.RWMutex{},
resources: make(map[string]*resource.Resource),
muResource: sync.RWMutex{},
}
}
17 changes: 10 additions & 7 deletions internal/app/control-plane/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package controlplane

import (
"fmt"
"log/slog"

"github.com/mohammadVatandoost/xds-conrol-plane/internal/node"
"github.com/mohammadVatandoost/xds-conrol-plane/internal/resource"
)

func (a *App) GetServices() {
Expand Down Expand Up @@ -41,26 +43,27 @@ func (a *App) DeleteNode(id string) error {
return nil
}

func (a *App) AddResourceWatchToNode(id string, resource string) {
func (a *App) AddResourceWatchToNode(id string, resourceName string, typeURL string) {
a.muResource.Lock()
defer a.muResource.Unlock()
nodes, ok := a.resources[resource]
resourceInstance, ok := a.resources[resourceName]
if !ok {
nodes = make(map[string]struct{})
a.resources[resource] = nodes
slog.Info("AddResourceWatchToNode, resource does not exist in the DB, creating", "name", resourceName, "nodeID", id, "typeURL", typeURL)
resourceInstance = resource.NewResource(resourceName, "1", "", typeURL, resourceName)
a.resources[resourceName] = resourceInstance
}
nodes[id] = struct{}{}
resourceInstance.Watchers[id] = struct{}{}
}

func (a *App) GetNodesWatchTheResource(resource string) []string {
a.muResource.RLock()
defer a.muResource.RUnlock()
nodesArray := make([]string, 0)
nodes, ok := a.resources[resource]
resourceInstance, ok := a.resources[resource]
if !ok {
return nodesArray
}
for n := range nodes {
for n := range resourceInstance.Watchers {
nodesArray = append(nodesArray, n)
}
return nodesArray
Expand Down
4 changes: 2 additions & 2 deletions internal/app/control-plane/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package controlplane
import "log/slog"


func (a *App) NewStreamRequest(id string, resourceNames []string) {
func (a *App) NewStreamRequest(id string, resourceNames []string, typeURL string) {
node := a.CreateNode(id)
for _, rn := range resourceNames {
node.AddWatcher(rn)
a.AddResourceWatchToNode(id, rn)
a.AddResourceWatchToNode(id, rn, typeURL)
}
}

Expand Down
21 changes: 21 additions & 0 deletions internal/resource/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package resource

type Resource struct {
Name string
Version string
Path string
Type string
Key string
Watchers map[string]struct{}
}

func NewResource(name, version, path, resourceType, key string) *Resource {
return &Resource{
Name: name,
Version: version,
Path: path,
Type: resourceType,
Key: key,
Watchers: make(map[string]struct{}),
}
}
2 changes: 1 addition & 1 deletion internal/xds/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package xds


type App interface {
NewStreamRequest(id string, resourceNames []string)
NewStreamRequest(id string, resourceNames []string, typeURL string)
StreamClosed(id string)
}
2 changes: 1 addition & 1 deletion internal/xds/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (cp *ControlPlane) OnStreamRequest(id int64, r *discovery.DiscoveryRequest)
return nil
}
slog.Info("OnStreamRequest", "id", id, "request", r.TypeUrl, "ResourceNames", r.ResourceNames)
cp.app.NewStreamRequest(r.Node.Id, r.ResourceNames)
cp.app.NewStreamRequest(r.Node.Id, r.ResourceNames, r.TypeUrl)

node := cp.CreateNode(r.Node.Id)
for _, rn := range r.ResourceNames {
Expand Down

0 comments on commit dadd833

Please sign in to comment.