diff --git a/example/client/deployments/helm/xds-grpc-client-example/values.yaml b/example/client/deployments/helm/xds-grpc-client-example/values.yaml index 6494b57..eaa86e8 100644 --- a/example/client/deployments/helm/xds-grpc-client-example/values.yaml +++ b/example/client/deployments/helm/xds-grpc-client-example/values.yaml @@ -30,7 +30,6 @@ podAnnotations: {} service: labels: monitoring-app: xds-apps - xds: active type: clusterIP headless: true ports: @@ -54,7 +53,7 @@ configMap: LOGGER_LEVEL: "info" GRPC_GO_LOG_VERBOSITY_LEVEL: 99 GRPC_GO_LOG_SEVERITY_LEVEL: "info" - Server1Address: "xds:///xds-grpc-server-example-headless.control-plane-example:8888" + Server1Address: "xds:///xds-grpc-server-example-headless.control-plane-example.svc.cluster.local:8888" GRPC_XDS_BOOTSTRAP: "./xds_bootstrap.json" # GRPC_XDS_BOOTSTRAP_CONFIG: "{\n \"xds_servers\": [\n {\n \"server_uri\": \"xds-control-plane-headless.xds-control-plane.svc.cluster.local:8888\",\n \"channel_creds\": [\n {\n \"type\": \"insecure\"\n }\n ],\n \"server_features\": [\"xds_v3\"] \n }\n ],\n \"node\": {\n \"id\": \"b7f9c818-fb46-43ca-8662-d3bdbcf7ec18~10.0.0.1\",\n \"metadata\": {\n \"R_GCP_PROJECT_NUMBER\": \"123456789012\"\n },\n \"locality\": {\n \"zone\": \"us-central1-a\"\n }\n }\n}\n" diff --git a/example/client/main.go b/example/client/main.go index 584db61..a0a4359 100644 --- a/example/client/main.go +++ b/example/client/main.go @@ -43,7 +43,7 @@ var ( func main() { // viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless:8888") - viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless.control-plane-example") + viper.SetDefault("Server1Address", "xds:///xds-grpc-server-example-headless.control-plane-example.svc.cluster.local:8888") // viper.SetDefault("Server1Address", "xds-grpc-server-example-headless:8888") // Read Config from ENV viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) diff --git a/example/server/deployments/helm/xds-grpc-server-example/values.yaml b/example/server/deployments/helm/xds-grpc-server-example/values.yaml index 4baccc3..306c076 100644 --- a/example/server/deployments/helm/xds-grpc-server-example/values.yaml +++ b/example/server/deployments/helm/xds-grpc-server-example/values.yaml @@ -28,7 +28,7 @@ podAnnotations: {} service: labels: monitoring-app: xds-apps - xds: active + xds/portName: grpc type: clusterIP headless: true ports: diff --git a/internal/app/control-plane/informer.go b/internal/app/control-plane/informer.go index 385716d..da32432 100644 --- a/internal/app/control-plane/informer.go +++ b/internal/app/control-plane/informer.go @@ -4,50 +4,45 @@ import ( "log/slog" "github.com/mohammadVatandoost/xds-conrol-plane/internal/resource" - v1 "k8s.io/api/core/v1" ) -func (a *App) OnAddSerivce(key string, serviceObj *v1.Service) { +func (a *App) OnAddSerivce(res *resource.Resource) { a.muResource.Lock() defer a.muResource.Unlock() - slog.Info("OnAddSerivce", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels) - resourceInstance, ok := a.resources[key] - if !ok { - resourceInstance = resource.NewResource(serviceObj.Name, serviceObj.APIVersion, "", "service", key, serviceObj) - } - resourceInstance.Name = serviceObj.Name - resourceInstance.Version = serviceObj.APIVersion - a.resources[key] = resourceInstance + slog.Info("OnAddSerivce", "key", res.Key, "name", res.ServiceObj.Name, + "Namespace", res.ServiceObj.Namespace, "Labels", res.ServiceObj.Labels) + a.resources[res.Key] = res } -func (a *App) OnDeleteService(key string, serviceObj *v1.Service) { +// func (a *App) OnDeleteService(key string, serviceObj *v1.Service) { +// a.muResource.Lock() +// defer a.muResource.Unlock() +// slog.Info("OnDeleteService", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels) + +// _, ok := a.resources[key] +// if !ok { +// slog.Error("OnDeleteService resource doesn't exist in DB", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels) +// return +// } +// delete(a.resources, key) +// } + +func (a *App) DeleteService(key string) { a.muResource.Lock() defer a.muResource.Unlock() - slog.Info("OnDeleteService", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels) - - _, ok := a.resources[key] - if !ok { - slog.Error("OnDeleteService resource doesn't exist in DB", "key", key, "name", serviceObj.Name, "Namespace", serviceObj.Namespace, "Labels", serviceObj.Labels) - return - } delete(a.resources, key) - } -func (a *App) OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service) { +func (a *App) OnUpdateService(newRes *resource.Resource, oldRes *resource.Resource) { // slog.Info("OnUpdateService", "newKey", newKey, "newServiceName", newServiceObj.Name, "newServiceNamespace", newServiceObj.Namespace, // "oldKey", oldKey, "oleServiceName", oldServiceObj.Name, "oldServiceNamespace", oldServiceObj.Namespace) a.muResource.Lock() defer a.muResource.Unlock() - resourceInstance, ok := a.resources[oldKey] + _, ok := a.resources[oldRes.Key] if !ok { - slog.Error("OnUpdateService resource doesn't exist in DB", "key", oldKey, "name", oldServiceObj.Name, - "Namespace", oldServiceObj.Namespace, "Labels", oldServiceObj.Labels) - resourceInstance = resource.NewResource(newServiceObj.Name, newServiceObj.APIVersion, "", "service", newKey, newServiceObj) + slog.Error("OnUpdateService resource doesn't exist in DB", "key", oldRes.Key, "name", oldRes.ServiceObj.Name, + "Namespace", oldRes.ServiceObj.Namespace, "Labels", oldRes.ServiceObj.Labels) } - delete(a.resources, oldKey) - resourceInstance.Name = newServiceObj.Name - resourceInstance.Version = newServiceObj.APIVersion - resourceInstance.ServiceObj = newServiceObj - a.resources[newKey] = resourceInstance + delete(a.resources, oldRes.Key) + a.resources[newRes.Key] = newRes } diff --git a/internal/app/control-plane/resource.go b/internal/app/control-plane/resource.go index bb351dc..a809339 100644 --- a/internal/app/control-plane/resource.go +++ b/internal/app/control-plane/resource.go @@ -46,7 +46,7 @@ func (a *App) AddResourceWatchToNode(id string, resourceName string, typeURL str resourceInstance, ok := a.resources[resourceName] if !ok { slog.Info("AddResourceWatchToNode, resource does not exist in the DB, creating", "name", resourceName, "nodeID", id, "typeURL", typeURL) - resourceInstance = resource.NewResource(resourceName, "1", "", typeURL, resourceName, nil) + resourceInstance = resource.NewResource(resourceName, "1", "", typeURL, resourceName, "", nil) a.resources[resourceName] = resourceInstance } resourceInstance.Watchers[id] = struct{}{} diff --git a/internal/app/control-plane/xds.go b/internal/app/control-plane/xds.go index 6c09a4a..47cb75c 100644 --- a/internal/app/control-plane/xds.go +++ b/internal/app/control-plane/xds.go @@ -39,6 +39,7 @@ func (a *App) UpdateNodeCache(nodeID string) { node, ok := a.nodes[nodeID] if !ok { slog.Error("UpdateNodeCache, node doesn't exist", "nodeID", nodeID) + return } resources := node.GetWatchings() node.ClearResources() @@ -49,8 +50,9 @@ func (a *App) UpdateNodeCache(nodeID string) { slog.Error("UpdateCache, resource doesn't exist", "resource", rn, "nodeID", nodeID) continue } + // resource.ServiceObj.Spec.Ports[0].Name //ToDo: later fix loop through each port name - endPoint, cluster, route, listner, err := xds.MakeXDSResource(resource, a.conf.Region, a.conf.Zone, resource.ServiceObj.Spec.Ports[0].Name) + endPoint, cluster, route, listner, err := xds.MakeXDSResource(resource, a.conf.Region, a.conf.Zone) if err != nil { slog.Error("UpdateCache, failed to Make XDS Resource", "error", err, "resource", rn, "nodeID", nodeID) continue diff --git a/internal/informer/service.go b/internal/informer/service.go index 2f4cf9d..a646c9b 100644 --- a/internal/informer/service.go +++ b/internal/informer/service.go @@ -1,22 +1,26 @@ package informer import ( + "fmt" "log/slog" + "github.com/mohammadVatandoost/xds-conrol-plane/internal/resource" v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) +const PortNameLabel = "xds/portName" + type ServiceInformer struct { cache cache.SharedIndexInformer handler ServiceEventHandler } type ServiceEventHandler interface { - OnAddSerivce(key string, serviceObj *v1.Service) - OnDeleteService(key string, serviceObj *v1.Service) - OnUpdateService(newKey string, newServiceObj *v1.Service, oldKey string, oldServiceObj *v1.Service) + OnAddSerivce(res *resource.Resource) + OnUpdateService(newRes *resource.Resource, oldRes *resource.Resource) + DeleteService(key string) } func NewServiceInformer(factory informers.SharedInformerFactory, handler ServiceEventHandler) *ServiceInformer { @@ -32,8 +36,25 @@ func NewServiceInformer(factory informers.SharedInformerFactory, handler Service return si } -func getServiceKey(service *v1.Service) string { - return service.Name + "." + service.Namespace +// ToDo: later use array of ports +func isXDSService(service *v1.Service) (string, bool) { + for k, value := range service.Labels { + if k == PortNameLabel { + return value, true + } + } + return "", false +} + +func getServiceKey(service *v1.Service, portName string) (string, error) { + for _, port := range service.Spec.Ports { + if port.Name == portName { + return fmt.Sprintf("%s.%s.svc.cluster.local:%d", + service.Name, service.Namespace, port.Port), nil + } + } + return "", fmt.Errorf("couldn't find the port name, portName: %s, serviceName: %s, namespace: %s", + portName, service.Name, service.Namespace) } func (si *ServiceInformer) Run(stopCh <-chan struct{}) { @@ -46,9 +67,17 @@ func (si *ServiceInformer) OnAdd(obj interface{}) { slog.Error("type of object is not service ", "obj", obj, "method", "OnAdd") return } - - key := getServiceKey(service) - si.handler.OnAddSerivce(key, service) + portName, ok := isXDSService(service) + if !ok { + return + } + key, err := getServiceKey(service, portName) + if err != nil { + slog.Error("couldn't get service key ", "err", err) + return + } + res := resource.NewResource(service.Name, service.APIVersion, "", "service", key, portName, service) + si.handler.OnAddSerivce(res) } func (si *ServiceInformer) OnUpdate(oldObj, newObj interface{}) { @@ -62,9 +91,47 @@ func (si *ServiceInformer) OnUpdate(oldObj, newObj interface{}) { slog.Error("type of object is not service ", "obj", oldObj, "method", "OnUpdate") return } - newKey := getServiceKey(newService) - oldKey := getServiceKey(oldService) - si.handler.OnUpdateService(newKey, newService, oldKey, oldService) + + portNameOld, ok := isXDSService(oldService) + if !ok { + portNameNew, ok := isXDSService(newService) + if ok { + key, err := getServiceKey(newService, portNameNew) + if err != nil { + slog.Error("couldn't get service key ", "err", err) + return + } + res := resource.NewResource(newService.Name, newService.APIVersion, "", "service", key, portNameNew, newService) + si.handler.OnAddSerivce(res) + } + return + } + + portNameNew, ok := isXDSService(newService) + if !ok { + oldKey, err := getServiceKey(oldService, portNameOld) + if err != nil { + slog.Error("couldn't get service key ", "err", err) + return + } + si.handler.DeleteService(oldKey) + return + } + + oldKey, err := getServiceKey(oldService, portNameOld) + if err != nil { + slog.Error("couldn't get oldService key ", "err", err) + return + } + + newKey, err := getServiceKey(newService, portNameNew) + if err != nil { + slog.Error("couldn't get newService key ", "err", err) + return + } + newRes := resource.NewResource(newService.Name, newService.APIVersion, "", "service", newKey, portNameNew, newService) + oldRes := resource.NewResource(oldService.Name, oldService.APIVersion, "", "service", oldKey, portNameOld, oldService) + si.handler.OnUpdateService(newRes, oldRes) } func (si *ServiceInformer) OnDelete(obj interface{}) { @@ -74,6 +141,15 @@ func (si *ServiceInformer) OnDelete(obj interface{}) { return } - key := getServiceKey(service) - si.handler.OnDeleteService(key, service) + portName, ok := isXDSService(service) + if !ok { + return + } + key, err := getServiceKey(service, portName) + if err != nil { + slog.Error("couldn't get service key ", "err", err) + return + } + + si.handler.DeleteService(key) } diff --git a/internal/resource/provider.go b/internal/resource/provider.go index decc652..84fed9d 100644 --- a/internal/resource/provider.go +++ b/internal/resource/provider.go @@ -8,12 +8,13 @@ type Resource struct { NameSpace string `json:"namespace"` K8SKind string `json:"kind"` EnvoyTypeURL string `json:"envoyTypeURL"` + PortName string `json:"portName"` Key string `json:"key"` //key is name.namespace:portnumber Watchers map[string]struct{} `json:"watchers"` ServiceObj *v1.Service `json:"serviceOBJ"` // we only support Service, later we can add Ingress } -func NewResource(name, version, nameSpace, resourceType, key string, serviceObj *v1.Service) *Resource { +func NewResource(name, version, nameSpace, resourceType, key, portName string, serviceObj *v1.Service) *Resource { return &Resource{ Name: name, Version: version, @@ -22,5 +23,6 @@ func NewResource(name, version, nameSpace, resourceType, key string, serviceObj Key: key, ServiceObj: serviceObj, Watchers: make(map[string]struct{}), + PortName: portName, } } diff --git a/internal/xds/bootstrap.go b/internal/xds/bootstrap.go index 6d1b2a1..6ac3d43 100644 --- a/internal/xds/bootstrap.go +++ b/internal/xds/bootstrap.go @@ -7,23 +7,15 @@ import ( "strconv" "strings" - cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" - hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" // "github.com/envoyproxy/go-control-plane/pkg/cache/types" // cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" // "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/sirupsen/logrus" kube_core "k8s.io/api/core/v1" - // "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/wrapperspb" // healthpb "google.golang.org/grpc/health/grpc_health_v1" ) @@ -38,7 +30,7 @@ type ServiceConfig struct { } // key is servicename.namespace -func getAddresses(key string, portName string) []string { +func getAddresses(serviceName string, portName string, namespace string) []string { var upstreamPorts []string // serviceName := svcc.ServiceName //"be-srv" // namespace := svcc.Namespace //"default" @@ -46,14 +38,16 @@ func getAddresses(key string, portName string) []string { protocol := kube_core.ProtocolTCP //"tcp" // grpcServiceName := svcc.GRPCServiceName //"echo.EchoServer" // name := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, namespace) - // name := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, namespace) - name := fmt.Sprintf("%s.svc.cluster.local", key) + name := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, namespace) + // name := fmt.Sprintf("%s.svc.cluster.local", key) + // name := key + cname, rec, err := net.LookupSRV(portName, string(protocol), name) if err != nil { - slog.Error("Could not find the address", "key", key, "portName", portName, "err", err.Error()) + slog.Error("Could not find the address", "name", name, "portName", portName, "err", err.Error()) return upstreamPorts } else { - slog.Info("address found", "cname", cname, "rec", rec) + slog.Info("addresses found", "cname", cname, "rec", rec, "name", name) } for i := range rec { @@ -122,115 +116,115 @@ func getAddresses(key string, portName string) []string { // }) // } -func (cp *ControlPlane) makeXDSConfigFromService(svc ServiceConfig) (*endpoint.ClusterLoadAssignment, *cluster.Cluster, *route.RouteConfiguration, *listener.Listener, error) { - routeConfigName := svc.ServiceName + "-route" - clusterName := svc.ServiceName + "-cluster" - virtualHostName := svc.ServiceName + "-vs" - region := svc.Region //"us-central1" - zone := svc.Zone // us-central1-a - addresses := getAddresses(fmt.Sprintf("%s.%s", svc.ServiceName, svc.PortName), svc.PortName) - if len(addresses) == 0 { - return nil, nil, nil, nil, fmt.Errorf("there is no availabe address for service: %v", svc.ServiceName) - } - // cp.log.Infof("service: %v, addresses: %v \n", svc.ServiceName, addresses) - lbe := makeLBEndpoint(addresses) - eds := &endpoint.ClusterLoadAssignment{ - ClusterName: clusterName, - Endpoints: []*endpoint.LocalityLbEndpoints{{ - Locality: &core.Locality{ - Region: region, - Zone: zone, - }, - Priority: 0, - LoadBalancingWeight: &wrapperspb.UInt32Value{Value: uint32(1000)}, - LbEndpoints: lbe, - }}, - } - cls := &cluster.Cluster{ - Name: clusterName, - LbPolicy: cluster.Cluster_ROUND_ROBIN, - ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, - EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{ - EdsConfig: &core.ConfigSource{ - ConfigSourceSpecifier: &core.ConfigSource_Ads{}, - }, - }, - } +// func (cp *ControlPlane) makeXDSConfigFromService(svc ServiceConfig) (*endpoint.ClusterLoadAssignment, *cluster.Cluster, *route.RouteConfiguration, *listener.Listener, error) { +// routeConfigName := svc.ServiceName + "-route" +// clusterName := svc.ServiceName + "-cluster" +// virtualHostName := svc.ServiceName + "-vs" +// region := svc.Region //"us-central1" +// zone := svc.Zone // us-central1-a +// addresses := getAddresses(fmt.Sprintf("%s.%s", svc.ServiceName, svc.PortName), svc.PortName) +// if len(addresses) == 0 { +// return nil, nil, nil, nil, fmt.Errorf("there is no availabe address for service: %v", svc.ServiceName) +// } +// // cp.log.Infof("service: %v, addresses: %v \n", svc.ServiceName, addresses) +// lbe := makeLBEndpoint(addresses) +// eds := &endpoint.ClusterLoadAssignment{ +// ClusterName: clusterName, +// Endpoints: []*endpoint.LocalityLbEndpoints{{ +// Locality: &core.Locality{ +// Region: region, +// Zone: zone, +// }, +// Priority: 0, +// LoadBalancingWeight: &wrapperspb.UInt32Value{Value: uint32(1000)}, +// LbEndpoints: lbe, +// }}, +// } +// cls := &cluster.Cluster{ +// Name: clusterName, +// LbPolicy: cluster.Cluster_ROUND_ROBIN, +// ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, +// EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{ +// EdsConfig: &core.ConfigSource{ +// ConfigSourceSpecifier: &core.ConfigSource_Ads{}, +// }, +// }, +// } - // RDS - // cp.log.Infof(">>>>>>>>>>>>>>>>>>> creating RDS " + virtualHostName) - vh := &route.VirtualHost{ - Name: virtualHostName, - Domains: []string{svc.ServiceName}, //******************* >> must match what is specified at xds:/// // +// // RDS +// // cp.log.Infof(">>>>>>>>>>>>>>>>>>> creating RDS " + virtualHostName) +// vh := &route.VirtualHost{ +// Name: virtualHostName, +// Domains: []string{svc.ServiceName}, //******************* >> must match what is specified at xds:/// // - Routes: []*route.Route{{ - Match: &route.RouteMatch{ - PathSpecifier: &route.RouteMatch_Prefix{ - Prefix: "", - }, - }, - Action: &route.Route_Route{ - Route: &route.RouteAction{ - ClusterSpecifier: &route.RouteAction_Cluster{ - Cluster: clusterName, - }, - }, - }, - }}} +// Routes: []*route.Route{{ +// Match: &route.RouteMatch{ +// PathSpecifier: &route.RouteMatch_Prefix{ +// Prefix: "", +// }, +// }, +// Action: &route.Route_Route{ +// Route: &route.RouteAction{ +// ClusterSpecifier: &route.RouteAction_Cluster{ +// Cluster: clusterName, +// }, +// }, +// }, +// }}} - rds := &route.RouteConfiguration{ - Name: routeConfigName, - VirtualHosts: []*route.VirtualHost{vh}, - } +// rds := &route.RouteConfiguration{ +// Name: routeConfigName, +// VirtualHosts: []*route.VirtualHost{vh}, +// } - // LISTENER - // cp.log.Infof(">>>>>>>>>>>>>>>>>>> creating LISTENER " + svc.ServiceName) - hcRds := &hcm.HttpConnectionManager_Rds{ - Rds: &hcm.Rds{ - RouteConfigName: routeConfigName, - ConfigSource: &core.ConfigSource{ - ConfigSourceSpecifier: &core.ConfigSource_Ads{ - Ads: &core.AggregatedConfigSource{}, - }, - }, - }, - } +// // LISTENER +// // cp.log.Infof(">>>>>>>>>>>>>>>>>>> creating LISTENER " + svc.ServiceName) +// hcRds := &hcm.HttpConnectionManager_Rds{ +// Rds: &hcm.Rds{ +// RouteConfigName: routeConfigName, +// ConfigSource: &core.ConfigSource{ +// ConfigSourceSpecifier: &core.ConfigSource_Ads{ +// Ads: &core.AggregatedConfigSource{}, +// }, +// }, +// }, +// } - filterPbst, err := anypb.New(&v3routerpb.Router{}) - if err != nil { - panic(err) - } - // RouterHTTPFilter := hcm.HTTPFilter("router", &v3routerpb.Router{}) - RouterHTTPFilter := &hcm.HttpFilter{ - Name: "router", - ConfigType: &hcm.HttpFilter_TypedConfig{ - TypedConfig: filterPbst, - }, - } - filters := []*hcm.HttpFilter{ - RouterHTTPFilter, - } +// filterPbst, err := anypb.New(&v3routerpb.Router{}) +// if err != nil { +// panic(err) +// } +// // RouterHTTPFilter := hcm.HTTPFilter("router", &v3routerpb.Router{}) +// RouterHTTPFilter := &hcm.HttpFilter{ +// Name: "router", +// ConfigType: &hcm.HttpFilter_TypedConfig{ +// TypedConfig: filterPbst, +// }, +// } +// filters := []*hcm.HttpFilter{ +// RouterHTTPFilter, +// } - manager := &hcm.HttpConnectionManager{ - CodecType: hcm.HttpConnectionManager_AUTO, - RouteSpecifier: hcRds, - HttpFilters: filters, - } +// manager := &hcm.HttpConnectionManager{ +// CodecType: hcm.HttpConnectionManager_AUTO, +// RouteSpecifier: hcRds, +// HttpFilters: filters, +// } - pbst, err := anypb.New(manager) - if err != nil { - panic(err) - } +// pbst, err := anypb.New(manager) +// if err != nil { +// panic(err) +// } - lsnr := &listener.Listener{ - Name: svc.ServiceName, - ApiListener: &listener.ApiListener{ - ApiListener: pbst, - }, - } +// lsnr := &listener.Listener{ +// Name: svc.ServiceName, +// ApiListener: &listener.ApiListener{ +// ApiListener: pbst, +// }, +// } - return eds, cls, rds, lsnr, nil -} +// return eds, cls, rds, lsnr, nil +// } // // HTTPFilter constructs an xds HttpFilter with the provided name and config. // func HTTPFilter(name string, config proto.Message) *hcm.HttpFilter { diff --git a/internal/xds/resource.go b/internal/xds/resource.go index dd7f885..e81cb85 100644 --- a/internal/xds/resource.go +++ b/internal/xds/resource.go @@ -16,11 +16,12 @@ import ( ) func MakeXDSResource(resourceInfo *resource.Resource, region string, - zone string, portName string) (*endpoint.ClusterLoadAssignment, *cluster.Cluster, *route.RouteConfiguration, *listener.Listener, error) { + zone string) (*endpoint.ClusterLoadAssignment, *cluster.Cluster, *route.RouteConfiguration, *listener.Listener, error) { routeConfigName := resourceInfo.Name + "-route" clusterName := resourceInfo.Name + "-cluster" virtualHostName := resourceInfo.Name + "-vs" - addresses := getAddresses(resourceInfo.Key, portName) + addresses := getAddresses(resourceInfo.ServiceObj.Name, resourceInfo.PortName, + resourceInfo.ServiceObj.Namespace) if len(addresses) == 0 { return nil, nil, nil, nil, fmt.Errorf("there is no availabe address for service: %v", resourceInfo.Key) }