Skip to content

Commit

Permalink
chore: refactor oteld helper
Browse files Browse the repository at this point in the history
  • Loading branch information
kubeJocker committed Oct 30, 2023
1 parent 8c7d75c commit 0c724ab
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 114 deletions.
96 changes: 5 additions & 91 deletions controllers/monitor/reconcile/oteld.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package reconcile

import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -74,116 +73,31 @@ func BuildInstanceMapForPipline(appDatasources *v1alpha1.CollectorDataSourceList
instanceMap := map[v1alpha1.Mode]*monitortype.OteldInstance{
DefaultMode: monitortype.NewOteldInstance(oteld, cli, ctx),
}
if err := buildSystemInstanceMap(oteld, instanceMap, metricsExporters, logsExporters, cli, ctx); err != nil {
if err := buildSystemInstanceMap(oteld, instanceMap, metricsExporters, logsExporters, appDatasources, cli, ctx); err != nil {
return nil, err
}

// for _, dataSource := range datasources.Items {
// mode := dataSource.Spec.Mode
// if mode == "" {
// mode = DefaultMode
// }
// oteldInstance, ok := instanceMap[mode]
// if !ok {
// oteldInstance = monitortype.NewOteldInstance(oteld)
// }
// if oteldInstance.MetricsPipline == nil {
// oteldInstance.MetricsPipline = []monitortype.Pipline{}
// }
// pipline := monitortype.NewPipeline(dataSource.Name)
// for _, data := range dataSource.Spec.DataSourceList {
// pipline.ReceiverMap[data.Name] = monitortype.Receiver{
// Parameter: data.Parameter,
// CollectionInterval: dataSource.Spec.CollectionInterval,
// }
// }
//
// if oteldInstance.Oteld.Spec.Batch.Enabled {
// pipline.ProcessorMap[monitortype.BatchProcessorName] = true
// }
// if oteldInstance.Oteld.Spec.MemoryLimiter.Enabled {
// pipline.ProcessorMap[monitortype.MemoryProcessorName] = true
// }
// switch dataSource.Spec.Type {
// case v1alpha1.MetricsDatasourceType:
// for _, exporterRef := range dataSource.Spec.ExporterNames {
// for _, exporter := range metricsExporters.Items {
// if exporter.Name == exporterRef {
// pipline.ExporterMap[fmt.Sprintf(ExporterNamePattern, exporter.Spec.Type, exporter.Name)] = true
// }
// }
// }
// oteldInstance.MetricsPipline = append(oteldInstance.MetricsPipline, pipline)
// case v1alpha1.LogsDataSourceType:
// for _, exporterRef := range dataSource.Spec.ExporterNames {
// for _, exporter := range logsExporters.Items {
// if exporter.Name == exporterRef {
// pipline.ExporterMap[fmt.Sprintf(ExporterNamePattern, exporter.Spec.Type, exporter.Name)] = true
// }
// }
// }
// oteldInstance.LogPipline = append(oteldInstance.LogPipline, pipline)
// default:
// return nil, fmt.Errorf("unknown data source type %s", dataSource.Spec.Type)
// }
// instanceMap[mode] = oteldInstance
// }

for _, dataSource := range appDatasources.Items {
mode := v1alpha1.ModeDaemonSet
oteldInstance, ok := instanceMap[mode]
if !ok {
oteldInstance = monitortype.NewOteldInstance(oteld, cli, ctx)
}
oteldInstance.AppDataSources = append(oteldInstance.AppDataSources, dataSource)
instanceMap[mode] = oteldInstance
}

for _, instance := range instanceMap {
systemMetricsPipline := monitortype.NewPipeline(monitortype.AppMetricsCreatorName)
if instance.Oteld.Spec.Batch.Enabled {
systemMetricsPipline.ProcessorMap[monitortype.BatchProcessorName] = true
}
if instance.Oteld.Spec.MemoryLimiter.Enabled {
systemMetricsPipline.ProcessorMap[monitortype.MemoryProcessorName] = true
}
for _, exporter := range metricsExporters.Items {
systemMetricsPipline.ExporterMap[fmt.Sprintf(ExporterNamePattern, exporter.Spec.Type, exporter.Name)] = true
}
instance.AppMetricsPiplien = systemMetricsPipline

logPipline := monitortype.NewPipeline(monitortype.LogCreatorName)
logPipline.ReceiverMap[monitortype.LogCreatorName] = monitortype.Receiver{}
if instance.Oteld.Spec.Batch.Enabled {
logPipline.ProcessorMap[monitortype.BatchProcessorName] = true
}
if instance.Oteld.Spec.MemoryLimiter.Enabled {
logPipline.ProcessorMap[monitortype.MemoryProcessorName] = true
}
for _, exporter := range logsExporters.Items {
logPipline.ExporterMap[fmt.Sprintf(ExporterNamePattern, exporter.Spec.Type, exporter.Name)] = true
}
instance.AppLogsPipline = logPipline
}

return instanceMap, nil
}

func buildSystemInstanceMap(oteld *v1alpha1.OTeld,
instanceMap map[v1alpha1.Mode]*monitortype.OteldInstance,
exporters *v1alpha1.MetricsExporterSinkList,
logsExporters *v1alpha1.LogsExporterSinkList,
datasources *v1alpha1.CollectorDataSourceList,
cli client.Client,
ctx context.Context) error {
systemDataSource := oteld.Spec.SystemDataSource
if systemDataSource == nil {
return nil
}

return newOTeldHelper(systemDataSource, instanceMap, oteld, exporters, logsExporters, cli, ctx).
return newOTeldHelper(systemDataSource, instanceMap, oteld, exporters, logsExporters, datasources, cli, ctx).
buildAPIServicePipeline().
buildK8sNodeStatesPipeline().
buildNodePipeline().
buildPodLogsPipeline().
appendUserDataSource().
buildFixedPipline().
complete()
}
88 changes: 69 additions & 19 deletions controllers/monitor/reconcile/oteld_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type oteldWrapper struct {
errs []error

source *v1alpha1.SystemDataSource
userSource *v1alpha1.CollectorDataSourceList
instanceMap map[v1alpha1.Mode]*types.OteldInstance
logsExporters *v1alpha1.LogsExporterSinkList
metricsExporters *v1alpha1.MetricsExporterSinkList
Expand All @@ -48,14 +49,24 @@ const (
k8sclusterPipeline = "api-service"
k8snodePipeline = "datasource-metrics"
k8spodLogsPipeline = "podlogs"

AppMetricsCreatorName = "receiver_creator/app"
LogsCreatorName = "receiver_creator/logs"
)

type collectType string

const (
collectTypeMetrics collectType = "metrics"
collectTypeLogs collectType = "logs"
)

func (w *oteldWrapper) buildAPIServicePipeline() *oteldWrapper {
if !w.source.EnabledK8sClusterExporter {
return w
}

pipeline := w.createPipeline(v1alpha1.ModeDeployment, k8sclusterPipeline, false)
pipeline := w.createPipeline(v1alpha1.ModeDeployment, k8sclusterPipeline, collectTypeMetrics)
pipeline.ReceiverMap[constant.APIServiceReceiverTPLName] = types.Receiver{
CollectionInterval: w.source.CollectionInterval.String(),
}
Expand All @@ -69,7 +80,7 @@ func (w *oteldWrapper) buildK8sNodeStatesPipeline() *oteldWrapper {
return w
}

pipeline := w.createPipeline(v1alpha1.ModeDaemonSet, k8snodePipeline, false)
pipeline := w.createPipeline(v1alpha1.ModeDaemonSet, k8snodePipeline, collectTypeMetrics)
pipeline.ReceiverMap[constant.K8SNodeStatesReceiverTPLName] = types.Receiver{
CollectionInterval: w.source.CollectionInterval.String(),
}
Expand All @@ -83,7 +94,7 @@ func (w *oteldWrapper) buildNodePipeline() *oteldWrapper {
return w
}

pipeline := w.createPipeline(v1alpha1.ModeDaemonSet, k8snodePipeline, false)
pipeline := w.createPipeline(v1alpha1.ModeDaemonSet, k8snodePipeline, collectTypeMetrics)
pipeline.ReceiverMap[constant.NodeExporterReceiverTPLName] = types.Receiver{
CollectionInterval: w.source.CollectionInterval.String(),
}
Expand All @@ -97,14 +108,14 @@ func (w *oteldWrapper) buildPodLogsPipeline() *oteldWrapper {
return w
}

pipeline := w.createPipeline(v1alpha1.ModeDaemonSet, k8spodLogsPipeline, true)
pipeline := w.createPipeline(v1alpha1.ModeDaemonSet, k8spodLogsPipeline, collectTypeLogs)
pipeline.ReceiverMap[constant.PodLogsReceiverTPLName] = types.Receiver{}
w.buildProcessor(pipeline)
w.buildLogsExporter(pipeline)
return w
}

func (w *oteldWrapper) createPipeline(mode v1alpha1.Mode, name string, logsCollect bool) *types.Pipline {
func (w *oteldWrapper) createPipeline(mode v1alpha1.Mode, name string, collectType collectType) *types.Pipline {
var instance *types.OteldInstance

if instance = w.instanceMap[mode]; instance == nil {
Expand All @@ -114,7 +125,7 @@ func (w *oteldWrapper) createPipeline(mode v1alpha1.Mode, name string, logsColle
if instance.MetricsPipline == nil {
instance.MetricsPipline = []types.Pipline{}
}
return foundOrCreatePipeline(instance, name, logsCollect)
return foundOrCreatePipeline(instance, name, collectType)
}

func (w *oteldWrapper) buildProcessor(pipeline *types.Pipline) {
Expand Down Expand Up @@ -146,11 +157,51 @@ func (w *oteldWrapper) buildLogsExporter(pipeline *types.Pipline) {
w.errs = append(w.errs, cfgcore.MakeError("the logs exporter[%s] relied on by %s was not found.", w.source.LogsExporterRef, pipeline.Name))
}

func (w *oteldWrapper) appendAllMetricsExporter(pipeline *types.Pipline) {
for _, exporter := range w.metricsExporters.Items {
pipeline.ExporterMap[fmt.Sprintf(ExporterNamePattern, exporter.Spec.Type, exporter.Name)] = true
}
}

func (w *oteldWrapper) appendAllLogsExporter(pipeline *types.Pipline) {
for _, exporter := range w.logsExporters.Items {
pipeline.ExporterMap[fmt.Sprintf(ExporterNamePattern, exporter.Spec.Type, exporter.Name)] = true
}
}

func (w *oteldWrapper) appendUserDataSource() *oteldWrapper {
for _, dataSource := range w.userSource.Items {
var instance *types.OteldInstance

if instance = w.instanceMap[v1alpha1.ModeDaemonSet]; instance == nil {
instance = types.NewOteldInstance(w.OTeld, w.cli, w.ctx)
w.instanceMap[v1alpha1.ModeDaemonSet] = instance
}
instance.AppDataSources = append(instance.AppDataSources, dataSource)
}
return w
}

func (w *oteldWrapper) buildFixedPipline() *oteldWrapper {
for _, instance := range w.instanceMap {
logsPipline := types.NewPipeline(LogsCreatorName)
w.buildProcessor(&logsPipline)
w.appendAllLogsExporter(&logsPipline)
instance.AppLogsPipline = &logsPipline

metricsPipline := types.NewPipeline(AppMetricsCreatorName)
w.buildProcessor(&metricsPipline)
w.appendAllMetricsExporter(&metricsPipline)
instance.AppMetricsPiplien = &metricsPipline
}
return w
}

func (w *oteldWrapper) complete() error {
return errors.Join(w.errs...)
}

func foundOrCreatePipeline(instance *types.OteldInstance, name string, collect bool) *types.Pipline {
func foundOrCreatePipeline(instance *types.OteldInstance, name string, collectType collectType) *types.Pipline {
foundPipeline := func(pipelines []types.Pipline) *types.Pipline {
for i := range pipelines {
pipeline := &pipelines[i]
Expand All @@ -168,28 +219,27 @@ func foundOrCreatePipeline(instance *types.OteldInstance, name string, collect b
return update(p)
}

if collect {
switch collectType {
case collectTypeMetrics:
return checkAndCreate(instance.MetricsPipline, func(pipeline types.Pipline) *types.Pipline {
instance.MetricsPipline = append(instance.MetricsPipline, pipeline)
return &instance.MetricsPipline[len(instance.MetricsPipline)-1]
})
case collectTypeLogs:
return checkAndCreate(instance.LogPipline, func(pipeline types.Pipline) *types.Pipline {
instance.LogPipline = append(instance.LogPipline, pipeline)
return &instance.LogPipline[len(instance.LogPipline)-1]
})
default:
return nil
}
return checkAndCreate(instance.MetricsPipline, func(pipeline types.Pipline) *types.Pipline {
instance.MetricsPipline = append(instance.MetricsPipline, pipeline)
return &instance.MetricsPipline[len(instance.MetricsPipline)-1]
})
}

func newOTeldHelper(source *v1alpha1.SystemDataSource,
instanceMap map[v1alpha1.Mode]*types.OteldInstance,
oteld *v1alpha1.OTeld,
metricsExporters *v1alpha1.MetricsExporterSinkList,
logsExporters *v1alpha1.LogsExporterSinkList,
cli client.Client,
ctx context.Context) *oteldWrapper {
func newOTeldHelper(source *v1alpha1.SystemDataSource, instanceMap map[v1alpha1.Mode]*types.OteldInstance, oteld *v1alpha1.OTeld, metricsExporters *v1alpha1.MetricsExporterSinkList, logsExporters *v1alpha1.LogsExporterSinkList, userSources *v1alpha1.CollectorDataSourceList, cli client.Client, ctx context.Context) *oteldWrapper {
return &oteldWrapper{
OTeld: oteld,
source: source,
userSource: userSources,
instanceMap: instanceMap,
logsExporters: logsExporters,
metricsExporters: metricsExporters,
Expand Down
8 changes: 4 additions & 4 deletions controllers/monitor/types/pipline.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type Pipline struct {
type OteldInstance struct {
MetricsPipline []Pipline
LogPipline []Pipline
AppLogsPipline Pipline
AppMetricsPiplien Pipline
AppLogsPipline *Pipline
AppMetricsPiplien *Pipline
Oteld *v1alpha1.OTeld
AppDataSources []v1alpha1.CollectorDataSource

Expand All @@ -71,8 +71,8 @@ func NewOteldInstance(oteld *v1alpha1.OTeld, cli client.Client, ctx context.Cont
Oteld: oteld,
MetricsPipline: []Pipline{},
LogPipline: []Pipline{},
AppLogsPipline: Pipline{},
AppMetricsPiplien: Pipline{},
AppLogsPipline: &Pipline{},
AppMetricsPiplien: &Pipline{},
AppDataSources: []v1alpha1.CollectorDataSource{},
}
}
15 changes: 15 additions & 0 deletions controllers/monitor/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,29 @@ package types
import (
"context"

"gopkg.in/yaml.v2"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/apis/monitor/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/monitor/builder"
"github.com/apecloud/kubeblocks/pkg/configuration/core"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

func buildSliceFromCUE(tplName string, valMap map[string]any) (yaml.MapSlice, error) {
bytes, err := builder.BuildFromCUEForOTel(tplName, valMap, "output")
if err != nil {
return nil, err
}
extensionSlice := yaml.MapSlice{}
err = yaml.Unmarshal(bytes, &extensionSlice)
if err != nil {
return nil, err
}
return extensionSlice, nil
}

type ScrapeConfig map[string]any

func fromCollectorDataSource(dataSourceSpec v1alpha1.CollectorDataSourceSpec, cli client.Client, ctx context.Context, namespace string) ([]ScrapeConfig, error) {
Expand Down

0 comments on commit 0c724ab

Please sign in to comment.