From 6834c3a29dbdc28d0c46dacce1cc3c9c04b2dfc8 Mon Sep 17 00:00:00 2001 From: Tom Manville Date: Sat, 13 Apr 2024 02:55:20 -0700 Subject: [PATCH] Add a GRPC client/server to kando to run/check processes --- build/protoc.sh | 21 ++ docker/build/Dockerfile | 3 +- go.mod | 13 +- go.sum | 20 +- pkg/kando/kando.go | 1 + pkg/kando/process.go | 42 +++ pkg/kando/process_client.go | 43 +++ pkg/kando/process_client_create.go | 50 +++ pkg/kando/process_client_list.go | 63 ++++ pkg/kando/process_client_output.go | 47 +++ pkg/kando/process_server.go | 42 +++ pkg/kanx/client.go | 115 +++++++ pkg/kanx/kanx.pb.go | 511 +++++++++++++++++++++++++++++ pkg/kanx/kanx.proto | 42 +++ pkg/kanx/kanx_grpc.pb.go | 295 +++++++++++++++++ pkg/kanx/kanx_test.go | 271 +++++++++++++++ pkg/kanx/server.go | 215 ++++++++++++ 17 files changed, 1777 insertions(+), 17 deletions(-) create mode 100755 build/protoc.sh create mode 100644 pkg/kando/process.go create mode 100644 pkg/kando/process_client.go create mode 100644 pkg/kando/process_client_create.go create mode 100644 pkg/kando/process_client_list.go create mode 100644 pkg/kando/process_client_output.go create mode 100644 pkg/kando/process_server.go create mode 100644 pkg/kanx/client.go create mode 100644 pkg/kanx/kanx.pb.go create mode 100644 pkg/kanx/kanx.proto create mode 100644 pkg/kanx/kanx_grpc.pb.go create mode 100644 pkg/kanx/kanx_test.go create mode 100644 pkg/kanx/server.go diff --git a/build/protoc.sh b/build/protoc.sh new file mode 100755 index 00000000000..f2c4e4e1d92 --- /dev/null +++ b/build/protoc.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +# Copyright 2024 The Kanister Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o errexit +set -o nounset + +find ./pkg -name "*.proto" | xargs -t -I PROTO_FILE \ + protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative PROTO_FILE diff --git a/docker/build/Dockerfile b/docker/build/Dockerfile index 4a3552f5776..53e2a21b365 100644 --- a/docker/build/Dockerfile +++ b/docker/build/Dockerfile @@ -3,7 +3,8 @@ LABEL maintainer="Tom Manville" ARG TARGETPLATFORM -RUN apt-get update && apt-get -y install apt-transport-https ca-certificates bash git gnupg2 software-properties-common curl jq wget \ +RUN apt-get update && apt-get -y install \ + apt-transport-https bash ca-certificates curl git gnupg2 jq protobuff-compiler software-properties-common wget \ && update-ca-certificates RUN curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \ diff --git a/go.mod b/go.mod index 7cf19007278..16cda60a597 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( k8s.io/client-go v0.29.3 k8s.io/code-generator v0.29.3 k8s.io/kubectl v0.29.3 - k8s.io/utils v0.0.0-20230726121419-3b25d923346b + k8s.io/utils v0.0.0-20240310230437-4693a0247e57 sigs.k8s.io/controller-runtime v0.16.5 sigs.k8s.io/yaml v1.3.0 @@ -89,7 +89,7 @@ require ( github.com/chai2010/gettext-go v1.0.2 // indirect github.com/chmduquesne/rollinghash v4.0.0+incompatible // indirect github.com/danieljoos/wincred v1.2.1 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect @@ -178,7 +178,7 @@ require ( go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect - go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect + go.starlark.net v0.0.0-20240314022150-ee8ed142361c // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect @@ -194,8 +194,8 @@ require ( google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/grpc v1.62.1 // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/grpc v1.62.1 + google.golang.org/protobuf v1.33.0 gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/kothar/go-backblaze.v0 v0.0.0-20210124194846-35409b867216 // indirect @@ -223,11 +223,12 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/google/gnostic-models v0.6.8 // indirect - github.com/gorilla/websocket v1.5.0 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect github.com/hashicorp/cronexpr v1.1.2 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/mxk/go-vss v1.2.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect diff --git a/go.sum b/go.sum index 4e542eb3af8..7531e02d00c 100644 --- a/go.sum +++ b/go.sum @@ -141,8 +141,9 @@ github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8GQs= github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= @@ -280,7 +281,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -321,8 +321,8 @@ github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3i github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= @@ -484,8 +484,9 @@ github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 h1:xoIK0ctDddBMnc74udxJYBqlo9Ylnsp1waqjLsnef20= github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= @@ -574,8 +575,8 @@ go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= -go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY= -go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds= +go.starlark.net v0.0.0-20240314022150-ee8ed142361c h1:roAjH18hZcwI4hHStHbkXjF5b7UUyZ/0SG3hXNN1SjA= +go.starlark.net v0.0.0-20240314022150-ee8ed142361c/go.mod h1:YKMCv9b1WrfWmeqdV5MAuEHWsu5iC+fe6kYl2sQjdI8= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -682,7 +683,6 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= @@ -851,8 +851,8 @@ k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdz k8s.io/kubectl v0.29.3 h1:RuwyyIU42MAISRIePaa8Q7A3U74Q9P4MoJbDFz9o3us= k8s.io/kubectl v0.29.3/go.mod h1:yCxfY1dbwgVdEt2zkJ6d5NNLOhhWgTyrqACIoFhpdd4= k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= -k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0gQBEuevE/AaBsHY= +k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= sigs.k8s.io/controller-runtime v0.16.5 h1:yr1cEJbX08xsTW6XEIzT13KHHmIyX8Umvme2cULvFZw= sigs.k8s.io/controller-runtime v0.16.5/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= diff --git a/pkg/kando/kando.go b/pkg/kando/kando.go index d86f6dbf549..b02ebb8d0e8 100644 --- a/pkg/kando/kando.go +++ b/pkg/kando/kando.go @@ -53,6 +53,7 @@ func newRootCommand() *cobra.Command { rootCmd.AddCommand(newOutputCommand()) rootCmd.AddCommand(newChronicleCommand()) rootCmd.AddCommand(newStreamCommand()) + rootCmd.AddCommand(newProcessCommand()) return rootCmd } diff --git a/pkg/kando/process.go b/pkg/kando/process.go new file mode 100644 index 00000000000..fa41d8d442d --- /dev/null +++ b/pkg/kando/process.go @@ -0,0 +1,42 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "net" + + "github.com/spf13/cobra" +) + +const ( + processAddressFlagName = "address" +) + +func newProcessCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "process ", + Short: "Manage kando processes", + } + cmd.AddCommand(newProcessServerCommand()) + cmd.AddCommand(newProcessClientCommand()) + cmd.PersistentFlags().StringP(processAddressFlagName, "a", "/tmp/kanister.sock", "The path of a unix socket of the process server") + return cmd +} + +func processAddressFlagValue(cmd *cobra.Command) (string, error) { + a := cmd.Flag(processAddressFlagName).Value.String() + _, err := net.ResolveUnixAddr("unix", a) + return a, err +} diff --git a/pkg/kando/process_client.go b/pkg/kando/process_client.go new file mode 100644 index 00000000000..b075f6354c1 --- /dev/null +++ b/pkg/kando/process_client.go @@ -0,0 +1,43 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "github.com/spf13/cobra" +) + +const ( + processAsJSONFlagName = "as-json" +) + +func newProcessClientCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "client ", + Short: "Send commands to the process server", + } + cmd.AddCommand(newProcessClientCreateCommand()) + cmd.AddCommand(newProcessClientListCommand()) + cmd.AddCommand(newProcessClientOutputCommand()) + cmd.PersistentFlags().BoolP(processAsJSONFlagName, "j", false, "Display output as json") + return cmd +} + +func processAsJSONFlagValue(cmd *cobra.Command) bool { + b, err := cmd.Flags().GetBool(processAsJSONFlagName) + if err != nil { + panic(err.Error()) + } + return b +} diff --git a/pkg/kando/process_client_create.go b/pkg/kando/process_client_create.go new file mode 100644 index 00000000000..1aaee6ab30a --- /dev/null +++ b/pkg/kando/process_client_create.go @@ -0,0 +1,50 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "fmt" + + "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/kanisterio/kanister/pkg/kanx" +) + +func newProcessClientCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create", + Short: "create", + RunE: runProcessClientCreate, + } + return cmd +} + +func runProcessClientCreate(cmd *cobra.Command, args []string) error { + addr, err := processAddressFlagValue(cmd) + if err != nil { + return err + } + asJSON := processAsJSONFlagValue(cmd) + cmd.SilenceUsage = true + p, err := kanx.CreateProcess(cmd.Context(), addr, args[0], args[1:]) + if !asJSON { + fmt.Printf("Created process: %v\n", p) + return err + } + buf, err := protojson.Marshal(p) + fmt.Println(string(buf)) + return err +} diff --git a/pkg/kando/process_client_list.go b/pkg/kando/process_client_list.go new file mode 100644 index 00000000000..36f732cff85 --- /dev/null +++ b/pkg/kando/process_client_list.go @@ -0,0 +1,63 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "fmt" + "io" + "os" + + "github.com/kanisterio/kanister/pkg/kanx" + "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" +) + +func newProcessClientListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "list", + RunE: runProcessClientList, + } + return cmd +} + +func runProcessClientList(cmd *cobra.Command, args []string) error { + return runProcessClientListWithOutput(os.Stdout, cmd, args) +} + +func runProcessClientListWithOutput(out io.Writer, cmd *cobra.Command, args []string) error { + addr, err := processAddressFlagValue(cmd) + if err != nil { + return err + } + asJSON := processAsJSONFlagValue(cmd) + cmd.SilenceUsage = true + ps, err := kanx.ListProcesses(cmd.Context(), addr) + if err != nil { + return err + } + for _, p := range ps { + if asJSON { + buf, err := protojson.Marshal(p) + if err != nil { + return err + } + fmt.Fprintln(out, string(buf)) + } else { + fmt.Fprintln(out, "Process: ", p.String()) + } + } + return nil +} diff --git a/pkg/kando/process_client_output.go b/pkg/kando/process_client_output.go new file mode 100644 index 00000000000..28031ddd7ac --- /dev/null +++ b/pkg/kando/process_client_output.go @@ -0,0 +1,47 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "context" + "os" + "strconv" + + "github.com/kanisterio/kanister/pkg/kanx" + "github.com/spf13/cobra" +) + +func newProcessClientOutputCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "output", + Short: "output", + RunE: runProcessClientOutput, + } + return cmd +} + +func runProcessClientOutput(cmd *cobra.Command, args []string) error { + cmd.SetContext(context.Background()) + pid, err := strconv.Atoi(args[0]) + if err != nil { + return err + } + addr, err := processAddressFlagValue(cmd) + if err != nil { + return err + } + cmd.SilenceUsage = true + return kanx.Stdout(cmd.Context(), addr, int64(pid), os.Stdout) +} diff --git a/pkg/kando/process_server.go b/pkg/kando/process_server.go new file mode 100644 index 00000000000..4e63da276e8 --- /dev/null +++ b/pkg/kando/process_server.go @@ -0,0 +1,42 @@ +// Copyright 2020 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kando + +import ( + "context" + + "github.com/spf13/cobra" + + "github.com/kanisterio/kanister/pkg/kanx" +) + +func newProcessServerCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "server", + Short: "server", + RunE: runProcessServer, + } + return cmd +} + +func runProcessServer(cmd *cobra.Command, args []string) error { + address, err := processAddressFlagValue(cmd) + if err != nil { + return err + } + cmd.SilenceUsage = true + + return kanx.NewServer().Serve(context.Background(), address) +} diff --git a/pkg/kanx/client.go b/pkg/kanx/client.go new file mode 100644 index 00000000000..0299ec83153 --- /dev/null +++ b/pkg/kanx/client.go @@ -0,0 +1,115 @@ +package kanx + +import ( + "context" + "io" + "net" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func unixDialer(addr string, t time.Duration) (net.Conn, error) { + return net.Dial("unix", addr) +} +func newGRPCConnection(addr string) (*grpc.ClientConn, error) { + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + opts = append(opts, grpc.WithDialer(unixDialer)) + + return grpc.Dial(addr, opts...) +} + +func CreateProcess(ctx context.Context, addr string, name string, args []string) (*Process, error) { + conn, err := newGRPCConnection(addr) + if err != nil { + return nil, err + } + defer conn.Close() + c := NewProcessServiceClient(conn) + in := &CreateProcessRequest{ + Name: name, + Args: args, + } + return c.CreateProcesses(ctx, in) +} + +func ListProcesses(ctx context.Context, addr string) ([]*Process, error) { + conn, err := newGRPCConnection(addr) + if err != nil { + return nil, err + } + defer conn.Close() + c := NewProcessServiceClient(conn) + in := &ListProcessesRequest{} + lpc, err := c.ListProcesses(ctx, in) + if err != nil { + return nil, err + } + ps := []*Process{} + for { + p, err := lpc.Recv() + switch { + case err == io.EOF: + return ps, nil + case err != nil: + return nil, err + } + ps = append(ps, p) + } +} + +func Stdout(ctx context.Context, addr string, pid int64, out io.Writer) error { + conn, err := newGRPCConnection(addr) + if err != nil { + return err + } + defer conn.Close() + c := NewProcessServiceClient(conn) + in := &ProcessOutputRequest{ + Pid: pid, + } + sc, err := c.Stdout(ctx, in) + if err != nil { + return err + } + return output(ctx, addr, out, sc) +} + +func Stderr(ctx context.Context, addr string, pid int64, out io.Writer) error { + conn, err := newGRPCConnection(addr) + if err != nil { + return err + } + defer conn.Close() + c := NewProcessServiceClient(conn) + in := &ProcessOutputRequest{ + Pid: pid, + } + sc, err := c.Stderr(ctx, in) + if err != nil { + return err + } + return output(ctx, addr, out, sc) +} + +type recver interface { + Recv() (*Output, error) +} + +func output(ctx context.Context, addr string, out io.Writer, sc recver) error { + for { + p, err := sc.Recv() + switch { + case err == io.EOF: + return nil + case err != nil: + return err + } + _, err = out.Write([]byte(p.Output)) + if err != nil { + return err + } + } +} diff --git a/pkg/kanx/kanx.pb.go b/pkg/kanx/kanx.pb.go new file mode 100644 index 00000000000..483b9a87d68 --- /dev/null +++ b/pkg/kanx/kanx.pb.go @@ -0,0 +1,511 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.12.4 +// source: pkg/kanx/kanx.proto + +package kanx + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ProcessState int32 + +const ( + ProcessState_PROCESS_STATE_UNSPECIFIED ProcessState = 0 + ProcessState_PROCESS_STATE_RUNNING ProcessState = 1 + ProcessState_PROCESS_STATE_SUCCEEDED ProcessState = 2 + ProcessState_PROCESS_STATE_FAILED ProcessState = 3 +) + +// Enum value maps for ProcessState. +var ( + ProcessState_name = map[int32]string{ + 0: "PROCESS_STATE_UNSPECIFIED", + 1: "PROCESS_STATE_RUNNING", + 2: "PROCESS_STATE_SUCCEEDED", + 3: "PROCESS_STATE_FAILED", + } + ProcessState_value = map[string]int32{ + "PROCESS_STATE_UNSPECIFIED": 0, + "PROCESS_STATE_RUNNING": 1, + "PROCESS_STATE_SUCCEEDED": 2, + "PROCESS_STATE_FAILED": 3, + } +) + +func (x ProcessState) Enum() *ProcessState { + p := new(ProcessState) + *p = x + return p +} + +func (x ProcessState) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ProcessState) Descriptor() protoreflect.EnumDescriptor { + return file_pkg_kanx_kanx_proto_enumTypes[0].Descriptor() +} + +func (ProcessState) Type() protoreflect.EnumType { + return &file_pkg_kanx_kanx_proto_enumTypes[0] +} + +func (x ProcessState) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ProcessState.Descriptor instead. +func (ProcessState) EnumDescriptor() ([]byte, []int) { + return file_pkg_kanx_kanx_proto_rawDescGZIP(), []int{0} +} + +type CreateProcessRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Args []string `protobuf:"bytes,2,rep,name=args,proto3" json:"args,omitempty"` +} + +func (x *CreateProcessRequest) Reset() { + *x = CreateProcessRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_kanx_kanx_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateProcessRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateProcessRequest) ProtoMessage() {} + +func (x *CreateProcessRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_kanx_kanx_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateProcessRequest.ProtoReflect.Descriptor instead. +func (*CreateProcessRequest) Descriptor() ([]byte, []int) { + return file_pkg_kanx_kanx_proto_rawDescGZIP(), []int{0} +} + +func (x *CreateProcessRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *CreateProcessRequest) GetArgs() []string { + if x != nil { + return x.Args + } + return nil +} + +type ListProcessesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListProcessesRequest) Reset() { + *x = ListProcessesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_kanx_kanx_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListProcessesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListProcessesRequest) ProtoMessage() {} + +func (x *ListProcessesRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_kanx_kanx_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListProcessesRequest.ProtoReflect.Descriptor instead. +func (*ListProcessesRequest) Descriptor() ([]byte, []int) { + return file_pkg_kanx_kanx_proto_rawDescGZIP(), []int{1} +} + +type ProcessOutputRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` +} + +func (x *ProcessOutputRequest) Reset() { + *x = ProcessOutputRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_kanx_kanx_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProcessOutputRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProcessOutputRequest) ProtoMessage() {} + +func (x *ProcessOutputRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_kanx_kanx_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProcessOutputRequest.ProtoReflect.Descriptor instead. +func (*ProcessOutputRequest) Descriptor() ([]byte, []int) { + return file_pkg_kanx_kanx_proto_rawDescGZIP(), []int{2} +} + +func (x *ProcessOutputRequest) GetPid() int64 { + if x != nil { + return x.Pid + } + return 0 +} + +type Process struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` + State ProcessState `protobuf:"varint,2,opt,name=state,proto3,enum=kanx.ProcessState" json:"state,omitempty"` + ExitCode int64 `protobuf:"varint,3,opt,name=exitCode,proto3" json:"exitCode,omitempty"` + ExitErr string `protobuf:"bytes,4,opt,name=exitErr,proto3" json:"exitErr,omitempty"` +} + +func (x *Process) Reset() { + *x = Process{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_kanx_kanx_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Process) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Process) ProtoMessage() {} + +func (x *Process) ProtoReflect() protoreflect.Message { + mi := &file_pkg_kanx_kanx_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Process.ProtoReflect.Descriptor instead. +func (*Process) Descriptor() ([]byte, []int) { + return file_pkg_kanx_kanx_proto_rawDescGZIP(), []int{3} +} + +func (x *Process) GetPid() int64 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *Process) GetState() ProcessState { + if x != nil { + return x.State + } + return ProcessState_PROCESS_STATE_UNSPECIFIED +} + +func (x *Process) GetExitCode() int64 { + if x != nil { + return x.ExitCode + } + return 0 +} + +func (x *Process) GetExitErr() string { + if x != nil { + return x.ExitErr + } + return "" +} + +type Output struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Output string `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` +} + +func (x *Output) Reset() { + *x = Output{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_kanx_kanx_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Output) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Output) ProtoMessage() {} + +func (x *Output) ProtoReflect() protoreflect.Message { + mi := &file_pkg_kanx_kanx_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Output.ProtoReflect.Descriptor instead. +func (*Output) Descriptor() ([]byte, []int) { + return file_pkg_kanx_kanx_proto_rawDescGZIP(), []int{4} +} + +func (x *Output) GetOutput() string { + if x != nil { + return x.Output + } + return "" +} + +var File_pkg_kanx_kanx_proto protoreflect.FileDescriptor + +var file_pkg_kanx_kanx_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x70, 0x6b, 0x67, 0x2f, 0x6b, 0x61, 0x6e, 0x78, 0x2f, 0x6b, 0x61, 0x6e, 0x78, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x6b, 0x61, 0x6e, 0x78, 0x22, 0x3e, 0x0a, 0x14, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x22, 0x16, 0x0a, 0x14, 0x4c, + 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x22, 0x28, 0x0a, 0x14, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x4f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x70, 0x69, 0x64, 0x22, 0x7b, 0x0a, + 0x07, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x73, 0x74, + 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x6b, 0x61, 0x6e, 0x78, + 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, + 0x74, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x65, 0x78, 0x69, 0x74, 0x45, 0x72, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x65, 0x78, 0x69, 0x74, 0x45, 0x72, 0x72, 0x22, 0x20, 0x0a, 0x06, 0x4f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x2a, 0x7f, 0x0a, 0x0c, + 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1d, 0x0a, 0x19, + 0x50, 0x52, 0x4f, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x50, + 0x52, 0x4f, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x52, 0x55, 0x4e, + 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x1b, 0x0a, 0x17, 0x50, 0x52, 0x4f, 0x43, 0x45, 0x53, + 0x53, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x45, 0x44, 0x45, + 0x44, 0x10, 0x02, 0x12, 0x18, 0x0a, 0x14, 0x50, 0x52, 0x4f, 0x43, 0x45, 0x53, 0x53, 0x5f, 0x53, + 0x54, 0x41, 0x54, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x32, 0x80, 0x02, + 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x3e, 0x0a, 0x0f, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, + 0x73, 0x65, 0x73, 0x12, 0x1a, 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x0d, 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x22, 0x00, + 0x12, 0x3e, 0x0a, 0x0d, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, + 0x73, 0x12, 0x1a, 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, + 0x63, 0x65, 0x73, 0x73, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, + 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x22, 0x00, 0x30, 0x01, + 0x12, 0x36, 0x0a, 0x06, 0x53, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x2e, 0x6b, 0x61, 0x6e, + 0x78, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x4f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x06, 0x53, 0x74, 0x64, 0x65, + 0x72, 0x72, 0x12, 0x1a, 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, + 0x73, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, + 0x2e, 0x6b, 0x61, 0x6e, 0x78, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x22, 0x00, 0x30, 0x01, + 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, + 0x61, 0x6e, 0x69, 0x73, 0x74, 0x65, 0x72, 0x69, 0x6f, 0x2f, 0x6b, 0x61, 0x6e, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6b, 0x61, 0x6e, 0x78, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_kanx_kanx_proto_rawDescOnce sync.Once + file_pkg_kanx_kanx_proto_rawDescData = file_pkg_kanx_kanx_proto_rawDesc +) + +func file_pkg_kanx_kanx_proto_rawDescGZIP() []byte { + file_pkg_kanx_kanx_proto_rawDescOnce.Do(func() { + file_pkg_kanx_kanx_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_kanx_kanx_proto_rawDescData) + }) + return file_pkg_kanx_kanx_proto_rawDescData +} + +var file_pkg_kanx_kanx_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pkg_kanx_kanx_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_kanx_kanx_proto_goTypes = []interface{}{ + (ProcessState)(0), // 0: kanx.ProcessState + (*CreateProcessRequest)(nil), // 1: kanx.CreateProcessRequest + (*ListProcessesRequest)(nil), // 2: kanx.ListProcessesRequest + (*ProcessOutputRequest)(nil), // 3: kanx.ProcessOutputRequest + (*Process)(nil), // 4: kanx.Process + (*Output)(nil), // 5: kanx.Output +} +var file_pkg_kanx_kanx_proto_depIdxs = []int32{ + 0, // 0: kanx.Process.state:type_name -> kanx.ProcessState + 1, // 1: kanx.ProcessService.CreateProcesses:input_type -> kanx.CreateProcessRequest + 2, // 2: kanx.ProcessService.ListProcesses:input_type -> kanx.ListProcessesRequest + 3, // 3: kanx.ProcessService.Stdout:input_type -> kanx.ProcessOutputRequest + 3, // 4: kanx.ProcessService.Stderr:input_type -> kanx.ProcessOutputRequest + 4, // 5: kanx.ProcessService.CreateProcesses:output_type -> kanx.Process + 4, // 6: kanx.ProcessService.ListProcesses:output_type -> kanx.Process + 5, // 7: kanx.ProcessService.Stdout:output_type -> kanx.Output + 5, // 8: kanx.ProcessService.Stderr:output_type -> kanx.Output + 5, // [5:9] is the sub-list for method output_type + 1, // [1:5] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_pkg_kanx_kanx_proto_init() } +func file_pkg_kanx_kanx_proto_init() { + if File_pkg_kanx_kanx_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_kanx_kanx_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateProcessRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_kanx_kanx_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListProcessesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_kanx_kanx_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProcessOutputRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_kanx_kanx_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Process); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_kanx_kanx_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Output); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_kanx_kanx_proto_rawDesc, + NumEnums: 1, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_kanx_kanx_proto_goTypes, + DependencyIndexes: file_pkg_kanx_kanx_proto_depIdxs, + EnumInfos: file_pkg_kanx_kanx_proto_enumTypes, + MessageInfos: file_pkg_kanx_kanx_proto_msgTypes, + }.Build() + File_pkg_kanx_kanx_proto = out.File + file_pkg_kanx_kanx_proto_rawDesc = nil + file_pkg_kanx_kanx_proto_goTypes = nil + file_pkg_kanx_kanx_proto_depIdxs = nil +} diff --git a/pkg/kanx/kanx.proto b/pkg/kanx/kanx.proto new file mode 100644 index 00000000000..69ec9c3444c --- /dev/null +++ b/pkg/kanx/kanx.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +option go_package = "github.com/kanisterio/kanister/pkg/kanx"; + +package kanx; + +service ProcessService { + rpc CreateProcesses (CreateProcessRequest) returns (Process) {} + rpc ListProcesses (ListProcessesRequest) returns (stream Process) {} + rpc Stdout (ProcessOutputRequest) returns (stream Output) {} + rpc Stderr (ProcessOutputRequest) returns (stream Output) {} +} + +message CreateProcessRequest { + string name = 1; + repeated string args = 2; +} + +message ListProcessesRequest { +} + +message ProcessOutputRequest { + int64 pid = 1; +} + +message Process { + int64 pid = 1; + ProcessState state = 2; + int64 exitCode = 3; + string exitErr = 4; +} + +enum ProcessState { + PROCESS_STATE_UNSPECIFIED = 0; + PROCESS_STATE_RUNNING = 1; + PROCESS_STATE_SUCCEEDED = 2; + PROCESS_STATE_FAILED = 3; +} + +message Output { + string output = 1; +} diff --git a/pkg/kanx/kanx_grpc.pb.go b/pkg/kanx/kanx_grpc.pb.go new file mode 100644 index 00000000000..fde36543bb5 --- /dev/null +++ b/pkg/kanx/kanx_grpc.pb.go @@ -0,0 +1,295 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.12.4 +// source: pkg/kanx/kanx.proto + +package kanx + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// ProcessServiceClient is the client API for ProcessService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ProcessServiceClient interface { + CreateProcesses(ctx context.Context, in *CreateProcessRequest, opts ...grpc.CallOption) (*Process, error) + ListProcesses(ctx context.Context, in *ListProcessesRequest, opts ...grpc.CallOption) (ProcessService_ListProcessesClient, error) + Stdout(ctx context.Context, in *ProcessOutputRequest, opts ...grpc.CallOption) (ProcessService_StdoutClient, error) + Stderr(ctx context.Context, in *ProcessOutputRequest, opts ...grpc.CallOption) (ProcessService_StderrClient, error) +} + +type processServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewProcessServiceClient(cc grpc.ClientConnInterface) ProcessServiceClient { + return &processServiceClient{cc} +} + +func (c *processServiceClient) CreateProcesses(ctx context.Context, in *CreateProcessRequest, opts ...grpc.CallOption) (*Process, error) { + out := new(Process) + err := c.cc.Invoke(ctx, "/kanx.ProcessService/CreateProcesses", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *processServiceClient) ListProcesses(ctx context.Context, in *ListProcessesRequest, opts ...grpc.CallOption) (ProcessService_ListProcessesClient, error) { + stream, err := c.cc.NewStream(ctx, &ProcessService_ServiceDesc.Streams[0], "/kanx.ProcessService/ListProcesses", opts...) + if err != nil { + return nil, err + } + x := &processServiceListProcessesClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ProcessService_ListProcessesClient interface { + Recv() (*Process, error) + grpc.ClientStream +} + +type processServiceListProcessesClient struct { + grpc.ClientStream +} + +func (x *processServiceListProcessesClient) Recv() (*Process, error) { + m := new(Process) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *processServiceClient) Stdout(ctx context.Context, in *ProcessOutputRequest, opts ...grpc.CallOption) (ProcessService_StdoutClient, error) { + stream, err := c.cc.NewStream(ctx, &ProcessService_ServiceDesc.Streams[1], "/kanx.ProcessService/Stdout", opts...) + if err != nil { + return nil, err + } + x := &processServiceStdoutClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ProcessService_StdoutClient interface { + Recv() (*Output, error) + grpc.ClientStream +} + +type processServiceStdoutClient struct { + grpc.ClientStream +} + +func (x *processServiceStdoutClient) Recv() (*Output, error) { + m := new(Output) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *processServiceClient) Stderr(ctx context.Context, in *ProcessOutputRequest, opts ...grpc.CallOption) (ProcessService_StderrClient, error) { + stream, err := c.cc.NewStream(ctx, &ProcessService_ServiceDesc.Streams[2], "/kanx.ProcessService/Stderr", opts...) + if err != nil { + return nil, err + } + x := &processServiceStderrClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ProcessService_StderrClient interface { + Recv() (*Output, error) + grpc.ClientStream +} + +type processServiceStderrClient struct { + grpc.ClientStream +} + +func (x *processServiceStderrClient) Recv() (*Output, error) { + m := new(Output) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ProcessServiceServer is the server API for ProcessService service. +// All implementations must embed UnimplementedProcessServiceServer +// for forward compatibility +type ProcessServiceServer interface { + CreateProcesses(context.Context, *CreateProcessRequest) (*Process, error) + ListProcesses(*ListProcessesRequest, ProcessService_ListProcessesServer) error + Stdout(*ProcessOutputRequest, ProcessService_StdoutServer) error + Stderr(*ProcessOutputRequest, ProcessService_StderrServer) error + mustEmbedUnimplementedProcessServiceServer() +} + +// UnimplementedProcessServiceServer must be embedded to have forward compatible implementations. +type UnimplementedProcessServiceServer struct { +} + +func (UnimplementedProcessServiceServer) CreateProcesses(context.Context, *CreateProcessRequest) (*Process, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateProcesses not implemented") +} +func (UnimplementedProcessServiceServer) ListProcesses(*ListProcessesRequest, ProcessService_ListProcessesServer) error { + return status.Errorf(codes.Unimplemented, "method ListProcesses not implemented") +} +func (UnimplementedProcessServiceServer) Stdout(*ProcessOutputRequest, ProcessService_StdoutServer) error { + return status.Errorf(codes.Unimplemented, "method Stdout not implemented") +} +func (UnimplementedProcessServiceServer) Stderr(*ProcessOutputRequest, ProcessService_StderrServer) error { + return status.Errorf(codes.Unimplemented, "method Stderr not implemented") +} +func (UnimplementedProcessServiceServer) mustEmbedUnimplementedProcessServiceServer() {} + +// UnsafeProcessServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ProcessServiceServer will +// result in compilation errors. +type UnsafeProcessServiceServer interface { + mustEmbedUnimplementedProcessServiceServer() +} + +func RegisterProcessServiceServer(s grpc.ServiceRegistrar, srv ProcessServiceServer) { + s.RegisterService(&ProcessService_ServiceDesc, srv) +} + +func _ProcessService_CreateProcesses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateProcessRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProcessServiceServer).CreateProcesses(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/kanx.ProcessService/CreateProcesses", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProcessServiceServer).CreateProcesses(ctx, req.(*CreateProcessRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _ProcessService_ListProcesses_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ListProcessesRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ProcessServiceServer).ListProcesses(m, &processServiceListProcessesServer{stream}) +} + +type ProcessService_ListProcessesServer interface { + Send(*Process) error + grpc.ServerStream +} + +type processServiceListProcessesServer struct { + grpc.ServerStream +} + +func (x *processServiceListProcessesServer) Send(m *Process) error { + return x.ServerStream.SendMsg(m) +} + +func _ProcessService_Stdout_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ProcessOutputRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ProcessServiceServer).Stdout(m, &processServiceStdoutServer{stream}) +} + +type ProcessService_StdoutServer interface { + Send(*Output) error + grpc.ServerStream +} + +type processServiceStdoutServer struct { + grpc.ServerStream +} + +func (x *processServiceStdoutServer) Send(m *Output) error { + return x.ServerStream.SendMsg(m) +} + +func _ProcessService_Stderr_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ProcessOutputRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ProcessServiceServer).Stderr(m, &processServiceStderrServer{stream}) +} + +type ProcessService_StderrServer interface { + Send(*Output) error + grpc.ServerStream +} + +type processServiceStderrServer struct { + grpc.ServerStream +} + +func (x *processServiceStderrServer) Send(m *Output) error { + return x.ServerStream.SendMsg(m) +} + +// ProcessService_ServiceDesc is the grpc.ServiceDesc for ProcessService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ProcessService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "kanx.ProcessService", + HandlerType: (*ProcessServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "CreateProcesses", + Handler: _ProcessService_CreateProcesses_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "ListProcesses", + Handler: _ProcessService_ListProcesses_Handler, + ServerStreams: true, + }, + { + StreamName: "Stdout", + Handler: _ProcessService_Stdout_Handler, + ServerStreams: true, + }, + { + StreamName: "Stderr", + Handler: _ProcessService_Stderr_Handler, + ServerStreams: true, + }, + }, + Metadata: "pkg/kanx/kanx.proto", +} diff --git a/pkg/kanx/kanx_test.go b/pkg/kanx/kanx_test.go new file mode 100644 index 00000000000..669323515ee --- /dev/null +++ b/pkg/kanx/kanx_test.go @@ -0,0 +1,271 @@ +package kanx + +import ( + "bytes" + "context" + "io" + "os" + "path" + "sync" + "testing" + "time" + + "google.golang.org/grpc" + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + +type KanXSuite struct{} + +var _ = Suite(&KanXSuite{}) + +func tmpDir(c *C) string { + d, err := os.MkdirTemp("", c.TestName()) + c.Log("Directory: ", d) + c.Assert(err, IsNil) + return d +} + +func newTestServer(dir string) *Server { + var opts []grpc.ServerOption + return &Server{ + grpcs: grpc.NewServer(opts...), + pss: &processServiceServer{ + processes: map[int64]*process{}, + outputDir: dir, + tailTickDuration: time.Nanosecond, + }, + } +} + +func serverReady(ctx context.Context, addr string) { + ctx, can := context.WithTimeout(ctx, 10*time.Second) + defer can() + for { + select { + case <-ctx.Done(): + return + default: + } + _, err := ListProcesses(ctx, addr) + if err == nil { + return + } + } +} + +func (s *KanXSuite) TestServerCancellation(c *C) { + d := tmpDir(c) + addr := path.Join(d, "kanx.sock") + ctx, can := context.WithCancel(context.Background()) + go func() { + serverReady(ctx, addr) + can() + }() + err := newTestServer(d).Serve(ctx, addr) + c.Assert(err, IsNil) +} + +func (s *KanXSuite) TestShortProcess(c *C) { + d := tmpDir(c) + addr := path.Join(d, "kanx.sock") + ctx, can := context.WithCancel(context.Background()) + defer can() + go func() { + err := newTestServer(d).Serve(ctx, addr) + c.Assert(err, IsNil) + }() + serverReady(ctx, addr) + + p, err := CreateProcess(ctx, addr, "echo", []string{"hello"}) + c.Assert(err, IsNil) + c.Assert(p.GetPid(), Not(Equals), 0) + c.Assert(p.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(p.GetExitErr(), Equals, "") + c.Assert(p.GetExitCode(), Equals, int64(0)) + + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) + err = Stdout(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "hello\n") + + buf.Reset() + err = Stderr(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "") + + ps, err := ListProcesses(ctx, addr) + c.Assert(err, IsNil) + c.Assert(ps, HasLen, 1) + c.Assert(ps[0].GetPid(), Equals, p.GetPid()) + c.Assert(ps[0].GetState(), Equals, ProcessState_PROCESS_STATE_SUCCEEDED) + c.Assert(ps[0].GetExitErr(), Equals, "") + c.Assert(ps[0].GetExitCode(), Equals, int64(0)) +} + +func (s *KanXSuite) TestLongProcess(c *C) { + d := tmpDir(c) + addr := path.Join(d, "kanx.sock") + ctx, can := context.WithCancel(context.Background()) + defer can() + server := newTestServer(d) + go func() { + err := server.Serve(ctx, addr) + c.Assert(err, IsNil) + }() + serverReady(ctx, addr) + + p, err := CreateProcess(ctx, addr, "tail", []string{"-f", "/dev/null"}) + c.Assert(err, IsNil) + c.Assert(p.GetPid(), Not(Equals), 0) + c.Assert(p.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(p.GetExitErr(), Equals, "") + c.Assert(p.GetExitCode(), Equals, int64(0)) + + ps, err := ListProcesses(ctx, addr) + c.Assert(err, IsNil) + c.Assert(ps, HasLen, 1) + c.Assert(ps[0].GetPid(), Equals, p.GetPid()) + c.Assert(ps[0].GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(ps[0].GetExitErr(), Equals, "") + c.Assert(ps[0].GetExitCode(), Equals, int64(0)) + + ctx = context.Background() + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) + isCancelled := false + go func() { + err := Stdout(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "") + c.Assert(isCancelled, Equals, true) + }() + sp, ok := server.pss.processes[p.GetPid()] + c.Assert(ok, Equals, true) + isCancelled = true + err = sp.cmd.Process.Kill() + c.Assert(err, IsNil) + + buf.Reset() + err = Stdout(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "") + + buf.Reset() + err = Stderr(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "") +} + +func (s *KanXSuite) TestError(c *C) { + d := tmpDir(c) + addr := path.Join(d, "kanx.sock") + ctx, can := context.WithCancel(context.Background()) + defer can() + server := newTestServer(d) + go func() { + err := server.Serve(ctx, addr) + c.Assert(err, IsNil) + }() + serverReady(ctx, addr) + + p, err := CreateProcess(ctx, addr, "tail", []string{"-f", "/dev/null"}) + c.Assert(err, IsNil) + c.Assert(p.GetPid(), Not(Equals), 0) + c.Assert(p.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(p.GetExitErr(), Equals, "") + c.Assert(p.GetExitCode(), Equals, int64(0)) + + ps, err := ListProcesses(ctx, addr) + c.Assert(err, IsNil) + c.Assert(ps, HasLen, 1) + c.Assert(ps[0].GetPid(), Equals, p.GetPid()) + c.Assert(ps[0].GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(ps[0].GetExitErr(), Equals, "") + c.Assert(ps[0].GetExitCode(), Equals, int64(0)) + + sp, ok := server.pss.processes[p.GetPid()] + c.Assert(ok, Equals, true) + err = sp.cmd.Process.Kill() + c.Assert(err, IsNil) + + ps, err = ListProcesses(ctx, addr) + c.Assert(err, IsNil) + c.Assert(ps, HasLen, 1) + c.Assert(ps[0].GetPid(), Equals, p.GetPid()) + c.Assert(ps[0].GetState(), Equals, ProcessState_PROCESS_STATE_FAILED) + c.Assert(ps[0].GetExitErr(), Equals, "signal: killed") + c.Assert(ps[0].GetExitCode(), Equals, int64(-1)) + + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) + err = Stdout(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "") + + buf.Reset() + err = Stderr(ctx, addr, p.GetPid(), buf) + c.Assert(err, IsNil) + c.Assert(buf.String(), Equals, "") +} + +var _ io.Writer = (*nilWriter)(nil) + +type nilWriter struct{} + +func (nw *nilWriter) Write(p []byte) (n int, err error) { return } + +func (s *KanXSuite) TestParallelStdout(c *C) { + d := tmpDir(c) + addr := path.Join(d, "kanx.sock") + ctx, can := context.WithCancel(context.Background()) + defer can() + server := newTestServer(d) + go func() { + err := server.Serve(ctx, addr) + c.Assert(err, IsNil) + }() + serverReady(ctx, addr) + + p, err := CreateProcess(ctx, addr, "yes", nil) + c.Assert(err, IsNil) + c.Assert(p.GetPid(), Not(Equals), 0) + c.Assert(p.GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(p.GetExitErr(), Equals, "") + c.Assert(p.GetExitCode(), Equals, int64(0)) + + ps, err := ListProcesses(ctx, addr) + c.Assert(err, IsNil) + c.Assert(ps, HasLen, 1) + c.Assert(ps[0].GetPid(), Equals, p.GetPid()) + c.Assert(ps[0].GetState(), Equals, ProcessState_PROCESS_STATE_RUNNING) + c.Assert(ps[0].GetExitErr(), Equals, "") + c.Assert(ps[0].GetExitCode(), Equals, int64(0)) + + nw := &nilWriter{} + wg := &sync.WaitGroup{} + defer wg.Wait() + for _, _ = range make([]struct{}, 100) { + wg.Add(1) + go func() { + defer wg.Done() + err = Stdout(ctx, addr, p.GetPid(), nw) + c.Assert(err, IsNil) + err = Stderr(ctx, addr, p.GetPid(), nw) + c.Assert(err, IsNil) + }() + } + + sp, ok := server.pss.processes[p.GetPid()] + c.Assert(ok, Equals, true) + err = sp.cmd.Process.Kill() + c.Assert(err, IsNil) + + ps, err = ListProcesses(ctx, addr) + c.Assert(err, IsNil) + c.Assert(ps, HasLen, 1) + c.Assert(ps[0].GetPid(), Equals, p.GetPid()) + c.Assert(ps[0].GetState(), Equals, ProcessState_PROCESS_STATE_FAILED) + c.Assert(ps[0].GetExitErr(), Equals, "signal: killed") + c.Assert(ps[0].GetExitCode(), Equals, int64(-1)) +} diff --git a/pkg/kanx/server.go b/pkg/kanx/server.go new file mode 100644 index 00000000000..bd6169f99d1 --- /dev/null +++ b/pkg/kanx/server.go @@ -0,0 +1,215 @@ +package kanx + +import ( + "bytes" + "context" + "fmt" + "net" + "os" + "os/exec" + "os/signal" + "syscall" + "time" + + "github.com/pkg/errors" + "google.golang.org/grpc" + + "github.com/kanisterio/kanister/pkg/field" + "github.com/kanisterio/kanister/pkg/log" +) + +type processServiceServer struct { + UnimplementedProcessServiceServer + processes map[int64]*process + outputDir string + tailTickDuration time.Duration +} + +type process struct { + cmd *exec.Cmd + doneCh chan struct{} + stdout *os.File + stderr *os.File + exitCode int + err error + cancel context.CancelFunc +} + +func newProcessServiceServer() *processServiceServer { + return &processServiceServer{ + processes: map[int64]*process{}, + tailTickDuration: 3 * time.Second, + } +} + +func (s *processServiceServer) CreateProcesses(_ context.Context, cpr *CreateProcessRequest) (*Process, error) { + stdout, err := os.CreateTemp(s.outputDir, "kando.*.stdout") + if err != nil { + return nil, err + } + stderr, err := os.CreateTemp(s.outputDir, "kando.*.stderr") + if err != nil { + return nil, err + } + // We use context.Background() here because the parameter ctx seems to get canceled when this returns. + ctx, can := context.WithCancel(context.Background()) + cmd := exec.CommandContext(ctx, cpr.GetName(), cpr.GetArgs()...) + p := &process{ + cmd: cmd, + doneCh: make(chan struct{}, 0), + stdout: stdout, + stderr: stderr, + cancel: can, + } + cmd.Stdout = p.stdout + cmd.Stderr = p.stderr + + err = cmd.Start() + if err != nil { + return nil, err + } + s.processes[int64(cmd.Process.Pid)] = p + log.Info().Print(processToProto(p).String(), field.M{"stdout": stdout.Name(), "stderr": stderr.Name()}) + go func() { + err := p.cmd.Wait() + p.err = err + if exiterr, ok := err.(*exec.ExitError); ok { + p.exitCode = exiterr.ExitCode() + } + err = stdout.Close() + if err != nil { + log.Error().WithError(err).Print("Failed to close stdout", field.M{"pid": cmd.Process.Pid}) + } + err = stderr.Close() + if err != nil { + log.Error().WithError(err).Print("Failed to close stderr", field.M{"pid": cmd.Process.Pid}) + } + close(p.doneCh) + log.Info().Print(processToProto(p).String()) + }() + return &Process{ + Pid: int64(cmd.Process.Pid), + State: ProcessState_PROCESS_STATE_RUNNING, + }, nil +} + +func (s *processServiceServer) ListProcesses(lpr *ListProcessesRequest, lps ProcessService_ListProcessesServer) error { + for _, p := range s.processes { + ps := processToProto(p) + err := lps.Send(ps) + if err != nil { + return err + } + } + return nil +} + +var processNotFoundError = fmt.Errorf("Process not found") + +func (s *processServiceServer) Stdout(por *ProcessOutputRequest, ss ProcessService_StdoutServer) error { + p, ok := s.processes[por.Pid] + if !ok { + return errors.WithStack(processNotFoundError) + } + fh, err := os.Open(p.stdout.Name()) + if err != nil { + return err + } + return s.streamOutput(ss, p, fh) +} + +func (s *processServiceServer) Stderr(por *ProcessOutputRequest, ss ProcessService_StderrServer) error { + p, ok := s.processes[por.Pid] + if !ok { + return errors.WithStack(processNotFoundError) + } + fh, err := os.Open(p.stderr.Name()) + if err != nil { + return err + } + return s.streamOutput(ss, p, fh) +} + +type sender interface { + Send(*Output) error +} + +func (s *processServiceServer) streamOutput(ss sender, p *process, fh *os.File) error { + buf := bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) // 4MiB is the max size of a GRPC request + t := time.NewTicker(s.tailTickDuration) + for { + n, err := buf.ReadFrom(fh) + switch { + case err != nil: + return err + case n == 0: + select { + case <-p.doneCh: + return nil + default: + } + <-t.C + continue + } + o := &Output{Output: buf.String()} + err = ss.Send(o) + if err != nil { + return err + } + buf.Reset() + } +} + +func processToProto(p *process) *Process { + ps := &Process{ + Pid: int64(p.cmd.Process.Pid), + } + select { + case <-p.doneCh: + ps.State = ProcessState_PROCESS_STATE_SUCCEEDED + if p.err != nil { + ps.State = ProcessState_PROCESS_STATE_FAILED + ps.ExitErr = p.err.Error() + ps.ExitCode = int64(p.exitCode) + + } + default: + ps.State = ProcessState_PROCESS_STATE_RUNNING + } + return ps +} + +type Server struct { + grpcs *grpc.Server + pss *processServiceServer +} + +func NewServer() *Server { + var opts []grpc.ServerOption + return &Server{ + grpcs: grpc.NewServer(opts...), + pss: newProcessServiceServer(), + } +} + +func (s *Server) Serve(ctx context.Context, addr string) error { + stopChan := make(chan os.Signal) + signal.Notify(stopChan, syscall.SIGTERM, syscall.SIGINT) + go func() { + select { + case sig := <-stopChan: + log.Info().Print("Gracefully stopping. Received Signal", field.M{"signal": sig}) + case <-ctx.Done(): + log.Info().Print("Gracefully stopping. Context canceled") + } + s.grpcs.GracefulStop() + }() + RegisterProcessServiceServer(s.grpcs, s.pss) + lis, err := net.Listen("unix", addr) + if err != nil { + return err + } + log.Info().Print("Listening on socket", field.M{"address": lis.Addr()}) + defer os.Remove(addr) + return s.grpcs.Serve(lis) +}