Skip to content

Commit

Permalink
Merge pull request #9 from castai/feat/snapshot/additional-data
Browse files Browse the repository at this point in the history
feat: extract more of non-sensitive data
  • Loading branch information
zilvinasu authored Mar 24, 2021
2 parents dced59b + 7fb0487 commit 1d5ca60
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
restore-keys: ${{ runner.os }}-build-

- name: Build Go binary
run: go build -o bin/castai-agent
run: go build -o bin/castai-agent ./cmd/server/main.go
env:
GOOS: linux
GOARCH: amd64
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
build:
GOOS=linux go build -o bin/castai-agent
GOOS=linux go build -o bin/castai-agent ./cmd/server/main.go
docker build -t castai/agent:0.0.1 .

push:
Expand Down
53 changes: 23 additions & 30 deletions main.go → cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"

"castai-agent/internal/services/collector"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -38,27 +39,26 @@ type Request struct {
Payload []byte `json:"payload"`
}

type TelemetryData struct {
ClusterID string `json:"clusterId"`
AccountID string `json:"accountId"`
OrganizationID string `json:"organizationId"`
ClusterProvider string `json:"clusterProvider"`
ClusterName string `json:"clusterName"`
ClusterVersion string `json:"clusterVersion"`
ClusterRegion string `json:"clusterRegion"`
NodeList *v1.NodeList `json:"nodeList"`
PodList *v1.PodList `json:"podList"`
type TelemetrySnapshot struct {
ClusterID string `json:"clusterId"`
AccountID string `json:"accountId"`
OrganizationID string `json:"organizationId"`
ClusterProvider string `json:"clusterProvider"`
ClusterName string `json:"clusterName"`
ClusterVersion string `json:"clusterVersion"`
ClusterRegion string `json:"clusterRegion"`
*collector.ClusterData
}

type EKSParams struct {
ClusterName string `json:"clusterName"`
Region string `json:"region"`
AccountID string `json:"accountId"`
ClusterName string `json:"clusterName"`
Region string `json:"region"`
AccountID string `json:"accountId"`
}

type RegisterClusterRequest struct {
Name string `json:"name"`
EKS EKSParams `json:"eks"`
Name string `json:"name"`
EKS EKSParams `json:"eks"`
}

type Cluster struct {
Expand Down Expand Up @@ -120,36 +120,29 @@ func main() {
panic(err)
}

col := collector.NewCollector(clientset)

for {
select {
case <-ticker.C:
case <-ctx.Done():
return
}

// TODO: move into separate collector function
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
log.Errorf("failed listing nodes: %v", err)
continue
}


pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
cd, err := col.Collect(ctx)
if err != nil {
log.Errorf("failed listing pods: %v", err)
log.Errorf("failed collecting snapshot data: %v", err)
continue
}

t := &TelemetryData{
t := &TelemetrySnapshot{
OrganizationID: c.Cluster.OrganizationID,
ClusterID: c.Cluster.ID,
AccountID: awsAccountId,
ClusterProvider: "EKS",
ClusterName: clusterName,
ClusterRegion: clusterRegion,
NodeList: nodes,
PodList: pods,
ClusterData: cd,
}

version, err := clientset.ServerVersion()
Expand Down Expand Up @@ -221,7 +214,7 @@ func registerCluster(log *logrus.Logger, client *resty.Client, registerRequest *
return resp.Result().(*RegisterClusterResponse), nil
}

func sendTelemetry(log *logrus.Logger, client *resty.Client, t *TelemetryData) error {
func sendTelemetry(log *logrus.Logger, client *resty.Client, t *TelemetrySnapshot) error {
tb, err := json.Marshal(t)
if err != nil {
return err
Expand Down
38 changes: 0 additions & 38 deletions deployment.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module castai-agent

go 1.15
go 1.16

require (
github.com/aws/aws-sdk-go v1.37.23
Expand Down
198 changes: 198 additions & 0 deletions internal/services/collector/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package collector

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type collector struct {
clientset *kubernetes.Clientset
cd *ClusterData
}

func NewCollector(clientset *kubernetes.Clientset) *collector {
var cd ClusterData
return &collector{
clientset: clientset,
cd: &cd,
}
}

func (c *collector) Collect(ctx context.Context) (*ClusterData, error) {
if err := c.collectNodes(ctx); err != nil {
return nil, err
}

if err := c.collectPods(ctx); err != nil {
return nil, err
}

if err := c.collectPods(ctx); err != nil {
return nil, err
}

if err := c.collectPersistentVolumes(ctx); err != nil {
return nil, err
}

if err := c.collectPersistentVolumeClaims(ctx); err != nil {
return nil, err
}

if err := c.collectDeploymentList(ctx); err != nil {
return nil, err
}

if err := c.collectReplicaSetList(ctx); err != nil {
return nil, err
}

if err := c.collectDaemonSetList(ctx); err != nil {
return nil, err
}

if err := c.collectStatefulSetList(ctx); err != nil {
return nil, err
}

if err := c.collectReplicationControllerList(ctx); err != nil {
return nil, err
}

if err := c.collectServiceList(ctx); err != nil {
return nil, err
}

if err := c.collectCSINodeList(ctx); err != nil {
return nil, err
}

if err := c.collectStorageClassList(ctx); err != nil {
return nil, err
}

if err := c.collectJobList(ctx); err != nil {
return nil, err
}

return c.cd, nil
}

func (c *collector) collectNodes(ctx context.Context) error {
nodes, err := c.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.NodeList = nodes
return nil
}

func (c *collector) collectPods(ctx context.Context) error {
pods, err := c.clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.PodList = pods
return nil
}

func (c *collector) collectPersistentVolumes(ctx context.Context) error {
pods, err := c.clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.PodList = pods
return nil
}

func (c *collector) collectPersistentVolumeClaims(ctx context.Context) error {
pvc, err := c.clientset.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.PersistentVolumeClaimList = pvc
return nil
}

func (c *collector) collectDeploymentList(ctx context.Context) error {
dpls, err := c.clientset.AppsV1().Deployments("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.DeploymentList = dpls
return nil
}

func (c *collector) collectReplicaSetList(ctx context.Context) error {
rpsl, err := c.clientset.AppsV1().ReplicaSets("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.ReplicaSetList = rpsl
return nil
}

func (c *collector) collectDaemonSetList(ctx context.Context) error {
dsl, err := c.clientset.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.DaemonSetList = dsl
return nil
}

func (c *collector) collectStatefulSetList(ctx context.Context) error {
stsl, err := c.clientset.AppsV1().StatefulSets("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.StatefulSetList = stsl
return nil
}

func (c *collector) collectReplicationControllerList(ctx context.Context) error {
rc, err := c.clientset.CoreV1().ReplicationControllers("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.ReplicationControllerList = rc
return nil
}

func (c *collector) collectServiceList(ctx context.Context) error {
svc, err := c.clientset.CoreV1().Services("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.ServiceList = svc
return nil
}

func (c *collector) collectCSINodeList(ctx context.Context) error {
csin, err := c.clientset.StorageV1().CSINodes().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.CSINodeList = csin
return nil
}

func (c *collector) collectStorageClassList(ctx context.Context) error {
scl, err := c.clientset.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.StorageClassList = scl
return nil
}

func (c *collector) collectJobList(ctx context.Context) error {
jobs, err := c.clientset.BatchV1().Jobs("").List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
c.cd.JobList = jobs
return nil
}
24 changes: 24 additions & 0 deletions internal/services/collector/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package collector

import (
appv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
)

type ClusterData struct {
NodeList *corev1.NodeList `json:"nodeList"`
PodList *corev1.PodList `json:"podList"`
PersistentVolumeList *corev1.PersistentVolumeList `json:"persistentVolumeList"`
PersistentVolumeClaimList *corev1.PersistentVolumeClaimList `json:"persistentVolumeClaimList"`
DeploymentList *appv1.DeploymentList `json:"deploymentList"`
ReplicaSetList *appv1.ReplicaSetList `json:"replicaSetList"`
DaemonSetList *appv1.DaemonSetList `json:"daemonSetList"`
StatefulSetList *appv1.StatefulSetList `json:"statefulSetList"`
ReplicationControllerList *corev1.ReplicationControllerList `json:"replicationControllerList"`
ServiceList *corev1.ServiceList `json:"serviceList"`
CSINodeList *storagev1.CSINodeList `json:"csiNodeList"`
StorageClassList *storagev1.StorageClassList `json:"storageClassList"`
JobList *batchv1.JobList `json:"jobList"`
}

0 comments on commit 1d5ca60

Please sign in to comment.