Skip to content

Commit

Permalink
Merge branch 'feature-pod-status'
Browse files Browse the repository at this point in the history
  • Loading branch information
rg0now committed Feb 21, 2024
2 parents fd532d8 + 3397486 commit 1b95538
Show file tree
Hide file tree
Showing 25 changed files with 2,058 additions and 464 deletions.
208 changes: 175 additions & 33 deletions cmd/stunnerctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"

"github.com/pion/logging"
"github.com/spf13/cobra"
Expand All @@ -29,16 +31,16 @@ import (
var (
output string
watch, all, verbose bool
jsonQuery *jsonpath.JSONPath
k8sConfigFlags *cliopt.ConfigFlags
cdsConfigFlags *cdsclient.CDSConfigFlags
podConfigFlags *cdsclient.PodConfigFlags
loggerFactory *logger.LeveledLoggerFactory
log logging.LeveledLogger

rootCmd = &cobra.Command{
Use: "stunnerctl",
Short: "A command line utility to inspect STUNner dataplane configurations.",
Long: "The stunnerctl tool is a CLI for inspecting, watching and troublehssooting the configuration of STUNner gateways",
Short: "A command line utility to inspect STUNner dataplane .",
Long: "The stunnerctl tool is a CLI for inspecting, watching and troublehssooting STUNner gateways",
DisableAutoGenTag: true,
}
)
Expand All @@ -47,7 +49,7 @@ var (
configCmd = &cobra.Command{
Use: "config",
Aliases: []string{"stunner-config"},
Short: "Gets or watches STUNner configs",
Short: "Get or watch dataplane configs",
Args: cobra.RangeArgs(0, 1),
DisableAutoGenTag: true,
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -57,23 +59,44 @@ var (
}
},
}
statusCmd = &cobra.Command{
Use: "status",
Aliases: []string{"dataplane-status"},
Short: "Read status from dataplane pods",
Args: cobra.RangeArgs(0, 1),
DisableAutoGenTag: true,
Run: func(cmd *cobra.Command, args []string) {
if err := runStatus(cmd, args); err != nil {
fmt.Println(err)
os.Exit(1)
}
},
}
)

func init() {
rootCmd.PersistentFlags().BoolVarP(&watch, "watch", "w", false, "Watch for config updates from server")
rootCmd.PersistentFlags().BoolVarP(&all, "all-namespaces", "a", false, "Consider all namespaces")
rootCmd.PersistentFlags().StringVarP(&output, "output", "o", "summary", "Output format")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging, identical to -l all:DEBUG")

// Kubernetes config flags
// Kubernetes config flags: persistent, all commands
k8sConfigFlags = cliopt.NewConfigFlags(true)
k8sConfigFlags.AddFlags(rootCmd.PersistentFlags())

// CDS server discovery flags
// CDS server discovery flags: only for "config" command
cdsConfigFlags = cdsclient.NewCDSConfigFlags()
cdsConfigFlags.AddFlags(rootCmd.PersistentFlags())
cdsConfigFlags.AddFlags(configCmd.Flags())

// watch flag: only for config
configCmd.Flags().BoolVarP(&watch, "watch", "w", false, "Watch for config updates from server")

// Pod discovery flags: only for "status" command
podConfigFlags = cdsclient.NewPodConfigFlags()
podConfigFlags.AddFlags(statusCmd.Flags())

// Add commands
rootCmd.AddCommand(configCmd)
rootCmd.AddCommand(statusCmd)
}

func main() {
Expand All @@ -96,45 +119,31 @@ func runConfig(cmd *cobra.Command, args []string) error {
gwNs = *k8sConfigFlags.Namespace
}

if strings.HasPrefix(output, "jsonpath") {
as := strings.Split(output, "=")
if len(as) != 2 || as[0] != "jsonpath" {
return fmt.Errorf("invalid jsonpath output definition %q", output)
}

jsonQuery = jsonpath.New("output")

// Parse and print jsonpath
fields, err := RelaxedJSONPathExpression(as[1])
if err != nil {
return fmt.Errorf("invalid jsonpath query %w", err)
}

if err := jsonQuery.Parse(fields); err != nil {
return fmt.Errorf("cannor parse jsonpath query %w", err)
}
output = "jsonpath"
jsonQuery, output, err := ParseJSONPathFlag(output)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log.Debug("Searching for CDS server")
cdsAddr, err := cdsclient.DiscoverK8sCDSServer(ctx, k8sConfigFlags, cdsConfigFlags,
pod, err := cdsclient.DiscoverK8sCDSServer(ctx, k8sConfigFlags, cdsConfigFlags,
loggerFactory.NewLogger("cds-fwd"))
if err != nil {
return fmt.Errorf("error searching for CDS server: %w", err)
}

log.Debugf("Connecting to CDS server: %s", pod.String())
var cds cdsclient.CdsApi
cdslog := loggerFactory.NewLogger("cds-client")
if all {
cds, err = cdsclient.NewAllConfigsAPI(cdsAddr, cdslog)
cds, err = cdsclient.NewAllConfigsAPI(pod.Addr, cdslog)
} else if len(args) == 0 {
cds, err = cdsclient.NewConfigsNamespaceAPI(cdsAddr, gwNs, cdslog)
cds, err = cdsclient.NewConfigsNamespaceAPI(pod.Addr, gwNs, cdslog)
} else {
gwName := args[0]
cds, err = cdsclient.NewConfigNamespaceNameAPI(cdsAddr, gwNs, gwName, cdslog)
cds, err = cdsclient.NewConfigNamespaceNameAPI(pod.Addr, gwNs, gwName, cdslog)
}

if err != nil {
Expand Down Expand Up @@ -202,9 +211,7 @@ func runConfig(cmd *cobra.Command, args []string) error {
}
}
case "summary":
fmt.Print(string(c.Summary()))
case "status":
fallthrough
fmt.Print(c.Summary())
default:
fmt.Println(c.String())
}
Expand All @@ -213,6 +220,116 @@ func runConfig(cmd *cobra.Command, args []string) error {
return nil
}

func runStatus(cmd *cobra.Command, args []string) error {
loglevel := "all:WARN"
if verbose {
loglevel = "all:TRACE"
}
loggerFactory = logger.NewLoggerFactory(loglevel)
log = loggerFactory.NewLogger("stunnerctl")

jsonQuery, output, err := ParseJSONPathFlag(output)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

gwNs := "default"
extraLog := "in namespace default"
if k8sConfigFlags.Namespace != nil && *k8sConfigFlags.Namespace != "" {
gwNs = *k8sConfigFlags.Namespace
extraLog = fmt.Sprintf("in namespace %s", gwNs)
}
// --all-namespaces overrides -n
if all {
gwNs = ""
extraLog = "in all namespaces"
}

gw := ""
if len(args) > 0 {
gw = args[0]
}
if gwNs != "" && gw != "" {
extraLog += fmt.Sprintf("for gateway %s", gw)
}

log.Debug("Searching for dataplane pods " + extraLog)
pods, err := cdsclient.DiscoverK8sStunnerdPods(ctx, k8sConfigFlags, podConfigFlags,
gwNs, gw, loggerFactory.NewLogger("stunnerd-fwd"))
if err != nil {
return fmt.Errorf("error searching for stunnerd pods: %w", err)
}

for _, pod := range pods {
client := http.Client{
Timeout: 5 * time.Second,
}
url := fmt.Sprintf("http://%s/status", pod.Addr)
res, err := client.Get(url)
if err != nil {
log.Errorf("Error querying status for stunnerd pod at URL %q on %s: %s",
url, pod.String(), err.Error())
continue
}

if res.StatusCode != http.StatusOK {
log.Errorf("Status query failed on %s with HTTP error code %s",
pod.String(), res.Status)
continue
}

s := stnrv1.StunnerStatus{}
err = json.NewDecoder(res.Body).Decode(&s)
if err != nil {
log.Errorf("Could not decode status response: %s", err.Error())
continue
}

switch output {
case "yaml":
if out, err := yaml.Marshal(s); err != nil {
return err
} else {
fmt.Println(string(out))
}
case "json":
if out, err := json.Marshal(s); err != nil {
return err
} else {
fmt.Println(string(out))
}
case "jsonpath":
values, err := jsonQuery.FindResults(s)
if err != nil {
return err
}

if len(values) == 0 || len(values[0]) == 0 {
fmt.Println("<none>")
}

for arrIx := range values {
for valIx := range values[arrIx] {
fmt.Printf("%v\n", values[arrIx][valIx].Interface())
}
}
case "summary":
fallthrough
default:
if pod.Proxy {
fmt.Printf("%s/%s:\n\t%s\n", pod.Namespace, pod.Name, s.Summary())
} else {
fmt.Printf("%s:\n\t%s\n", pod.Addr, s.Summary())
}
}
}

return nil
}

var jsonRegexp = regexp.MustCompile(`^\{\.?([^{}]+)\}$|^\.?([^{}]+)$`)

// k8s.io/kubectl/pkg/cmd/get
Expand All @@ -235,3 +352,28 @@ func RelaxedJSONPathExpression(pathExpression string) (string, error) {
}
return fmt.Sprintf("{.%s}", fieldSpec), nil
}

func ParseJSONPathFlag(output string) (*jsonpath.JSONPath, string, error) {
if !strings.HasPrefix(output, "jsonpath") {
return nil, output, nil
}

as := strings.Split(output, "=")
if len(as) != 2 || as[0] != "jsonpath" {
return nil, output, fmt.Errorf("invalid jsonpath output definition %q", output)
}

jsonQuery := jsonpath.New("output")

// Parse and print jsonpath
fields, err := RelaxedJSONPathExpression(as[1])
if err != nil {
return nil, output, fmt.Errorf("invalid jsonpath query %w", err)
}

if err := jsonQuery.Parse(fields); err != nil {
return nil, output, fmt.Errorf("cannor parse jsonpath query %w", err)
}

return jsonQuery, "jsonpath", nil
}
41 changes: 37 additions & 4 deletions cmd/stunnerd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

flag "github.com/spf13/pflag"
cliopt "k8s.io/cli-runtime/pkg/genericclioptions"

"github.com/l7mp/stunner"
stnrv1 "github.com/l7mp/stunner/pkg/apis/v1"
Expand All @@ -19,14 +20,23 @@ import (

func main() {
os.Args[0] = "stunnerd"
var config = flag.StringP("config", "c", "", "Config origin, either a valid address in the format IP:port, or HTTP URL to the CDS server, or a proper file name URI in the format file://<path-to-config-file> (overrides: STUNNER_CONFIG_ORIGIN)")
var config = flag.StringP("config", "c", "", "Config origin, either a valid address in the format IP:port, or HTTP URL to the CDS server, or literal \"k8s\" to discover the CDS server from Kubernetes, or a proper file name URI in the format file://<path-to-config-file> (overrides: STUNNER_CONFIG_ORIGIN)")
var level = flag.StringP("log", "l", "", "Log level (format: <scope>:<level>, overrides: PION_LOG_*, default: all:INFO)")
var id = flag.StringP("id", "i", "", "Id for identifying with the CDS server (format: <namespace>/<name>, overrides: STUNNER_NAMESPACE/STUNNER_NAME, default: <default/stunnerd-hostname>)")
var watch = flag.BoolP("watch", "w", false, "Watch config file for updates (default: false)")
var udpThreadNum = flag.IntP("udp-thread-num", "u", 0,
"Number of readloop threads (CPU cores) per UDP listener. Zero disables UDP multithreading (default: 0)")
var dryRun = flag.BoolP("dry-run", "d", false, "Suppress side-effects, intended for testing (default: false)")
var verbose = flag.BoolP("verbose", "v", false, "Verbose logging, identical to <-l all:DEBUG>")

// Kubernetes config flags
k8sConfigFlags := cliopt.NewConfigFlags(true)
k8sConfigFlags.AddFlags(flag.CommandLine)

// CDS server discovery flags
cdsConfigFlags := cdsclient.NewCDSConfigFlags()
cdsConfigFlags.AddFlags(flag.CommandLine)

flag.Parse()

logLevel := stnrv1.DefaultLogLevel
Expand Down Expand Up @@ -83,13 +93,26 @@ func main() {
conf <- c

} else if !*watch {
log.Infof("loading configuration from origin %q", configOrigin)
ctx, cancel := context.WithCancel(context.Background())

if configOrigin == "k8s" {
log.Info("discovering configuration from Kubernetes")
cdsAddr, err := cdsclient.DiscoverK8sCDSServer(ctx, k8sConfigFlags, cdsConfigFlags,
st.GetLogger().NewLogger("cds-fwd"))
if err != nil {
log.Errorf("error searching for CDS server: %s", err.Error())
os.Exit(1)
}
configOrigin = cdsAddr.Addr
}

log.Infof("loading configuration from origin %q", configOrigin)
c, err := st.LoadConfig(configOrigin)
if err != nil {
log.Error(err.Error())
os.Exit(1)
}
cancel()

conf <- c

Expand All @@ -98,12 +121,22 @@ func main() {
z := cdsclient.ZeroConfig(st.GetId())
conf <- z

log.Infof("watching configuration at origin %q", configOrigin)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cancelConfigLoader = cancel

// Watch closes the channel
if configOrigin == "k8s" {
log.Info("discovering configuration from Kubernetes")
cdsAddr, err := cdsclient.DiscoverK8sCDSServer(ctx, k8sConfigFlags, cdsConfigFlags,
st.GetLogger().NewLogger("cds-fwd"))
if err != nil {
log.Errorf("error searching for CDS server: %s", err.Error())
os.Exit(1)
}
configOrigin = cdsAddr.Addr
}

log.Infof("watching configuration at origin %q", configOrigin)
if err := st.WatchConfig(ctx, configOrigin, conf); err != nil {
log.Errorf("could not run config watcher: %s", err.Error())
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/turncat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func getStunnerConfFromK8s(def string) (*stnrv1.StunnerConfig, error) {
return nil, fmt.Errorf("error searching for CDS server: %w", err)
}

cds, err := cdsclient.NewConfigNamespaceNameAPI(cdsAddr, namespace, name,
cds, err := cdsclient.NewConfigNamespaceNameAPI(cdsAddr.Addr, namespace, name,
loggerFactory.NewLogger("cds-client"))
if err != nil {
return nil, fmt.Errorf("error creating CDS client: %w", err)
Expand Down
Loading

0 comments on commit 1b95538

Please sign in to comment.