Skip to content

Commit

Permalink
use label to select services (#19)
Browse files Browse the repository at this point in the history
* use label to select services

* change key

* use portname to find grpc port
  • Loading branch information
mohammadVatandoost authored Nov 8, 2023
1 parent 35272f4 commit a09bab4
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ podAnnotations: {}
service:
labels:
monitoring-app: xds-apps
xds: active
type: clusterIP
headless: true
ports:
Expand All @@ -54,7 +53,7 @@ configMap:
LOGGER_LEVEL: "info"
GRPC_GO_LOG_VERBOSITY_LEVEL: 99
GRPC_GO_LOG_SEVERITY_LEVEL: "info"
Server1Address: "xds:///xds-grpc-server-example-headless.control-plane-example:8888"
Server1Address: "xds:///xds-grpc-server-example-headless.control-plane-example.svc.cluster.local:8888"
GRPC_XDS_BOOTSTRAP: "./xds_bootstrap.json"
# GRPC_XDS_BOOTSTRAP_CONFIG: "{\n \"xds_servers\": [\n {\n \"server_uri\": \"xds-control-plane-headless.xds-control-plane.svc.cluster.local:8888\",\n \"channel_creds\": [\n {\n \"type\": \"insecure\"\n }\n ],\n \"server_features\": [\"xds_v3\"] \n }\n ],\n \"node\": {\n \"id\": \"b7f9c818-fb46-43ca-8662-d3bdbcf7ec18~10.0.0.1\",\n \"metadata\": {\n \"R_GCP_PROJECT_NUMBER\": \"123456789012\"\n },\n \"locality\": {\n \"zone\": \"us-central1-a\"\n }\n }\n}\n"

2 changes: 1 addition & 1 deletion example/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
func main() {

// viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless:8888")
viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless.control-plane-example")
viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless.control-plane-example.svc.cluster.local:8888")
// viper.SetDefault("Server1Address", "xds-grpc-server-example-headless:8888")
// Read Config from ENV
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ podAnnotations: {}
service:
labels:
monitoring-app: xds-apps
xds: active
xds/portName: grpc
type: clusterIP
headless: true
ports:
Expand Down
53 changes: 24 additions & 29 deletions internal/app/control-plane/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,45 @@ import (
"log/slog"

"github.com/mohammadVatandoost/xds-conrol-plane/internal/resource"
v1 "k8s.io/api/core/v1"
)

func (a *App) OnAddSerivce(key string, serviceObj *v1.Service) {
func (a *App) OnAddSerivce(res *resource.Resource) {
a.muResource.Lock()
defer a.muResource.Unlock()
slog.Info("OnAddSerivce", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
resourceInstance, ok := a.resources[key]
if !ok {
resourceInstance = resource.NewResource(serviceObj.Name, serviceObj.APIVersion, "", "service", key, serviceObj)
}
resourceInstance.Name = serviceObj.Name
resourceInstance.Version = serviceObj.APIVersion
a.resources[key] = resourceInstance
slog.Info("OnAddSerivce", "key", res.Key, "name", res.ServiceObj.Name,
"Namespace", res.ServiceObj.Namespace, "Labels", res.ServiceObj.Labels)
a.resources[res.Key] = res
}

func (a *App) OnDeleteService(key string, serviceObj *v1.Service) {
// func (a *App) OnDeleteService(key string, serviceObj *v1.Service) {
// a.muResource.Lock()
// defer a.muResource.Unlock()
// slog.Info("OnDeleteService", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)

// _, ok := a.resources[key]
// if !ok {
// slog.Error("OnDeleteService resource doesn't exist in DB", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
// return
// }
// delete(a.resources, key)
// }

func (a *App) DeleteService(key string) {
a.muResource.Lock()
defer a.muResource.Unlock()
slog.Info("OnDeleteService", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)

_, ok := a.resources[key]
if !ok {
slog.Error("OnDeleteService resource doesn't exist in DB", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels)
return
}
delete(a.resources, key)

}

func (a *App) OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service) {
func (a *App) OnUpdateService(newRes *resource.Resource, oldRes *resource.Resource) {
// slog.Info("OnUpdateService", "newKey", newKey, "newServiceName", newServiceObj.Name, "newServiceNamespace", newServiceObj.Namespace,
// "oldKey", oldKey, "oleServiceName", oldServiceObj.Name, "oldServiceNamespace", oldServiceObj.Namespace)
a.muResource.Lock()
defer a.muResource.Unlock()
resourceInstance, ok := a.resources[oldKey]
_, ok := a.resources[oldRes.Key]
if !ok {
slog.Error("OnUpdateService resource doesn't exist in DB", "key", oldKey, "name", oldServiceObj.Name,
"Namespace", oldServiceObj.Namespace, "Labels", oldServiceObj.Labels)
resourceInstance = resource.NewResource(newServiceObj.Name, newServiceObj.APIVersion, "", "service", newKey, newServiceObj)
slog.Error("OnUpdateService resource doesn't exist in DB", "key", oldRes.Key, "name", oldRes.ServiceObj.Name,
"Namespace", oldRes.ServiceObj.Namespace, "Labels", oldRes.ServiceObj.Labels)
}
delete(a.resources, oldKey)
resourceInstance.Name = newServiceObj.Name
resourceInstance.Version = newServiceObj.APIVersion
resourceInstance.ServiceObj = newServiceObj
a.resources[newKey] = resourceInstance
delete(a.resources, oldRes.Key)
a.resources[newRes.Key] = newRes
}
2 changes: 1 addition & 1 deletion internal/app/control-plane/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (a *App) AddResourceWatchToNode(id string, resourceName string, typeURL str
resourceInstance, ok := a.resources[resourceName]
if !ok {
slog.Info("AddResourceWatchToNode, resource does not exist in the DB, creating", "name", resourceName, "nodeID", id, "typeURL", typeURL)
resourceInstance = resource.NewResource(resourceName, "1", "", typeURL, resourceName, nil)
resourceInstance = resource.NewResource(resourceName, "1", "", typeURL, resourceName, "", nil)
a.resources[resourceName] = resourceInstance
}
resourceInstance.Watchers[id] = struct{}{}
Expand Down
4 changes: 3 additions & 1 deletion internal/app/control-plane/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (a *App) UpdateNodeCache(nodeID string) {
node, ok := a.nodes[nodeID]
if !ok {
slog.Error("UpdateNodeCache, node doesn't exist", "nodeID", nodeID)
return
}
resources := node.GetWatchings()
node.ClearResources()
Expand All @@ -49,8 +50,9 @@ func (a *App) UpdateNodeCache(nodeID string) {
slog.Error("UpdateCache, resource doesn't exist", "resource", rn, "nodeID", nodeID)
continue
}
// resource.ServiceObj.Spec.Ports[0].Name
//ToDo: later fix loop through each port name
endPoint, cluster, route, listner, err := xds.MakeXDSResource(resource, a.conf.Region, a.conf.Zone, resource.ServiceObj.Spec.Ports[0].Name)
endPoint, cluster, route, listner, err := xds.MakeXDSResource(resource, a.conf.Region, a.conf.Zone)
if err != nil {
slog.Error("UpdateCache, failed to Make XDS Resource", "error", err, "resource", rn, "nodeID", nodeID)
continue
Expand Down
102 changes: 89 additions & 13 deletions internal/informer/service.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
package informer

import (
"fmt"
"log/slog"

"github.com/mohammadVatandoost/xds-conrol-plane/internal/resource"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

const PortNameLabel = "xds/portName"

type ServiceInformer struct {
cache cache.SharedIndexInformer
handler ServiceEventHandler
}

type ServiceEventHandler interface {
OnAddSerivce(key string, serviceObj *v1.Service)
OnDeleteService(key string, serviceObj *v1.Service)
OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service)
OnAddSerivce(res *resource.Resource)
OnUpdateService(newRes *resource.Resource, oldRes *resource.Resource)
DeleteService(key string)
}

func NewServiceInformer(factory informers.SharedInformerFactory, handler ServiceEventHandler) *ServiceInformer {
Expand All @@ -32,8 +36,25 @@ func NewServiceInformer(factory informers.SharedInformerFactory, handler Service
return si
}

func getServiceKey(service *v1.Service) string {
return service.Name + "." + service.Namespace
// ToDo: later use array of ports
func isXDSService(service *v1.Service) (string, bool) {
for k, value := range service.Labels {
if k == PortNameLabel {
return value, true
}
}
return "", false
}

func getServiceKey(service *v1.Service, portName string) (string, error) {
for _, port := range service.Spec.Ports {
if port.Name == portName {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d",
service.Name, service.Namespace, port.Port), nil
}
}
return "", fmt.Errorf("couldn't find the port name, portName: %s, serviceName: %s, namespace: %s",
portName, service.Name, service.Namespace)
}

func (si *ServiceInformer) Run(stopCh <-chan struct{}) {
Expand All @@ -46,9 +67,17 @@ func (si *ServiceInformer) OnAdd(obj interface{}) {
slog.Error("type of object is not service ", "obj", obj, "method", "OnAdd")
return
}

key := getServiceKey(service)
si.handler.OnAddSerivce(key, service)
portName, ok := isXDSService(service)
if !ok {
return
}
key, err := getServiceKey(service, portName)
if err != nil {
slog.Error("couldn't get service key ", "err", err)
return
}
res := resource.NewResource(service.Name, service.APIVersion, "", "service", key, portName, service)
si.handler.OnAddSerivce(res)
}

func (si *ServiceInformer) OnUpdate(oldObj, newObj interface{}) {
Expand All @@ -62,9 +91,47 @@ func (si *ServiceInformer) OnUpdate(oldObj, newObj interface{}) {
slog.Error("type of object is not service ", "obj", oldObj, "method", "OnUpdate")
return
}
newKey := getServiceKey(newService)
oldKey := getServiceKey(oldService)
si.handler.OnUpdateService(newKey, newService, oldKey, oldService)

portNameOld, ok := isXDSService(oldService)
if !ok {
portNameNew, ok := isXDSService(newService)
if ok {
key, err := getServiceKey(newService, portNameNew)
if err != nil {
slog.Error("couldn't get service key ", "err", err)
return
}
res := resource.NewResource(newService.Name, newService.APIVersion, "", "service", key, portNameNew, newService)
si.handler.OnAddSerivce(res)
}
return
}

portNameNew, ok := isXDSService(newService)
if !ok {
oldKey, err := getServiceKey(oldService, portNameOld)
if err != nil {
slog.Error("couldn't get service key ", "err", err)
return
}
si.handler.DeleteService(oldKey)
return
}

oldKey, err := getServiceKey(oldService, portNameOld)
if err != nil {
slog.Error("couldn't get oldService key ", "err", err)
return
}

newKey, err := getServiceKey(newService, portNameNew)
if err != nil {
slog.Error("couldn't get newService key ", "err", err)
return
}
newRes := resource.NewResource(newService.Name, newService.APIVersion, "", "service", newKey, portNameNew, newService)
oldRes := resource.NewResource(oldService.Name, oldService.APIVersion, "", "service", oldKey, portNameOld, oldService)
si.handler.OnUpdateService(newRes, oldRes)
}

func (si *ServiceInformer) OnDelete(obj interface{}) {
Expand All @@ -74,6 +141,15 @@ func (si *ServiceInformer) OnDelete(obj interface{}) {
return
}

key := getServiceKey(service)
si.handler.OnDeleteService(key, service)
portName, ok := isXDSService(service)
if !ok {
return
}
key, err := getServiceKey(service, portName)
if err != nil {
slog.Error("couldn't get service key ", "err", err)
return
}

si.handler.DeleteService(key)
}
4 changes: 3 additions & 1 deletion internal/resource/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ type Resource struct {
NameSpace string `json:"namespace"`
K8SKind string `json:"kind"`
EnvoyTypeURL string `json:"envoyTypeURL"`
PortName string `json:"portName"`
Key string `json:"key"` //key is name.namespace:portnumber
Watchers map[string]struct{} `json:"watchers"`
ServiceObj *v1.Service `json:"serviceOBJ"` // we only support Service, later we can add Ingress
}

func NewResource(name, version, nameSpace, resourceType, key string, serviceObj *v1.Service) *Resource {
func NewResource(name, version, nameSpace, resourceType, key, portName string, serviceObj *v1.Service) *Resource {
return &Resource{
Name: name,
Version: version,
Expand All @@ -22,5 +23,6 @@ func NewResource(name, version, nameSpace, resourceType, key string, serviceObj
Key: key,
ServiceObj: serviceObj,
Watchers: make(map[string]struct{}),
PortName: portName,
}
}
Loading

0 comments on commit a09bab4

Please sign in to comment.