diff --git a/cmd/kuscia/modules/ktexporter.go b/cmd/kuscia/modules/ktexporter.go
new file mode 100644
index 00000000..77823ea7
--- /dev/null
+++ b/cmd/kuscia/modules/ktexporter.go
@@ -0,0 +1,62 @@
+// Copyright 2023 Ant Group Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package modules
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	pkgcom "github.com/secretflow/kuscia/pkg/common"
+	"github.com/secretflow/kuscia/pkg/ktexporter"
+	"github.com/secretflow/kuscia/pkg/utils/readyz"
+)
+
+// ktExporterModule is the Kuscia module that runs the kt metrics exporter.
+type ktExporterModule struct {
+	moduleRuntimeBase
+	runMode            pkgcom.RunModeType
+	domainID           string
+	rootDir            string
+	metricUpdatePeriod uint
+	ktExportPort       string
+}
+
+// NewKtExporter creates the kt exporter module from the runtime configuration.
+// Its readiness probe is built with the exporter's root URL on 127.0.0.1, the
+// code 404 and a body check that accepts any response body.
+func NewKtExporter(i *ModuleRuntimeConfigs) (Module, error) {
+	acceptAnyBody := func(_ []byte) error { return nil }
+	probe := readyz.NewHTTPReadyZ(fmt.Sprintf("http://127.0.0.1:%s", i.KtExportPort), 404, acceptAnyBody)
+
+	module := &ktExporterModule{
+		moduleRuntimeBase: moduleRuntimeBase{
+			name:         "ktexporter",
+			readyTimeout: 60 * time.Second,
+			rdz:          probe,
+		},
+		runMode:            i.RunMode,
+		domainID:           i.DomainID,
+		rootDir:            i.RootDir,
+		metricUpdatePeriod: i.MetricUpdatePeriod,
+		ktExportPort:       i.KtExportPort,
+	}
+	return module, nil
+}
+
+// Run starts the kt exporter and returns whatever ktexporter.KtExporter returns.
+func (m *ktExporterModule) Run(ctx context.Context) error {
+	return ktexporter.KtExporter(ctx, m.runMode, m.domainID, m.metricUpdatePeriod, m.ktExportPort)
+}
diff --git a/cmd/kuscia/modules/metricexporter.go b/cmd/kuscia/modules/metricexporter.go index e6cfae25..f99ed764 
100644 --- a/cmd/kuscia/modules/metricexporter.go +++ b/cmd/kuscia/modules/metricexporter.go @@ -33,6 +33,7 @@ type metricExporterModule struct { metricURLs map[string]string nodeExportPort string ssExportPort string + ktExportPort string metricExportPort string podManager pod.Manager } @@ -63,12 +64,14 @@ func NewMetricExporter(i *ModuleRuntimeConfigs) (Module, error) { rootDir: i.RootDir, nodeExportPort: i.NodeExportPort, ssExportPort: i.SsExportPort, + ktExportPort: i.KtExportPort, metricExportPort: i.MetricExportPort, podManager: podManager, metricURLs: map[string]string{ "node-exporter": "http://localhost:" + i.NodeExportPort + "/metrics", "envoy": envoyexporter.GetEnvoyMetricURL(), "ss": "http://localhost:" + i.SsExportPort + "/ssmetrics", + "kt": "http://localhost:" + i.KtExportPort + "/ktmetrics", }, } return exporter, nil diff --git a/cmd/kuscia/modules/runtime.go b/cmd/kuscia/modules/runtime.go index fa5e049b..544d8b3a 100644 --- a/cmd/kuscia/modules/runtime.go +++ b/cmd/kuscia/modules/runtime.go @@ -51,6 +51,7 @@ type ModuleRuntimeConfigs struct { TransportPort int InterConnSchedulerPort int SsExportPort string + KtExportPort string NodeExportPort string MetricExportPort string KusciaKubeConfig string @@ -190,6 +191,7 @@ func NewModuleRuntimeConfigs(ctx context.Context, kusciaConf confloader.KusciaCo } // init exporter ports + dependencies.KtExportPort = "9093" dependencies.SsExportPort = "9092" dependencies.NodeExportPort = "9100" dependencies.MetricExportPort = "9091" diff --git a/cmd/kuscia/start/start.go b/cmd/kuscia/start/start.go index c46a96c3..79d15b95 100644 --- a/cmd/kuscia/start/start.go +++ b/cmd/kuscia/start/start.go @@ -90,6 +90,7 @@ func Start(ctx context.Context, configFile string) error { mm.Regist("metricexporter", modules.NewMetricExporter, autonomy, lite, master) mm.Regist("nodeexporter", modules.NewNodeExporter, autonomy, lite, master) mm.Regist("ssexporter", modules.NewSsExporter, autonomy, lite, master) + mm.Regist("ktexporter", 
modules.NewKtExporter, autonomy, lite, master) mm.Regist("scheduler", modules.NewScheduler, autonomy, master) mm.Regist("transport", modules.NewTransport, autonomy, lite) @@ -104,7 +105,8 @@ func Start(ctx context.Context, configFile string) error { mm.SetDependencies("kusciaapi", "k3s", "config", "domainroute") mm.SetDependencies("scheduler", "k3s") mm.SetDependencies("ssexporter", "envoy") - mm.SetDependencies("metricexporter", "agent", "envoy", "ssexporter", "nodeexporter") + mm.SetDependencies("ktexporter", "envoy") + mm.SetDependencies("metricexporter", "agent", "envoy", "ssexporter", "ktexporter", "nodeexporter") mm.SetDependencies("transport", "envoy") mm.SetDependencies("k3s", "coredns") diff --git a/docs/tutorial/run_scql_on_kuscia_cn.md b/docs/tutorial/run_scql_on_kuscia_cn.md index e9047623..f1de42d5 100644 --- a/docs/tutorial/run_scql_on_kuscia_cn.md +++ b/docs/tutorial/run_scql_on_kuscia_cn.md @@ -291,7 +291,12 @@ curl -X POST http://127.0.0.1:80/intra/project/create \ ```bash curl -X POST http://127.0.0.1:80/intra/project/list \ --header "host: scql-broker-intra.alice.svc" \ ---header "kuscia-source: alice" +--header "kuscia-source: alice" \ +-d '{ + "ids": [ + "demo" + ] +}' ``` 4.邀请 bob 加入到 "demo" 项目中 @@ -309,7 +314,8 @@ curl -X POST http://127.0.0.1:80/intra/member/invite \ ```bash curl -X POST http://127.0.0.1:80/intra/invitation/list \ --header "host: scql-broker-intra.alice.svc" \ ---header "kuscia-source: alice" +--header "kuscia-source: alice" \ +-d '{}' ``` #### Bob 接受邀请 @@ -699,4 +705,4 @@ ls /home/kuscia/var/stdout/pods # 查看应用容器的日志,示例如下: cat /home/kuscia/var/stdout/pods/alice_xxxx_engine_xxxx/secretflow/0.log cat /home/kuscia/var/stdout/pods/alice_xxxx_broker_xxxx/secretflow/0.log -``` \ No newline at end of file +``` diff --git a/go.mod b/go.mod index b23ea2c1..0968240b 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/moby/sys/mount v0.3.3 github.com/moby/sys/mountinfo v0.7.1 github.com/olekukonko/tablewriter v0.0.5 
- github.com/opencontainers/image-spec v1.1.0-rc5 + github.com/opencontainers/image-spec v1.1.0 github.com/opencontainers/runtime-spec v1.1.1-0.20230823135140-4fec88fd00a4 github.com/opencontainers/selinux v1.11.0 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -90,11 +90,14 @@ require ( require ( cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect + github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect + github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab // indirect github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect - github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/Microsoft/hcsshim v0.11.7 // indirect github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect github.com/apparentlymart/go-cidr v1.1.0 // indirect @@ -109,9 +112,19 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cilium/ebpf v0.11.0 // indirect github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 // indirect + github.com/containerd/cgroups v1.1.0 // indirect github.com/containerd/console v1.0.3 // indirect + github.com/containerd/containerd v1.7.24 // indirect + github.com/containerd/containerd/api v1.7.19 // indirect + github.com/containerd/continuity v0.4.2 // indirect + github.com/containerd/errdefs v0.3.0 // indirect + github.com/containerd/fifo v1.1.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect github.com/containerd/stargz-snapshotter/estargz v0.14.3 // indirect - github.com/containerd/ttrpc v1.2.2 // indirect + 
github.com/containerd/ttrpc v1.2.5 // indirect + github.com/containerd/typeurl v1.0.2 // indirect + github.com/containerd/typeurl/v2 v2.1.1 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect @@ -120,8 +133,10 @@ require ( github.com/distribution/reference v0.6.0 // indirect github.com/dnstap/golang-dnstap v0.4.0 // indirect github.com/docker/cli v27.1.1+incompatible // indirect - github.com/docker/docker v27.1.1+incompatible // indirect + github.com/docker/docker v27.3.1+incompatible // indirect github.com/docker/docker-credential-helpers v0.7.0 // indirect + github.com/docker/go-connections v0.4.0 // indirect + github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect github.com/euank/go-kmsg-parser v2.0.0+incompatible // indirect @@ -184,7 +199,12 @@ require ( github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/locker v1.0.1 // indirect github.com/moby/spdystream v0.2.0 // indirect + github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/signal v0.7.0 // indirect + github.com/moby/sys/user v0.3.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -221,11 +241,12 @@ require ( github.com/xlab/treeprint v1.1.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.etcd.io/bbolt v1.3.8 // indirect + go.etcd.io/bbolt v1.3.10 // indirect go.etcd.io/etcd/api/v3 v3.5.6 // indirect 
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect go.etcd.io/etcd/client/v2 v2.305.6 // indirect go.etcd.io/etcd/client/v3 v3.5.6 // indirect + go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect diff --git a/go.sum b/go.sum index d9d06e5c..926b3274 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,10 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 h1:59MxjQVfjXsBpLy+dbd2/ELV5ofnUkUZBvWSC85sheA= +github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0/go.mod h1:OahwfttHWG6eJ0clwcfBAHoDI6X/LV/15hx/wlMZSrU= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -52,6 +56,10 @@ github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6 github.com/Microsoft/go-winio v0.4.15/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= 
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/Microsoft/hcsshim v0.11.7 h1:vl/nj3Bar/CvJSYo7gIQPyRWc9f3c6IeSNavBTSZNZQ= +github.com/Microsoft/hcsshim v0.11.7/go.mod h1:MV8xMfmECjl5HdO7U/3/hFVnkmSBjAjmA09d4bExKcU= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/agiledragon/gomonkey/v2 v2.2.0 h1:QJWqpdEhGV/JJy70sZ/LDnhbSlMrqHAWHcNOjz1kyuI= @@ -126,17 +134,37 @@ github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+g github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/container-storage-interface/spec v1.7.0 h1:gW8eyFQUZWWrMWa8p1seJ28gwDoN5CVJ4uAbQ+Hdycw= github.com/container-storage-interface/spec v1.7.0/go.mod h1:JYuzLqr9VVNoDJl44xp/8fmCOvWPDKzuGTwCoklhuqk= +github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= +github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw= github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0= github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0= github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= +github.com/containerd/containerd v1.7.24 h1:zxszGrGjrra1yYJW/6rhm9cJ1ZQ8rkKBR48brqsa7nA= +github.com/containerd/containerd v1.7.24/go.mod h1:7QUzfURqZWCZV7RLNEn1XjUCQLEf0bkaK4GjUaZehxw= +github.com/containerd/containerd/api v1.7.19 h1:VWbJL+8Ap4Ju2mx9c9qS1uFSB1OVYr5JJrW2yT5vFoA= +github.com/containerd/containerd/api 
v1.7.19/go.mod h1:fwGavl3LNwAV5ilJ0sbrABL44AQxmNjDRcwheXDb6Ig= +github.com/containerd/continuity v0.4.2 h1:v3y/4Yz5jwnvqPKJJ+7Wf93fyWoCB3F5EclWG023MDM= +github.com/containerd/continuity v0.4.2/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= +github.com/containerd/errdefs v0.3.0 h1:FSZgGOeK4yuT/+DnF07/Olde/q4KBoMsaamhXxIMDp4= +github.com/containerd/errdefs v0.3.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/fifo v1.1.0 h1:4I2mbh5stb1u6ycIABlBw9zgtlK8viPI9QkQNRQEEmY= +github.com/containerd/fifo v1.1.0/go.mod h1:bmC4NWMbXlt2EZ0Hc7Fx7QzTFxgPID13eH0Qu+MAb2o= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= +github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= github.com/containerd/stargz-snapshotter/estargz v0.14.3 h1:OqlDCK3ZVUO6C3B/5FSkDwbkEETK84kQgEeFwDC+62k= github.com/containerd/stargz-snapshotter/estargz v0.14.3/go.mod h1:KY//uOCIkSuNAHhJogcZtrNHdKrA99/FCCRjE3HD36o= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs= github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= +github.com/containerd/ttrpc v1.2.5 h1:IFckT1EFQoFBMG4c3sMdT8EP3/aKfumK1msY+Ze4oLU= +github.com/containerd/ttrpc v1.2.5/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o= github.com/containerd/typeurl v1.0.2 h1:Chlt8zIieDbzQFzXzAeBEF92KhExuE4p9p92/QmY7aY= github.com/containerd/typeurl v1.0.2/go.mod h1:9trJWW2sRlGub4wZJRTW83VtbOLS6hwcDZXTn6oPz9s= +github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9FqcttATPO/4= +github.com/containerd/typeurl/v2 v2.1.1/go.mod h1:IDp2JFvbwZ31H8dQbEIY7sDl2L3o3HZj1hsSQlywkQ0= github.com/coredns/caddy 
v1.1.1 h1:2eYKZT7i6yxIfGP3qLJoJ7HAsDJqYB+X68g4NYjSrE0= github.com/coredns/caddy v1.1.1/go.mod h1:A6ntJQlAWuQfFlsd9hvigKbo2WS0VUs2l1e2F+BawD4= github.com/coredns/coredns v1.10.0 h1:jCfuWsBjTs0dapkkhISfPCzn5LqvSRtrFtaf/Tjj4DI= @@ -172,10 +200,14 @@ github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4Kfc github.com/docker/docker v20.10.18+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY= github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI= +github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.7.0 h1:xtCHsjxogADNZcdv1pKUHXryefjlVRqWqIhk/uXJp0A= github.com/docker/docker-credential-helpers v0.7.0/go.mod h1:rETQfLdHNT3foU5kuNkFR1R1V12OJRRO5lzt2D1b5X0= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c h1:+pKlWGMw7gf6bQ+oDZB4KHQFypsfjYlq/C4rfL7D3g8= +github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -346,6 +378,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -493,6 +526,8 @@ github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9 github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= +github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= @@ -501,6 +536,14 @@ github.com/moby/sys/mountinfo v0.5.0/go.mod h1:3bMD3Rg+zkqx8MRYPi7Pyb0Ie97QEBmdx github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= github.com/moby/sys/mountinfo v0.7.1 h1:/tTvQaSJRr2FshkhXiIpux6fQ2Zvc4j7tAhMTStAG2g= github.com/moby/sys/mountinfo v0.7.1/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= +github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= +github.com/moby/sys/signal v0.7.0 h1:25RW3d5TnQEoKvRbEKUGay6DCQ46IxAVTT9CUMgmsSI= +github.com/moby/sys/signal v0.7.0/go.mod h1:GQ6ObYZfqacOwTtlXvcmh9A26dVRul/hbOZn88Kg8Tg= +github.com/moby/sys/user v0.3.0 
h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= +github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= +github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw= github.com/moby/term v0.0.0-20221205130635-1aeaba878587 h1:HfkjXDfhgVaN5rmueG8cL8KKeFNecRCXFhaJ2qZ5SKA= github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= @@ -541,6 +584,8 @@ github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3I github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opencontainers/runc v1.1.4/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/opencontainers/runc v1.1.12 h1:BOIssBaW1La0/qbNZHXOOa71dZfZEQOzW7dqQf3phss= github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma/O44Ekby9FK8= @@ -710,6 +755,7 @@ gitlab.com/jonas.jasas/condchan v0.0.0-20190210165812-36637ad2b5bc/go.mod h1:4JS go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA= go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= 
go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU= @@ -729,6 +775,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 h1:x8Z78aZx8cOF0+Kkazoc7lwUNMGy0LrzEMxTm4BbTxg= @@ -844,6 +892,7 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= @@ -1118,6 +1167,7 @@ google.golang.org/grpc v1.29.1/go.mod 
h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= diff --git a/pkg/agent/pod/pod_manager.go b/pkg/agent/pod/pod_manager.go index 00bb4d39..45847276 100644 --- a/pkg/agent/pod/pod_manager.go +++ b/pkg/agent/pod/pod_manager.go @@ -342,4 +342,3 @@ func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) { pod, ok := pm.podByFullName[pkgcontainer.GetPodFullName(mirrorPod)] return pod, ok } - 
diff --git a/pkg/ktexporter/fetchmetrics/fetchmetrics.go b/pkg/ktexporter/fetchmetrics/fetchmetrics.go
new file mode 100644
index 00000000..ffe14edd
--- /dev/null
+++ b/pkg/ktexporter/fetchmetrics/fetchmetrics.go
@@ -0,0 +1,436 @@
+// Copyright 2023 Ant Group Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fetchmetrics reads per-container resource usage for Kuscia task
+// pods from the container runtime, cgroup v1 files and /proc.
+package fetchmetrics
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	v1 "github.com/containerd/cgroups/stats/v1"
+	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/cio"
+	"github.com/containerd/containerd/namespaces"
+	"github.com/containerd/typeurl"
+	"github.com/docker/docker/client"
+	"github.com/gogo/protobuf/types"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	listers "k8s.io/client-go/listers/core/v1"
+
+	"github.com/secretflow/kuscia/pkg/common"
+	"github.com/secretflow/kuscia/pkg/utils/nlog"
+)
+
+const (
+	// containerdSocket is the containerd endpoint queried for task metrics.
+	containerdSocket = "/run/containerd/containerd.sock"
+	// containerdNamespace is the containerd namespace containers are loaded from.
+	// NOTE(review): the cgroup paths below live under "k8s.io" while this is
+	// "default" - confirm it matches the runtime configuration.
+	containerdNamespace = "default"
+	// cpuCgroupRoot is scanned for a directory named after the container ID.
+	cpuCgroupRoot = "/sys/fs/cgroup/cpu/k8s.io"
+	// memoryCgroupRoot is the cgroup v1 memory hierarchy read for peak usage.
+	memoryCgroupRoot = "/sys/fs/cgroup/memory/k8s.io"
+)
+
+// ContainerConfig carries the container hostname as decoded from JSON.
+type ContainerConfig struct {
+	Hostname string `json:"hostname"`
+}
+
+// ContainerStats holds the stats information for a container. All values are
+// kept as strings; Disk and Inodes are currently always empty.
+type ContainerStats struct {
+	// CPUPercentage is, despite its name, the raw cumulative CPU usage counter
+	// reported by the cgroup (presumably nanoseconds - TODO confirm).
+	CPUPercentage string
+	// Memory is the current memory usage reported by the cgroup, in decimal.
+	Memory string
+	Disk   string
+	Inodes string
+}
+
+// runtimeContainerID strips the "<runtime>://" scheme that Kubernetes puts
+// in front of ContainerStatus.ContainerID, leaving the ID the container
+// runtime itself understands. IDs without a scheme are returned unchanged.
+func runtimeContainerID(id string) string {
+	if idx := strings.Index(id, "://"); idx >= 0 {
+		return id[idx+len("://"):]
+	}
+	return id
+}
+
+// listAllPods returns every pod known to podLister, without label filtering.
+func listAllPods(podLister listers.PodLister) ([]*corev1.Pod, error) {
+	return podLister.List(labels.Everything())
+}
+
+// forEachTaskContainer calls visit once per container status of every pod
+// that carries a non-empty Kuscia task ID annotation.
+func forEachTaskContainer(podLister listers.PodLister, visit func(taskID, containerID string)) error {
+	pods, err := listAllPods(podLister)
+	if err != nil {
+		return err
+	}
+
+	for _, pod := range pods {
+		// Indexing a nil annotation map yields "", so no separate nil check.
+		taskID := pod.Annotations[common.TaskIDAnnotationKey]
+		if taskID == "" {
+			continue
+		}
+		for _, status := range pod.Status.ContainerStatuses {
+			visit(taskID, status.ContainerID)
+		}
+	}
+	return nil
+}
+
+// GetKusciaTaskPID maps each Kuscia task ID to the container ID reported in
+// the pod status. Despite the name, the values are container IDs, not PIDs.
+// When a task ID is seen for several containers, the last one listed wins.
+func GetKusciaTaskPID(podLister listers.PodLister) (map[string]string, error) {
+	taskIDToContainerID := make(map[string]string)
+	err := forEachTaskContainer(podLister, func(taskID, containerID string) {
+		taskIDToContainerID[taskID] = containerID
+	})
+	if err != nil {
+		return nil, err
+	}
+	return taskIDToContainerID, nil
+}
+
+// getContainerPID asks the Docker daemon configured through the environment
+// for the PID recorded in the container's state.
+func getContainerPID(containerID string) (int, error) {
+	cli, err := client.NewClientWithOpts(client.FromEnv)
+	if err != nil {
+		return 0, err
+	}
+	// A client is created per call, so release its transport before returning.
+	defer cli.Close()
+
+	info, err := cli.ContainerInspect(context.Background(), runtimeContainerID(containerID))
+	if err != nil {
+		return 0, err
+	}
+	if info.ContainerJSONBase == nil || info.State == nil {
+		return 0, fmt.Errorf("container %q has no state information", containerID)
+	}
+	return info.State.Pid, nil
+}
+
+// GetTaskIDToContainerID maps each Kuscia task ID to the PID of one of its
+// containers, formatted as a decimal string. Despite the name, the values are
+// PIDs. Containers whose PID cannot be resolved are logged and skipped, so a
+// single failing container does not abort the whole collection.
+func GetTaskIDToContainerID(podLister listers.PodLister) (map[string]string, error) {
+	taskIDToPID := make(map[string]string)
+	err := forEachTaskContainer(podLister, func(taskID, containerID string) {
+		pid, err := getContainerPID(containerID)
+		if err != nil {
+			nlog.Warnf("Skip container %q of task %q: failed to get PID: %v", containerID, taskID, err)
+			return
+		}
+		taskIDToPID[taskID] = strconv.Itoa(pid)
+	})
+	if err != nil {
+		return nil, err
+	}
+	return taskIDToPID, nil
+}
+
+// containerUsage loads the container's task through ctrd and converts its
+// cgroup v1 metrics into a ContainerStats value.
+func containerUsage(ctx context.Context, ctrd *containerd.Client, containerID string) (ContainerStats, error) {
+	container, err := ctrd.LoadContainer(ctx, containerID)
+	if err != nil {
+		return ContainerStats{}, fmt.Errorf("failed to load container: %w", err)
+	}
+
+	task, err := container.Task(ctx, cio.Load)
+	if err != nil {
+		return ContainerStats{}, fmt.Errorf("failed to load task for container: %w", err)
+	}
+
+	metrics, err := task.Metrics(ctx)
+	if err != nil {
+		return ContainerStats{}, fmt.Errorf("failed to get metrics: %w", err)
+	}
+
+	// Re-wrap the payload in a gogo Any so typeurl can decode it.
+	data, err := typeurl.UnmarshalAny(&types.Any{
+		TypeUrl: metrics.Data.GetTypeUrl(),
+		Value:   metrics.Data.GetValue(),
+	})
+	if err != nil {
+		return ContainerStats{}, fmt.Errorf("failed to unmarshal metrics data: %w", err)
+	}
+
+	cgroupMetrics, ok := data.(*v1.Metrics)
+	if !ok {
+		return ContainerStats{}, fmt.Errorf("unexpected metrics type %T", data)
+	}
+	if cgroupMetrics.CPU == nil || cgroupMetrics.CPU.Usage == nil ||
+		cgroupMetrics.Memory == nil || cgroupMetrics.Memory.Usage == nil {
+		return ContainerStats{}, fmt.Errorf("metrics lack CPU or memory usage")
+	}
+
+	return ContainerStats{
+		CPUPercentage: strconv.FormatUint(cgroupMetrics.CPU.Usage.Total, 10),
+		Memory:        strconv.FormatUint(cgroupMetrics.Memory.Usage.Usage, 10),
+		Disk:          "",
+		Inodes:        "",
+	}, nil
+}
+
+// GetContainerStats returns CPU and memory usage for every container of every
+// listed pod, keyed by the container ID found in the pod status. Containers
+// whose metrics cannot be read are logged and left out of the result; an error
+// is returned only when pods cannot be listed or containerd is unreachable.
+func GetContainerStats(podLister listers.PodLister) (map[string]ContainerStats, error) {
+	pods, err := listAllPods(podLister)
+	if err != nil {
+		return nil, err
+	}
+
+	// One containerd connection is shared by all containers. It is opened on
+	// first use so that an empty pod list needs no runtime, and closed once.
+	var ctrd *containerd.Client
+	defer func() {
+		if ctrd != nil {
+			ctrd.Close()
+		}
+	}()
+	ctx := namespaces.WithNamespace(context.Background(), containerdNamespace)
+
+	statsMap := make(map[string]ContainerStats)
+	for _, pod := range pods {
+		for _, status := range pod.Status.ContainerStatuses {
+			if ctrd == nil {
+				conn, err := containerd.New(containerdSocket)
+				if err != nil {
+					return nil, fmt.Errorf("connecting to containerd at %s: %w", containerdSocket, err)
+				}
+				ctrd = conn
+			}
+
+			stats, err := containerUsage(ctx, ctrd, runtimeContainerID(status.ContainerID))
+			if err != nil {
+				nlog.Warnf("Skip stats of container %q: %v", status.ContainerID, err)
+				continue
+			}
+			statsMap[status.ContainerID] = stats
+		}
+	}
+
+	return statsMap, nil
+}
+
+// GetMaxMemoryUsageStats returns the peak virtual memory of process pid and the
+// peak physical memory of the container whose cgroup directory name starts
+// with cidPrefix, both in bytes.
+func GetMaxMemoryUsageStats(pid string, cidPrefix string) (uint64, uint64, error) {
+	// Find the full CID based on the prefix.
+	cid, err := findCIDByPrefix(cidPrefix)
+	if err != nil {
+		nlog.Warn("failed to find full CID by prefix", err)
+		return 0, 0, err
+	}
+
+	virtualMemory, err := getVirtualMemoryUsage(pid)
+	if err != nil {
+		return 0, 0, err
+	}
+
+	physicalMemory, err := getPhysicalMemoryUsage(cid)
+	if err != nil {
+		return 0, 0, err
+	}
+
+	return virtualMemory, physicalMemory, nil
+}
+
+// getVirtualMemoryUsage returns the VmPeak value of /proc/[pid]/status
+// converted from the kB figure in the file to bytes.
+func getVirtualMemoryUsage(pid string) (uint64, error) {
+	statusData, err := os.ReadFile(filepath.Join("/proc", pid, "status"))
+	if err != nil {
+		nlog.Warn("failed to read /proc/[pid]/status", err)
+		return 0, err
+	}
+
+	for _, line := range strings.Split(string(statusData), "\n") {
+		if !strings.HasPrefix(line, "VmPeak:") {
+			continue
+		}
+		// Expected shape: "VmPeak:  <value> kB".
+		parts := strings.Fields(line)
+		if len(parts) != 3 {
+			continue
+		}
+		vmPeak, err := strconv.ParseUint(parts[1], 10, 64)
+		if err != nil {
+			nlog.Warn("failed to parse VmPeak value", err)
+			return 0, err
+		}
+		return vmPeak * 1024, nil // Convert to bytes
+	}
+
+	nlog.Warn("VmPeak not found in /proc/[pid]/status")
+	return 0, fmt.Errorf("VmPeak not found in /proc/[pid]/status")
+}
+
+// getPhysicalMemoryUsage returns the cgroup's memory.max_usage_in_bytes value
+// for container cid.
+func getPhysicalMemoryUsage(cid string) (uint64, error) {
+	memMaxUsageFile := filepath.Join(memoryCgroupRoot, cid, "memory.max_usage_in_bytes")
+	memMaxUsageData, err := os.ReadFile(memMaxUsageFile)
+	if err != nil {
+		nlog.Warn("failed to read memory.max_usage_in_bytes", err)
+		return 0, err
+	}
+
+	physicalMemory, err := strconv.ParseUint(strings.TrimSpace(string(memMaxUsageData)), 10, 64)
+	if err != nil {
+		nlog.Warn("failed to parse memory.max_usage_in_bytes", err)
+		return 0, err
+	}
+
+	return physicalMemory, nil
+}
+
+// GetContainerNetIOFromProc returns the received and transmitted byte counters
+// of interface defaultIface as listed in /proc/[pid]/net/dev. When the
+// interface is not listed both counters are 0 and err is nil.
+func GetContainerNetIOFromProc(defaultIface, pid string) (recvBytes, xmitBytes uint64, err error) {
+	netDevPath := fmt.Sprintf("/proc/%s/net/dev", pid)
+	data, err := os.ReadFile(netDevPath)
+	if err != nil {
+		nlog.Warn("Fail to read the path", netDevPath)
+		return 0, 0, err
+	}
+
+	lines := strings.Split(string(data), "\n")
+	if len(lines) < 3 {
+		// Two header lines plus at least one interface row are expected.
+		nlog.Error("unexpected format in ", netDevPath)
+		return 0, 0, fmt.Errorf("unexpected format in %s", netDevPath)
+	}
+
+	for _, line := range lines {
+		// Interface rows look like "<iface>: <counters...>"; splitting on the
+		// colon also copes with a counter fused to the name ("eth0:123").
+		idx := strings.Index(line, ":")
+		if idx < 0 || strings.TrimSpace(line[:idx]) != defaultIface {
+			continue
+		}
+		counters := strings.Fields(line[idx+1:])
+		if len(counters) < 9 {
+			continue
+		}
+		// Column 0 is receive bytes, column 8 is transmit bytes.
+		recvBytes, err = strconv.ParseUint(counters[0], 10, 64)
+		if err != nil {
+			nlog.Error("Error converting string to uint64:", err)
+			return 0, 0, err
+		}
+		xmitBytes, err = strconv.ParseUint(counters[8], 10, 64)
+		if err != nil {
+			nlog.Error("Error converting string to uint64:", err)
+			return 0, 0, err
+		}
+		return recvBytes, xmitBytes, nil
+	}
+
+	return 0, 0, nil
+}
+
+// GetContainerBandwidth converts two samples of the receive/transmit byte
+// counters, taken timeWindow apart, into bandwidths in bits per unit of
+// timeWindow. It returns an error when timeWindow is not a positive number,
+// because the division would otherwise yield Inf or NaN.
+func GetContainerBandwidth(curRecvBytes, preRecvBytes, curXmitBytes, preXmitBytes uint64, timeWindow float64) (recvBandwidth, xmitBandwidth float64, err error) {
+	// Written as a negated comparison so that NaN is rejected as well.
+	if !(timeWindow > 0) {
+		return 0, 0, fmt.Errorf("invalid time window %v: must be positive", timeWindow)
+	}
+
+	recvBytesDiff := float64(curRecvBytes) - float64(preRecvBytes)
+	xmitBytesDiff := float64(curXmitBytes) - float64(preXmitBytes)
+
+	recvBandwidth = (recvBytesDiff * 8) / timeWindow
+	xmitBandwidth = (xmitBytesDiff * 8) / timeWindow
+
+	return recvBandwidth, xmitBandwidth, nil
+}
+
+// findCIDByPrefix returns the name of the first directory under the cpu
+// cgroup root whose name starts with prefix. It returns an error matching
+// os.ErrNotExist when prefix is empty or nothing matches.
+func findCIDByPrefix(prefix string) (string, error) {
+	// Every name has the empty prefix; refuse it rather than return an
+	// arbitrary container.
+	if prefix == "" {
+		return "", fmt.Errorf("empty container ID prefix: %w", os.ErrNotExist)
+	}
+
+	entries, err := os.ReadDir(cpuCgroupRoot)
+	if err != nil {
+		return "", err
+	}
+
+	for _, entry := range entries {
+		// Container cgroups are directories; skip control files such as
+		// cgroup.procs that share the directory.
+		if entry.IsDir() && strings.HasPrefix(entry.Name(), prefix) {
+			return entry.Name(), nil
+		}
+	}
+
+	return "", os.ErrNotExist
+}
+
+// GetTotalCPUUsageStats returns the cpuacct.usage counter of the container
+// whose cgroup directory name starts with cidPrefix.
+func GetTotalCPUUsageStats(cidPrefix string) (uint64, error) {
+	// Find the full CID based on the prefix.
+	cid, err := findCIDByPrefix(cidPrefix)
+	if err != nil {
+		nlog.Warn("failed to find full CID by prefix", err)
+		return 0, err
+	}
+
+	cgroupUsageData, err := os.ReadFile(filepath.Join(cpuCgroupRoot, cid, "cpuacct.usage"))
+	if err != nil {
+		nlog.Warn("failed to read cpuacct.usage", err)
+		return 0, err
+	}
+
+	cpuUsage, err := strconv.ParseUint(strings.TrimSpace(string(cgroupUsageData)), 10, 64)
+	if err != nil {
+		nlog.Warn("failed to parse cpuacct.usage", err)
+		return 0, err
+	}
+
+	return cpuUsage, nil
+}
diff --git a/pkg/ktexporter/ktexporter.go b/pkg/ktexporter/ktexporter.go new file mode 100644 index 00000000..145f89d5 --- /dev/null +++ b/pkg/ktexporter/ktexporter.go @@ -0,0 +1,89 @@ +// Copyright 2023 Ant Group Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ktexporter + +import ( + "context" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/secretflow/kuscia/pkg/ktexporter/ktmetrics" + "github.com/secretflow/kuscia/pkg/ktexporter/ktpromexporter" + "github.com/secretflow/kuscia/pkg/ktexporter/parse" + + pkgcom "github.com/secretflow/kuscia/pkg/common" + "github.com/secretflow/kuscia/pkg/metricexporter/domain" + "github.com/secretflow/kuscia/pkg/utils/nlog" +) + +var ( + ReadyChan = make(chan struct{}) +) + +func KtExporter(ctx context.Context, runMode pkgcom.RunModeType, domainID string, exportPeriod uint, port string) error { + // read the config + _, AggregationMetrics := parse.LoadMetricConfig() + clusterAddresses, _ := domain.GetClusterAddress(domainID) + localDomainName := domainID + var MetricTypes = ktpromexporter.NewMetricTypeskt() + + reg := ktpromexporter.ProduceRegister() + lastClusterMetricValues, err := ktmetrics.GetKtMetricResults(runMode, localDomainName, clusterAddresses, AggregationMetrics, exportPeriod) + if err != nil { + nlog.Warnf("Fail to get kt metric results, err: %v", err) + return err + } + // export the cluster metrics + ticker := time.NewTicker(time.Duration(exportPeriod) * time.Second) + defer ticker.Stop() + go func(runMode pkgcom.RunModeType, reg *prometheus.Registry, MetricTypes map[string]string, exportPeriods uint, _ map[string]float64) { + for range ticker.C { + // get clusterName and clusterAddress + clusterAddresses, _ = domain.GetClusterAddress(domainID) + // get cluster metrics + currentClusterMetricValues, err := ktmetrics.GetKtMetricResults(runMode, localDomainName, clusterAddresses, AggregationMetrics, exportPeriods) + if err != nil { + nlog.Warnf("Fail to get kt metric results, err: %v", err) + } + + // calculate the change values of cluster metrics + + ktpromexporter.UpdateMetrics(reg, currentClusterMetricValues, MetricTypes) + } + }(runMode, reg, MetricTypes, 
exportPeriod, lastClusterMetricValues) + // export to the prometheus + ktServer := http.NewServeMux() + ktServer.Handle("/ktmetrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + })) + go func() { + if err := http.ListenAndServe("0.0.0.0:"+port, ktServer); err != nil { + nlog.Error("Fail to start the metric exporterserver", err) + } + }() + defer func() { + close(ReadyChan) + nlog.Info("Start to export metrics...") + }() + + <-ctx.Done() + nlog.Info("Stopping the metric exporter...") + return nil +} diff --git a/pkg/ktexporter/ktmetrics.md b/pkg/ktexporter/ktmetrics.md new file mode 100644 index 00000000..837fc625 --- /dev/null +++ b/pkg/ktexporter/ktmetrics.md @@ -0,0 +1 @@ +现在将kusciatask的相关指标在localhost:9093/ktmetrics暴露输出,同时集成到localhost:9091/metrics中,具体类型见ktpromexporter/type.go \ No newline at end of file diff --git a/pkg/ktexporter/ktmetrics/ktmetrics.go b/pkg/ktexporter/ktmetrics/ktmetrics.go new file mode 100644 index 00000000..d69e9849 --- /dev/null +++ b/pkg/ktexporter/ktmetrics/ktmetrics.go @@ -0,0 +1,188 @@ +package ktmetrics + +import ( + "fmt" + "strings" + + "github.com/secretflow/kuscia/pkg/ktexporter/fetchmetrics" + "github.com/secretflow/kuscia/pkg/ktexporter/parse" + "github.com/secretflow/kuscia/pkg/metricexporter/aggfunc" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + + "github.com/secretflow/kuscia/pkg/utils/nlog" + + pkgcom "github.com/secretflow/kuscia/pkg/common" +) + +type NetStats struct { + RecvBytes uint64 // Bytes + XmitBytes uint64 // Bytes + RecvBW float64 // bps + XmitBW float64 // bps +} + +type KusicaTaskStats struct { + CtrStats ContainerStats + NetIO NetStats +} + +// ContainerStats holds the stats information for a container +type ContainerStats struct { + CPUPercentage string + Memory string + Disk string + Inodes string +} + +// AggregateStatistics aggregate statistics using an aggregation function +func AggregateStatistics(localDomainName string, clusterResults 
map[string]float64, networkResults []map[string]string, aggregationMetrics map[string]string, dstDomain string, _ uint) (map[string]float64, error) { + if len(networkResults) == 0 { + return clusterResults, nil + } + for _, taskid := range networkResults { + for metric, aggFunc := range aggregationMetrics { + metricID := localDomainName + ";" + dstDomain + ";" + metric + ";" + aggFunc + ";" + taskid["taskid"] + nlog.Infof("Generated metricID: %s", metricID) + nlog.Infof("Generated metricID: %s", metric) + nlog.Infof("Generated metricID: %s", aggFunc) + if metric != parse.KtLocalAddr && metric != parse.KtPeerAddr { + var err error + if aggFunc == parse.AggSum { + clusterResults[metricID], err = aggfunc.Sum(networkResults, metric) + } else if aggFunc == parse.AggAvg { + clusterResults[metricID], err = aggfunc.Avg(networkResults, metric) + } else if aggFunc == parse.AggMax { + clusterResults[metricID], err = aggfunc.Max(networkResults, metric) + } else if aggFunc == parse.AggMin { + clusterResults[metricID], err = aggfunc.Min(networkResults, metric) + } + if err != nil { + nlog.Warnf("Fail to get clusterResults from aggregation functions, err: %v", err) + return clusterResults, err + } + } + } + } + return clusterResults, nil +} + +// Alert an alert function that reports whether a metric value exceeds a given threshold +func Alert(metric float64, threshold float64) bool { + return metric > threshold +} + +// GetKtMetricResults Get the results of kt statistics after filtering +func GetKtMetricResults(_ pkgcom.RunModeType, localDomainName string, clusterAddresses map[string][]string, AggregationMetrics map[string]string, MonitorPeriods uint) (map[string]float64, error) { + // Initialize the result map + ktResults := make(map[string]float64) + + // Iterate over each endpoint address from clusterAddresses (without using IP) + for _, endpointAddresses := range clusterAddresses { + for _, endpointAddress := range endpointAddresses { + // Log some information + 
nlog.Info("Processing endpoint address: ", endpointAddress) + ktMetrics, err := GetStatisticFromKt() + if err != nil { + nlog.Warnf("Fail to get statistics from kt, err: %v", err) + return ktResults, err + } + // Get the connection name (e.g., domain or IP) + endpointName := strings.Split(endpointAddress, ":")[0] + + // Collect all ktMetrics into networkResults (no filtering) + var networkResults []map[string]string + networkResults = append(networkResults, ktMetrics...) // Add all ktMetrics data + + // Perform aggregation based on metrics and store results in ktResults + ktResults, err = AggregateStatistics(localDomainName, ktResults, networkResults, AggregationMetrics, endpointName, MonitorPeriods) + if err != nil { + nlog.Warnf("Failed to aggregate statistics for endpoint %s: %v", endpointName, err) + } + + } + } + // Return the aggregated ktResults + return ktResults, nil +} + +func GetStatisticFromKt() ([]map[string]string, error) { + var preRecvBytes, preXmitBytes uint64 + var tcpStatisticList []map[string]string + timeWindow := 1.0 + + podLister := informers.NewSharedInformerFactoryWithOptions(kubernetes.NewForConfigOrDie(nil), 0).Core().V1().Pods().Lister() + + taskToPID, err := fetchmetrics.GetKusciaTaskPID(podLister) + + if err != nil { + nlog.Error("Fail to get container PIDs", err) + return nil, err + } + + taskIDToContainerID, err := fetchmetrics.GetTaskIDToContainerID(podLister) + if err != nil { + nlog.Error("Fail to get container ID", err) + return nil, err + } + + for kusciaTaskID, containerPID := range taskToPID { + containerID, exists := taskIDToContainerID[kusciaTaskID] + if !exists || containerID == "" { + // Skip this task if no valid CID is found + continue + } + + recvBytes, xmitBytes, err := fetchmetrics.GetContainerNetIOFromProc("eth0", containerPID) + if err != nil { + nlog.Warnf("Fail to get container network IO from proc") + continue + } + + recvBW, xmitBW, err := fetchmetrics.GetContainerBandwidth(recvBytes, preRecvBytes, xmitBytes, 
preXmitBytes, timeWindow) + if err != nil { + nlog.Warnf("Fail to get the network bandwidth of containers") + continue + } + preRecvBytes = recvBytes + preXmitBytes = xmitBytes + ktMetrics := make(map[string]string) + ktMetrics[parse.MetricRecvBytes] = fmt.Sprintf("%d", recvBytes) + ktMetrics[parse.MetricXmitBytes] = fmt.Sprintf("%d", xmitBytes) + ktMetrics[parse.MetricRecvBw] = fmt.Sprintf("%.2f", recvBW) + ktMetrics[parse.MetricXmitBw] = fmt.Sprintf("%.2f", xmitBW) + ktMetrics["taskid"] = kusciaTaskID + + podLister := informers.NewSharedInformerFactoryWithOptions(kubernetes.NewForConfigOrDie(nil), 0).Core().V1().Pods().Lister() + + containerStats, err := fetchmetrics.GetContainerStats(podLister) + + if err != nil { + nlog.Warn("Fail to get the stats of containers") + continue + } + ktMetrics[parse.MetricCPUPercentage] = containerStats[containerID].CPUPercentage + ktMetrics[parse.MetricDisk] = containerStats[containerID].Disk + ktMetrics[parse.MetricInodes] = containerStats[containerID].Inodes + ktMetrics[parse.MetricMemory] = containerStats[containerID].Memory + + cpuUsage, err := fetchmetrics.GetTotalCPUUsageStats(containerID) + if err != nil { + nlog.Warn("Fail to get the total CPU usage stats") + continue + } + ktMetrics[parse.MetricCPUUsage] = fmt.Sprintf("%d", cpuUsage) + + virtualMemory, physicalMemory, err := fetchmetrics.GetMaxMemoryUsageStats(containerPID, containerID) + if err != nil { + nlog.Warn("Fail to get the total memory stats") + continue + } + ktMetrics[parse.MetricVirtualMemory] = fmt.Sprintf("%d", virtualMemory) + ktMetrics[parse.MetricPhysicalMemory] = fmt.Sprintf("%d", physicalMemory) + + tcpStatisticList = append(tcpStatisticList, ktMetrics) + } + + return tcpStatisticList, nil +} diff --git a/pkg/ktexporter/ktpromexporter/export.go b/pkg/ktexporter/ktpromexporter/export.go new file mode 100644 index 00000000..1690d4f7 --- /dev/null +++ b/pkg/ktexporter/ktpromexporter/export.go @@ -0,0 +1,103 @@ +// Copyright 2023 Ant Group Co., Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package metric the function to export metrics to Prometheus +package ktpromexporter + +import ( + "strings" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + + "github.com/secretflow/kuscia/pkg/utils/nlog" + "github.com/secretflow/kuscia/pkg/utils/promeregist" +) + +var counters = make(map[string]prometheus.Counter) +var gauges = make(map[string]prometheus.Gauge) +var histograms = make(map[string]prometheus.Histogram) +var summaries = make(map[string]prometheus.Summary) + +func produceMetric(reg *prometheus.Registry, + metricID string, metricType string) *prometheus.Registry { + splitedMetric := strings.Split(metricID, ";") + labels := make(map[string]string) + labels["type"] = "kt" + labels["remote_domain"] = splitedMetric[len(splitedMetric)-4] + name := splitedMetric[len(splitedMetric)-3] + labels["aggregation_function"] = splitedMetric[len(splitedMetric)-2] + labels["taskid"] = splitedMetric[len(splitedMetric)-1] + help := name + " aggregated by " + labels["aggregation_function"] + " from kt" + nameSpace := "kt" + if metricType == "Counter" { + counters[metricID] = promeregist.ProduceCounter(nameSpace, name, help, labels) + reg.MustRegister(counters[metricID]) + } else if metricType == "Gauge" { + gauges[metricID] = promeregist.ProduceGauge(nameSpace, name, help, labels) + reg.MustRegister(gauges[metricID]) + } else if metricType == 
"Histogram" { + histograms[metricID] = promeregist.ProduceHistogram(nameSpace, name, help, labels) + reg.MustRegister(histograms[metricID]) + } else if metricType == "Summary" { + summaries[metricID] = promeregist.ProduceSummary(nameSpace, name, help, labels) + reg.MustRegister(summaries[metricID]) + } + return reg +} + +func ProduceRegister() *prometheus.Registry { + reg := prometheus.NewRegistry() + reg.MustRegister( + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + ) + return reg +} + +func UpdateMetrics(reg *prometheus.Registry, + clusterResults map[string]float64, MetricTypes map[string]string) { + for metric, val := range clusterResults { + metricID := promeregist.Formalize(metric) + splitedMetric := strings.Split(metric, ";") + var metricTypeID string + metricTypeID = splitedMetric[len(splitedMetric)-3] + metricType, ok := MetricTypes[metricTypeID] + if !ok { + nlog.Error("Fail to get metric types", ok) + } + switch metricType { + case "Counter": + if _, ok := counters[metricID]; !ok { + produceMetric(reg, metricID, metricType) + } + counters[metricID].Add(val) + case "Gauge": + if _, ok := gauges[metricID]; !ok { + produceMetric(reg, metricID, metricType) + } + gauges[metricID].Set(val) + case "Histogram": + if _, ok := histograms[metricID]; !ok { + produceMetric(reg, metricID, metricType) + } + histograms[metricID].Observe(val) + case "Summary": + if _, ok := summaries[metricID]; !ok { + produceMetric(reg, metricID, metricType) + } + summaries[metricID].Observe(val) + } + } +} diff --git a/pkg/ktexporter/ktpromexporter/type.go b/pkg/ktexporter/ktpromexporter/type.go new file mode 100644 index 00000000..5346d10a --- /dev/null +++ b/pkg/ktexporter/ktpromexporter/type.go @@ -0,0 +1,39 @@ +// Copyright 2023 Ant Group Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package metric defines the type of metrics exporting to Prometheus +package ktpromexporter + +const ( + metricCounter = "Counter" + metricGauge = "Gauge" +) + +// NewMetricTypeskt parse the metric types from a yaml file + +func NewMetricTypeskt() map[string]string { + MetricTypes := make(map[string]string) + MetricTypes["recvbw"] = metricGauge + MetricTypes["xmitbw"] = metricGauge + MetricTypes["recvbytes"] = metricGauge + MetricTypes["xmitbytes"] = metricGauge + MetricTypes["cpu_percentage"] = metricGauge + MetricTypes["virtual_memory"] = metricGauge + MetricTypes["physical_memory"] = metricGauge + MetricTypes["total_cpu_time_ns"] = metricGauge + MetricTypes["memory_usage"] = metricGauge + MetricTypes["disk_io"] = metricGauge + MetricTypes["inodes"] = metricGauge + return MetricTypes +} diff --git a/pkg/ktexporter/parse/config.go b/pkg/ktexporter/parse/config.go new file mode 100644 index 00000000..9f89c968 --- /dev/null +++ b/pkg/ktexporter/parse/config.go @@ -0,0 +1,69 @@ +// Copyright 2023 Ant Group Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// Package parse configures files and domain files +package parse + +// Config define the structure of the configuration file +type MonitorConfig struct { + KtMetrics []string + AggMetrics []string +} + +const ( + MetricRecvBytes = "recvbytes" + MetricXmitBytes = "xmitbytes" + MetricRecvBw = "recvbw" + MetricXmitBw = "xmitbw" + MetricCPUPercentage = "cpu_percentage" + MetricCPUUsage = "total_cpu_time_ns" + MetricVirtualMemory = "virtual_memory" + MetricPhysicalMemory = "physical_memory" + MetricMemory = "memory_usage" + MetricDisk = "disk_io" + MetricInodes = "inodes" + + KtLocalAddr = "localAddr" + KtPeerAddr = "peerAddr" + + AggSum = "sum" + AggAvg = "avg" + AggMax = "max" + AggMin = "min" + AggAlert = "alert" + AggRate = "rate" +) + +// ReadConfig read the configuration and return each entry +func LoadMetricConfig() ([]string, map[string]string) { + var config MonitorConfig + config.KtMetrics = append(config.KtMetrics, + MetricRecvBytes, + MetricXmitBytes, + MetricRecvBw, + MetricXmitBw, + MetricCPUPercentage, + MetricCPUUsage, + MetricMemory, + MetricDisk, + MetricInodes, + MetricVirtualMemory, + MetricPhysicalMemory) + aggMetrics := make(map[string]string) + config.AggMetrics = append(config.AggMetrics, AggSum, AggSum, AggSum, AggSum, AggAvg, AggAvg, AggAvg, AggSum, AggSum, AggSum, AggSum) + for i, metric := range config.KtMetrics { + aggMetrics[metric] = config.AggMetrics[i] + } + return config.KtMetrics, aggMetrics +} diff --git a/pkg/ktexporter/parse/config_test.go b/pkg/ktexporter/parse/config_test.go new file mode 100644 index 00000000..cf2c711d --- /dev/null +++ b/pkg/ktexporter/parse/config_test.go @@ -0,0 +1,56 @@ +// Copyright 2023 Ant Group Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parse + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestLoadMetricConfig test LoadMetricConfig +func TestLoadMetricConfig(t *testing.T) { + testCases := []struct { + name string + expectedMetrics []string + expectedAgg map[string]string + }{ + { + name: "Check LoadMetricConfig output", + expectedMetrics: []string{"recvbytes", "xmitbytes", "recvbw", "xmitbw", "cpu_percentage", "total_cpu_time_ns", "memory_usage", "disk_io", "inodes", "virtual_memory", "physical_memory"}, + expectedAgg: map[string]string{ + "recvbytes": "sum", + "xmitbytes": "sum", + "recvbw": "sum", + "xmitbw": "sum", + "cpu_percentage": "avg", + "total_cpu_time_ns": "avg", + "memory_usage": "avg", + "disk_io": "sum", + "inodes": "sum", + "virtual_memory": "sum", + "physical_memory": "sum", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + metrics, agg := LoadMetricConfig() + require.Equal(t, tc.expectedMetrics, metrics, "Metrics do not match expected values") + require.Equal(t, tc.expectedAgg, agg, "Aggregated metrics do not match expected values") + }) + } +} diff --git a/pkg/metricexporter/aggfunc/aggfunc.go b/pkg/metricexporter/aggfunc/aggfunc.go new file mode 100644 index 00000000..4c841c34 --- /dev/null +++ b/pkg/metricexporter/aggfunc/aggfunc.go @@ -0,0 +1,72 @@ +package aggfunc + +import ( + "math" + "strconv" + + "github.com/secretflow/kuscia/pkg/utils/nlog" +) + +// Sum an aggregation function to sum up two network metrics +func Sum(metrics []map[string]string, key string) (float64, error) { + 
sum := 0.0 + for _, metric := range metrics { + val, err := strconv.ParseFloat(metric[key], 64) + if err != nil { + nlog.Warnf("fail to parse float: %s, key: %s, value: %f", metric[key], key, val) + return sum, err + } + sum += val + } + return sum, nil +} + +// Avg an aggregation function to calculate the average of two network metrics +func Avg(metrics []map[string]string, key string) (float64, error) { + sum, err := Sum(metrics, key) + if err != nil { + nlog.Warnf("Fail to get the sum of kt metrics, err: %v", err) + return sum, err + } + return sum / float64(len(metrics)), nil +} + +// Max an aggregation function to calculate the maximum of two network metrics +func Max(metrics []map[string]string, key string) (float64, error) { + max := math.MaxFloat64 * (-1) + for _, metric := range metrics { + val, err := strconv.ParseFloat(metric[key], 64) + if err != nil { + nlog.Warn("fail to parse float") + return max, err + } + if val > max { + max = val + } + } + return max, nil +} + +// Min an aggregation function to calculate the minimum of two network metrics +func Min(metrics []map[string]string, key string) (float64, error) { + min := math.MaxFloat64 + for _, metric := range metrics { + val, err := strconv.ParseFloat(metric[key], 64) + if err != nil { + nlog.Warn("fail to parse float") + return min, err + } + if val < min { + min = val + } + } + return min, nil +} + +// Rate an aggregation function to calculate the rate of a network metric between to metrics +func Rate(metric1 float64, metric2 float64) float64 { + if metric2 == 0.0 { + return 0 + } + return metric1 / metric2 +} diff --git a/pkg/ssexporter/parse/domain.go b/pkg/metricexporter/domain/domain.go similarity index 76% rename from pkg/ssexporter/parse/domain.go rename to pkg/metricexporter/domain/domain.go index 0801e6a7..5cd6943d 100644 --- a/pkg/ssexporter/parse/domain.go +++ b/pkg/metricexporter/domain/domain.go @@ -1,19 +1,4 @@ -// Copyright 2023 Ant Group Co., Ltd. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package parse configures files and domain files -package parse +package domain import ( "io/ioutil" @@ -54,7 +39,7 @@ func GetClusterAddress(domainID string) (map[string][]string, error) { // get the results of config_dump resp, err := http.Get("http://localhost:10000/config_dump?resource=dynamic_active_clusters") if err != nil { - nlog.Warnf("Fail to get the results of config_dump %s", err.Error()) + nlog.Warn("Fail to get the results of config_dump", err) return endpointAddresses, err } defer resp.Body.Close() diff --git a/pkg/metricexporter/metricexporter.go b/pkg/metricexporter/metricexporter.go index 92090568..f0b0198c 100644 --- a/pkg/metricexporter/metricexporter.go +++ b/pkg/metricexporter/metricexporter.go @@ -35,9 +35,9 @@ var ( func ListPodMetricUrls(podManager pod.Manager) (map[string]string, error) { if podManager == nil { - return nil, fmt.Errorf("podManager is not initialized") - } - + return nil, fmt.Errorf("podManager is not initialized") + } + metricUrls := map[string]string{} pods := podManager.GetPods() @@ -133,14 +133,14 @@ func combine(map1, map2 map[string]string) map[string]string { func MetricExporter(ctx context.Context, metricURLs map[string]string, port string) { nlog.Infof("Start to export metrics on port %s...", port) - if podManager != nil{ + if podManager != nil { podMetrics, err := ListPodMetricUrls(podManager) if err != nil { nlog.Errorf("Error retrieving pod metrics: 
%v", err) - }else{ + } else { metricURLs = combine(metricURLs, podMetrics) } - }else{ + } else { nlog.Warn("podManager is nil, skipping ListPodMetricUrls call") } diff --git a/pkg/ssexporter/parse/config.go b/pkg/ssexporter/parse/config.go index 7be423b5..eb5152c2 100644 --- a/pkg/ssexporter/parse/config.go +++ b/pkg/ssexporter/parse/config.go @@ -29,8 +29,9 @@ const ( MetricRto = "rto" MetricByteSent = "bytes_sent" MetricBytesReceived = "bytes_received" - SsLocalAddr = "localAddr" - SsPeerAddr = "peerAddr" + + SsLocalAddr = "localAddr" + SsPeerAddr = "peerAddr" AggSum = "sum" AggAvg = "avg" diff --git a/pkg/ssexporter/promexporter/export.go b/pkg/ssexporter/promexporter/export.go index 6d48d804..56d51aae 100644 --- a/pkg/ssexporter/promexporter/export.go +++ b/pkg/ssexporter/promexporter/export.go @@ -22,47 +22,9 @@ import ( "github.com/prometheus/client_golang/prometheus/collectors" "github.com/secretflow/kuscia/pkg/utils/nlog" + "github.com/secretflow/kuscia/pkg/utils/promeregist" ) -func produceCounter(namespace string, name string, help string, labels map[string]string) prometheus.Counter { - return prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: name, - Help: help, - ConstLabels: labels, - }) -} - -func produceGauge(namespace string, name string, help string, labels map[string]string) prometheus.Gauge { - return prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: name, - Help: help, - ConstLabels: labels, - }) -} - -func produceHistogram(namespace string, name string, help string, labels map[string]string) prometheus.Histogram { - return prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: name, - Help: help, - ConstLabels: labels, - }) -} -func produceSummary(namespace string, name string, help string, labels map[string]string) prometheus.Summary { - return prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: name, - Help: help, - 
ConstLabels: labels, - }) -} - var counters = make(map[string]prometheus.Counter) var gauges = make(map[string]prometheus.Gauge) var histograms = make(map[string]prometheus.Histogram) @@ -79,26 +41,20 @@ func produceMetric(reg *prometheus.Registry, help := name + " aggregated by " + labels["aggregation_function"] + " from ss" nameSpace := "ss" if metricType == "Counter" { - counters[metricID] = produceCounter(nameSpace, name, help, labels) + counters[metricID] = promeregist.ProduceCounter(nameSpace, name, help, labels) reg.MustRegister(counters[metricID]) } else if metricType == "Gauge" { - gauges[metricID] = produceGauge(nameSpace, name, help, labels) + gauges[metricID] = promeregist.ProduceGauge(nameSpace, name, help, labels) reg.MustRegister(gauges[metricID]) } else if metricType == "Histogram" { - histograms[metricID] = produceHistogram(nameSpace, name, help, labels) + histograms[metricID] = promeregist.ProduceHistogram(nameSpace, name, help, labels) reg.MustRegister(histograms[metricID]) } else if metricType == "Summary" { - summaries[metricID] = produceSummary(nameSpace, name, help, labels) + summaries[metricID] = promeregist.ProduceSummary(nameSpace, name, help, labels) reg.MustRegister(summaries[metricID]) } return reg } -func formalize(metric string) string { - metric = strings.Replace(metric, "-", "_", -1) - metric = strings.Replace(metric, ".", "__", -1) - metric = strings.ToLower(metric) - return metric -} func ProduceRegister() *prometheus.Registry { reg := prometheus.NewRegistry() reg.MustRegister( @@ -111,7 +67,7 @@ func ProduceRegister() *prometheus.Registry { func UpdateMetrics(reg *prometheus.Registry, clusterResults map[string]float64, MetricTypes map[string]string) { for metric, val := range clusterResults { - metricID := formalize(metric) + metricID := promeregist.Formalize(metric) splitedMetric := strings.Split(metric, ";") var metricTypeID string metricTypeID = splitedMetric[len(splitedMetric)-2] diff --git a/pkg/ssexporter/ssexporter.go 
b/pkg/ssexporter/ssexporter.go index a4b32958..9c4be962 100644 --- a/pkg/ssexporter/ssexporter.go +++ b/pkg/ssexporter/ssexporter.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" pkgcom "github.com/secretflow/kuscia/pkg/common" + "github.com/secretflow/kuscia/pkg/metricexporter/domain" "github.com/secretflow/kuscia/pkg/ssexporter/parse" "github.com/secretflow/kuscia/pkg/ssexporter/promexporter" "github.com/secretflow/kuscia/pkg/ssexporter/ssmetrics" @@ -36,7 +37,7 @@ var ( func SsExporter(ctx context.Context, runMode pkgcom.RunModeType, domainID string, exportPeriod uint, port string) error { // read the config _, AggregationMetrics := parse.LoadMetricConfig() - clusterAddresses, _ := parse.GetClusterAddress(domainID) + clusterAddresses, _ := domain.GetClusterAddress(domainID) localDomainName := domainID var MetricTypes = promexporter.NewMetricTypes() @@ -52,7 +53,7 @@ func SsExporter(ctx context.Context, runMode pkgcom.RunModeType, domainID string go func(runMode pkgcom.RunModeType, reg *prometheus.Registry, MetricTypes map[string]string, exportPeriods uint, lastClusterMetricValues map[string]float64) { for range ticker.C { // get clusterName and clusterAddress - clusterAddresses, _ = parse.GetClusterAddress(domainID) + clusterAddresses, _ = domain.GetClusterAddress(domainID) // get cluster metrics currentClusterMetricValues, err := ssmetrics.GetSsMetricResults(runMode, localDomainName, clusterAddresses, AggregationMetrics, exportPeriods) if err != nil { diff --git a/pkg/ssexporter/ssmetrics/ssmetrics.go b/pkg/ssexporter/ssmetrics/ssmetrics.go index f04c84e3..431e8524 100644 --- a/pkg/ssexporter/ssmetrics/ssmetrics.go +++ b/pkg/ssexporter/ssmetrics/ssmetrics.go @@ -16,13 +16,13 @@ package ssmetrics import ( - "math" "os" "os/exec" - "strconv" "strings" pkgcom "github.com/secretflow/kuscia/pkg/common" + "github.com/secretflow/kuscia/pkg/metricexporter/aggfunc" + "github.com/secretflow/kuscia/pkg/metricexporter/domain" 
"github.com/secretflow/kuscia/pkg/ssexporter/parse" "github.com/secretflow/kuscia/pkg/utils/nlog" ) @@ -116,70 +116,6 @@ func Filter(ssMetrics []map[string]string, srcIP string, dstIP string, srcPort s return results } -// Sum an aggregation function to sum up two network metrics -func Sum(metrics []map[string]string, key string) (float64, error) { - sum := 0.0 - for _, metric := range metrics { - val, err := strconv.ParseFloat(metric[key], 64) - if err != nil { - nlog.Warnf("fail to parse float: %s, key: %s, value: %f", metric[key], key, val) - return sum, err - } - sum += val - } - return sum, nil -} - -// Avg an aggregation function to calculate the average of two network metrics -func Avg(metrics []map[string]string, key string) (float64, error) { - sum, err := Sum(metrics, key) - if err != nil { - nlog.Warnf("Fail to get the sum of ss metrics, err: %v", err) - return sum, err - } - return sum / float64(len(metrics)), nil -} - -// Max an aggregation function to calculate the maximum of two network metrics -func Max(metrics []map[string]string, key string) (float64, error) { - max := math.MaxFloat64 * (-1) - for _, metric := range metrics { - val, err := strconv.ParseFloat(metric[key], 64) - if err != nil { - nlog.Warn("fail to parse float") - return max, err - } - if val > max { - max = val - } - } - return max, nil -} - -// Min an aggregation function to calculate the minimum of two network metrics -func Min(metrics []map[string]string, key string) (float64, error) { - min := math.MaxFloat64 - for _, metric := range metrics { - val, err := strconv.ParseFloat(metric[key], 64) - if err != nil { - nlog.Warn("fail to parse float") - return min, err - } - if val < min { - min = val - } - } - return min, nil -} - -// Rate an aggregation function to calculate the rate of a network metric between to metrics -func Rate(metric1 float64, metric2 float64) float64 { - if metric2 == 0.0 { - return 0 - } - return metric1 / metric2 -} - // Alert an alert function that reports 
whether a metric value exceeds a given threshold func Alert(metric float64, threshold float64) bool { return metric > threshold @@ -197,28 +133,28 @@ func AggregateStatistics(localDomainName string, clusterResults map[string]float if aggFunc == parse.AggRate { if metric == parse.MetricRetranRate { threshold := 0.0 - retranSum, err := Sum(networkResults, parse.MetricRetrans) - connectSum, err := Sum(networkResults, parse.MetricTotalConnections) + retranSum, err := aggfunc.Sum(networkResults, parse.MetricRetrans) + connectSum, err := aggfunc.Sum(networkResults, parse.MetricTotalConnections) if err != nil { return clusterResults, err } - clusterResults[metricID] = Rate(retranSum-threshold, connectSum) + clusterResults[metricID] = aggfunc.Rate(retranSum-threshold, connectSum) } } else if aggFunc == parse.AggSum { - clusterResults[metricID], err = Sum(networkResults, metric) + clusterResults[metricID], err = aggfunc.Sum(networkResults, metric) } else if aggFunc == parse.AggAvg { - clusterResults[metricID], err = Avg(networkResults, metric) + clusterResults[metricID], err = aggfunc.Avg(networkResults, metric) } else if aggFunc == parse.AggMax { - clusterResults[metricID], err = Max(networkResults, metric) + clusterResults[metricID], err = aggfunc.Max(networkResults, metric) } else if aggFunc == parse.AggMin { - clusterResults[metricID], err = Min(networkResults, metric) + clusterResults[metricID], err = aggfunc.Min(networkResults, metric) } if err != nil { nlog.Warnf("Fail to get clusterResults from aggregation functions, err: %v", err) return clusterResults, err } if metric == parse.MetricByteSent || metric == parse.MetricBytesReceived { - clusterResults[metricID] = Rate(clusterResults[metricID], float64(MonitorPeriods)) + clusterResults[metricID] = aggfunc.Rate(clusterResults[metricID], float64(MonitorPeriods)) } } } @@ -240,11 +176,11 @@ func GetSsMetricResults(runMode pkgcom.RunModeType, localDomainName string, clus nlog.Warnf("Fail to get the hostname, err: %v", 
err) return ssResults, err } - sourceIP := parse.GetIPFromDomain(hostName) + sourceIP := domain.GetIPFromDomain(hostName) destinationIP := make(map[string][]string) for _, endpointAddresses := range clusterAddresses { for _, endpointAddress := range endpointAddresses { - destinationIP[endpointAddress] = parse.GetIPFromDomain(strings.Split(endpointAddress, ":")[0]) + destinationIP[endpointAddress] = domain.GetIPFromDomain(strings.Split(endpointAddress, ":")[0]) } } // group metrics by the domain name diff --git a/pkg/utils/promeregist/register.go b/pkg/utils/promeregist/register.go new file mode 100644 index 00000000..e78cdfae --- /dev/null +++ b/pkg/utils/promeregist/register.go @@ -0,0 +1,53 @@ +package promeregist + +import ( + "strings" + + "github.com/prometheus/client_golang/prometheus" +) + +func ProduceCounter(namespace string, name string, help string, labels map[string]string) prometheus.Counter { + return prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: name, + Help: help, + ConstLabels: labels, + }) +} + +func ProduceGauge(namespace string, name string, help string, labels map[string]string) prometheus.Gauge { + return prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: name, + Help: help, + ConstLabels: labels, + }) +} + +func ProduceHistogram(namespace string, name string, help string, labels map[string]string) prometheus.Histogram { + return prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: name, + Help: help, + ConstLabels: labels, + }) +} +func ProduceSummary(namespace string, name string, help string, labels map[string]string) prometheus.Summary { + return prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: name, + Help: help, + ConstLabels: labels, + }) +} + +func Formalize(metric string) string { + metric = strings.Replace(metric, "-", "_", -1) + metric = strings.Replace(metric, ".", "__", -1) + metric = strings.ToLower(metric) 
+ return metric +} diff --git a/scripts/templates/app_image.secretflow.yaml b/scripts/templates/app_image.secretflow.yaml index 31281182..74366f2a 100644 --- a/scripts/templates/app_image.secretflow.yaml +++ b/scripts/templates/app_image.secretflow.yaml @@ -48,9 +48,9 @@ spec: protocol: HTTP scope: Cluster workingDir: /root - metricProbe: - path: /metrics - port: {{.METRIC_PORT}} + # metricProbe: + # path: /metrics + # port: {{.METRIC_PORT}} restartPolicy: Never image: name: {{.SF_IMAGE_NAME}} diff --git a/scripts/tools/register_app_image/secretflow-image.yaml b/scripts/tools/register_app_image/secretflow-image.yaml index 62767eae..a61926a4 100644 --- a/scripts/tools/register_app_image/secretflow-image.yaml +++ b/scripts/tools/register_app_image/secretflow-image.yaml @@ -47,10 +47,10 @@ spec: - name: inference protocol: HTTP scope: Cluster - metricProbe: - httpGet: - path: /metrics - port: {{.METRIC_PORT}} + # metricProbe: + # httpGet: + # path: /metrics + # port: {{.METRIC_PORT}} workingDir: /work restartPolicy: Never image: