diff --git a/docs/concepts.md b/docs/concepts.md
index de75dc0..9caa974 100644
--- a/docs/concepts.md
+++ b/docs/concepts.md
@@ -4,7 +4,7 @@
 
 BumbleBee by default uses a containerized build environment to build your BPF programs to an ELF file, then packages that in an OCI image according to our image spec.
 
-You can then package your BPF program as standard Docker image that contains the `bee` CLI/runner in addition to your BPF programs.
+If desired, you can then package your BPF program as a standard Docker image that contains the `bee` CLI/runner in addition to your BPF programs.
 The end result is a standard docker image that can be distributed via standard docker-like workflows to run your BPF program anywhere you run containerized workloads, such as a K8s cluster.
 Note that you will need sufficient capabilities to run the image, as loading and running the BPF program is a privileged operation for most intents and purposes.
 
@@ -17,16 +17,13 @@
 $ bee build examples/tcpconnect/tcpconnect.c tcpconnect
 $ bee package tcpconnect bee-tcpconnect:latest
 SUCCESS Packaged image built and tagged at bee-tcpconnect:latest
-# run the bee-tcpconnect:latest image somewhere, deploy to K8s, etc.
-# this example below runs locally via `docker` but see the following paragraph for a warning on weird terminal behavior when using this exact command!
-$ docker run --privileged --tty bee-tcpconnect:latest
+# run the bee-tcpconnect:latest image, deploy to K8s, etc.
+$ docker run --privileged bee-tcpconnect:latest
 ```
 
-Note that the `--privileged` flag is required to provide the permissions necessary and the `--tty` flag is necessary for the TUI rendered by default with `bee run`.
-The `--tty` requirement will be removed shortly as we will introduce a mode that does not render the TUI.
-Additionally, if you run the image as above, when you attempt to quit via your terminal may be left in a bad state. This is because the is being handled by `docker run` and not making it to the TTY.
-To clear your screen, do a non-containerized run, e.g. `bee run ghcr.io/solo-io/bumblebee/tcpconnect:$(bee version)`.
-Again, this will have a better UX very soon!
+Note that the `--privileged` flag is used to provide the necessary permissions (alternatively, this can be scoped down via capabilities through your system/orchestrator).
+Since this image will typically not be used interactively, the container's default `CMD` is `bee run --no-tty`, which does not render the TUI.
+Metrics can be scraped from this container to provide insight into your maps.
 
 ## BPF conventions
 
diff --git a/docs/contributing.md b/docs/contributing.md
index 5d05afe..ff6ee4b 100644
--- a/docs/contributing.md
+++ b/docs/contributing.md
@@ -59,3 +59,15 @@ You can then use the docker image in the `bee build` command, for example:
 ```bash
 bee build examples/tcpconnect/tcpconnect.c tcpconnect.o -i $DOCKER_BUILT_IMAGE
 ```
+
+### Workflow for `bee package` dev
+
+```
+TAGGED_VERSION=vdev make docker-build-bee
+
+go run ./bee/main.go build -i ghcr.io/solo-io/bumblebee/builder:0.0.9 examples/tcpconnect/tcpconnect.c tcpconnect
+
+go run ./bee/main.go package tcpconnect bee-tcpconnect:v1
+
+docker run --privileged bee-tcpconnect:v1
+```
diff --git a/pkg/cli/internal/commands/package/Dockerfile b/pkg/cli/internal/commands/package/Dockerfile
index 0c388e6..b9de9cf 100644
--- a/pkg/cli/internal/commands/package/Dockerfile
+++ b/pkg/cli/internal/commands/package/Dockerfile
@@ -7,4 +7,4 @@
 COPY ./store /root/.bumblebee/store/
 ARG BPF_IMAGE
 ENV BPF_IMAGE=$BPF_IMAGE
-CMD ./bee-linux-amd64 run ${BPF_IMAGE}
+CMD ./bee-linux-amd64 run --no-tty ${BPF_IMAGE}
diff --git a/pkg/cli/internal/commands/run/run.go b/pkg/cli/internal/commands/run/run.go
index a2bc666..6e809ae 100644
--- a/pkg/cli/internal/commands/run/run.go
+++ b/pkg/cli/internal/commands/run/run.go
@@ -29,6 +29,7 @@ type runOptions struct {
 
     debug  bool
     filter []string
+    notty  bool
 }
 
 const filterDescription string = "Filter to apply to output from maps. Format is \"map_name,key_name,regex\" " +
@@ -39,6 +40,7 @@ var stopper chan os.Signal
 func addToFlags(flags *pflag.FlagSet, opts *runOptions) {
     flags.BoolVarP(&opts.debug, "debug", "d", false, "Create a log file 'debug.log' that provides debug logs of loader and TUI execution")
     flags.StringSliceVarP(&opts.filter, "filter", "f", []string{}, filterDescription)
+    flags.BoolVar(&opts.notty, "no-tty", false, "Set to true when running without a tty allocated; no interaction will be expected and no rich output will be rendered")
 }
 
 func Command(opts *options.GeneralOptions) *cobra.Command {
@@ -76,22 +78,17 @@ $ bee run -f="events_hash,daddr,1.1.1.1" -f="events_ring,daddr,1.1.1.1" ghcr.io/
 }
 
 func run(cmd *cobra.Command, args []string, opts *runOptions) error {
-    // Subscribe to signals for terminating the program.
-    // This is used until management of signals is passed to the TUI
-    stopper = make(chan os.Signal, 1)
-    signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
-    go func() {
-        for sig := range stopper {
-            if sig == os.Interrupt || sig == syscall.SIGTERM {
-                fmt.Println("got sigterm or interrupt")
-                os.Exit(0)
-            }
-        }
-    }()
+    ctx, err := buildContext(cmd.Context(), opts.debug)
+    if err != nil {
+        return err
+    }
+    contextutils.LoggerFrom(ctx).Info("starting bee run")
+    if opts.notty {
+        pterm.DisableStyling()
+    }
 
-    // guaranteed to be length 1
     progLocation := args[0]
-    progReader, err := getProgram(cmd.Context(), opts.general, progLocation)
+    progReader, err := getProgram(ctx, opts.general, progLocation)
     if err != nil {
         return err
     }
@@ -101,7 +98,7 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error {
         return fmt.Errorf("could not raise memory limit (check for sudo or setcap): %v", err)
     }
 
-    promProvider, err := stats.NewPrometheusMetricsProvider(cmd.Context(), &stats.PrometheusOpts{})
+    promProvider, err := stats.NewPrometheusMetricsProvider(ctx, &stats.PrometheusOpts{})
     if err != nil {
         return err
     }
@@ -110,42 +107,51 @@ func run(cmd *cobra.Command, args []string, opts *runOptions) error {
         decoder.NewDecoderFactory(),
         promProvider,
     )
-
-    parsedELF, err := progLoader.Parse(cmd.Context(), progReader)
+    parsedELF, err := progLoader.Parse(ctx, progReader)
     if err != nil {
         return fmt.Errorf("could not parse BPF program: %w", err)
     }
 
-    // TODO: add filter to UI
-    filter, err := tui.BuildFilter(opts.filter, parsedELF.WatchedMaps)
+    tuiApp, err := buildTuiApp(&progLoader, progLocation, opts.filter, parsedELF)
     if err != nil {
-        return fmt.Errorf("could not build filter %w", err)
+        return err
+    }
+    loaderOpts := loader.LoadOptions{
+        ParsedELF: parsedELF,
+        Watcher:   tuiApp,
     }
+    // bail out before starting TUI if context canceled
+    if ctx.Err() != nil {
+        contextutils.LoggerFrom(ctx).Info("before calling tui.Run() context is done")
+        return ctx.Err()
+    }
+    if opts.notty {
+        fmt.Println("Calling Load...")
+        loaderOpts.Watcher = loader.NewNoopWatcher()
+        err = progLoader.Load(ctx, &loaderOpts)
+        return err
+    } else {
+        contextutils.LoggerFrom(ctx).Info("calling tui run()")
+        err = tuiApp.Run(ctx, progLoader, &loaderOpts)
+        contextutils.LoggerFrom(ctx).Info("after tui run()")
+        return err
+    }
+}
+
+func buildTuiApp(loader *loader.Loader, progLocation string, filterString []string, parsedELF *loader.ParsedELF) (*tui.App, error) {
+    // TODO: add filter to UI
+    filter, err := tui.BuildFilter(filterString, parsedELF.WatchedMaps)
+    if err != nil {
+        return nil, fmt.Errorf("could not build filter %w", err)
+    }
     appOpts := tui.AppOpts{
-        Loader:       progLoader,
         ProgLocation: progLocation,
         ParsedELF:    parsedELF,
         Filter:       filter,
     }
     app := tui.NewApp(&appOpts)
-
-    var sugaredLogger *zap.SugaredLogger
-    if opts.debug {
-        cfg := zap.NewDevelopmentConfig()
-        cfg.OutputPaths = []string{"debug.log"}
-        cfg.ErrorOutputPaths = []string{"debug.log"}
-        logger, err := cfg.Build()
-        if err != nil {
-            return fmt.Errorf("couldn't create zap logger: '%w'", err)
-        }
-        sugaredLogger = logger.Sugar()
-    } else {
-        sugaredLogger = zap.NewNop().Sugar()
-    }
-
-    ctx := contextutils.WithExistingLogger(cmd.Context(), sugaredLogger)
-    return app.Run(ctx, progReader)
+    return &app, nil
 }
 
 func getProgram(
@@ -202,3 +208,31 @@ func getProgram(
 
     return progReader, nil
 }
+
+func buildContext(ctx context.Context, debug bool) (context.Context, error) {
+    ctx, cancel := context.WithCancel(ctx)
+    stopper = make(chan os.Signal, 1)
+    signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
+    go func() {
+        <-stopper
+        fmt.Println("got sigterm or interrupt")
+        cancel()
+    }()
+
+    var sugaredLogger *zap.SugaredLogger
+    if debug {
+        cfg := zap.NewDevelopmentConfig()
+        cfg.OutputPaths = []string{"debug.log"}
+        cfg.ErrorOutputPaths = []string{"debug.log"}
+        logger, err := cfg.Build()
+        if err != nil {
+            return nil, fmt.Errorf("couldn't create zap logger: '%w'", err)
+        }
+        sugaredLogger = logger.Sugar()
+    } else {
+        sugaredLogger = zap.NewNop().Sugar()
+    }
+    ctx = contextutils.WithExistingLogger(ctx, sugaredLogger)
+
+    return ctx, nil
+}
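The new `buildContext` helper above wires up signal handling by hand: a package-level `stopper` channel, a `signal.Notify` registration, and a goroutine that calls `cancel()`. Since Go 1.16 the standard library can express the same idea with `signal.NotifyContext`; a minimal sketch of that alternative follows (not part of this change — the zap logger wiring from `buildContext` would stay the same):

```go
// Sketch only: signal.NotifyContext collapses the stopper channel and goroutine
// used in buildContext into a single call. Assumes Go 1.16+.
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func buildContextSketch(parent context.Context) (context.Context, context.CancelFunc) {
    // The returned context is canceled automatically on SIGINT/SIGTERM;
    // calling stop() releases the signal registration.
    return signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
}

func main() {
    ctx, stop := buildContextSketch(context.Background())
    defer stop()

    <-ctx.Done() // blocks until Ctrl-C or SIGTERM
    fmt.Println("got sigterm or interrupt")
}
```

Either way, cancellation now flows through a context, which is what both the loader and the TUI key off in the changes below.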
diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go
index 7340e08..84b88d1 100644
--- a/pkg/loader/loader.go
+++ b/pkg/loader/loader.go
@@ -13,7 +13,6 @@ import (
     "github.com/cilium/ebpf/btf"
     "github.com/cilium/ebpf/link"
     "github.com/cilium/ebpf/ringbuf"
-    "github.com/pterm/pterm"
     "github.com/solo-io/bumblebee/pkg/decoder"
     "github.com/solo-io/bumblebee/pkg/stats"
     "github.com/solo-io/go-utils/contextutils"
@@ -35,24 +34,6 @@ type Loader interface {
     Load(ctx context.Context, opts *LoadOptions) error
 }
 
-type KvPair struct {
-    Key   map[string]string
-    Value string
-    Hash  uint64
-}
-
-type MapEntry struct {
-    Name  string
-    Entry KvPair
-}
-
-type MapWatcher interface {
-    NewRingBuf(name string, keys []string)
-    NewHashMap(name string, keys []string)
-    SendEntry(entry MapEntry)
-    PreWatchHandler()
-}
-
 type WatchedMap struct {
     Name   string
     Labels []string
@@ -159,67 +140,72 @@ func (l *loader) Parse(ctx context.Context, progReader io.ReaderAt) (*ParsedELF,
 
 func (l *loader) Load(ctx context.Context, opts *LoadOptions) error {
     // TODO: add invariant checks on opts
-    loaderProgress, _ := pterm.DefaultSpinner.Start("Loading BPF program and maps into Kernel")
+    contextutils.LoggerFrom(ctx).Info("enter Load()")
+    // on shutdown notify watcher we have no more entries to send
+    defer opts.Watcher.Close()
+
+    // bail out before loading stuff into kernel if context canceled
+    if ctx.Err() != nil {
+        contextutils.LoggerFrom(ctx).Info("load entrypoint context is done")
+        return ctx.Err()
+    }
     spec := opts.ParsedELF.Spec
 
     // Load our eBPF spec into the kernel
     coll, err := ebpf.NewCollection(spec)
     if err != nil {
-        loaderProgress.Fail()
         return err
     }
     defer coll.Close()
-    loaderProgress.Success()
 
-    linkerProgress, _ := pterm.DefaultSpinner.Start("Linking BPF functions to associated probe/tracepoint")
     // For each program, add kprope/tracepoint
     for name, prog := range spec.Programs {
-        switch prog.Type {
-        case ebpf.Kprobe:
-            var kp link.Link
-            var err error
-            if strings.HasPrefix(prog.SectionName, "kretprobe/") {
-                kp, err = link.Kretprobe(prog.AttachTo, coll.Programs[name])
-                if err != nil {
-                    linkerProgress.Fail()
-                    return fmt.Errorf("error attaching kretprobe '%v': %w", prog.Name, err)
-                }
-            } else {
-                kp, err = link.Kprobe(prog.AttachTo, coll.Programs[name])
-                if err != nil {
-                    linkerProgress.Fail()
-                    return fmt.Errorf("error attaching kprobe '%v': %w", prog.Name, err)
-                }
-            }
-            defer kp.Close()
-        case ebpf.TracePoint:
-            var tp link.Link
-            var err error
-            if strings.HasPrefix(prog.SectionName, "tracepoint/") {
-                tokens := strings.Split(prog.AttachTo, "/")
-                if len(tokens) != 2 {
-                    return fmt.Errorf("unexpected tracepoint section '%v'", prog.AttachTo)
+        select {
+        case <-ctx.Done():
+            contextutils.LoggerFrom(ctx).Info("while loading progs context is done")
+            return ctx.Err()
+        default:
+            switch prog.Type {
+            case ebpf.Kprobe:
+                var kp link.Link
+                var err error
+                if strings.HasPrefix(prog.SectionName, "kretprobe/") {
+                    kp, err = link.Kretprobe(prog.AttachTo, coll.Programs[name])
+                    if err != nil {
+                        return fmt.Errorf("error attaching kretprobe '%v': %w", prog.Name, err)
+                    }
+                } else {
+                    kp, err = link.Kprobe(prog.AttachTo, coll.Programs[name])
+                    if err != nil {
+                        return fmt.Errorf("error attaching kprobe '%v': %w", prog.Name, err)
+                    }
                 }
-                tp, err = link.Tracepoint(tokens[0], tokens[1], coll.Programs[name])
-                if err != nil {
-                    linkerProgress.Fail()
-                    return fmt.Errorf("error attaching to tracepoint '%v': %w", prog.Name, err)
+                defer kp.Close()
+            case ebpf.TracePoint:
+                var tp link.Link
+                var err error
+                if strings.HasPrefix(prog.SectionName, "tracepoint/") {
+                    tokens := strings.Split(prog.AttachTo, "/")
+                    if len(tokens) != 2 {
+                        return fmt.Errorf("unexpected tracepoint section '%v'", prog.AttachTo)
+                    }
+                    tp, err = link.Tracepoint(tokens[0], tokens[1], coll.Programs[name])
+                    if err != nil {
+                        return fmt.Errorf("error attaching to tracepoint '%v': %w", prog.Name, err)
+                    }
                 }
+                defer tp.Close()
+            default:
+                return errors.New("only kprobe programs supported")
             }
-            defer tp.Close()
-        default:
-            linkerProgress.Fail()
-            return errors.New("only kprobe programs supported")
         }
     }
-    linkerProgress.Success()
 
     return l.watchMaps(ctx, opts.ParsedELF.WatchedMaps, coll, opts.Watcher)
 }
 
 func (l *loader) watchMaps(ctx context.Context, watchedMaps map[string]WatchedMap, coll *ebpf.Collection, watcher MapWatcher) error {
-    watcher.PreWatchHandler()
-
+    contextutils.LoggerFrom(ctx).Info("enter watchMaps()")
     eg, ctx := errgroup.WithContext(ctx)
     for name, bpfMap := range watchedMaps {
         name := name
@@ -297,7 +283,6 @@ func (l *loader) startRingBuf(
 
     for {
         record, err := rd.Read()
-        logger.Info("read...")
         if err != nil {
             if errors.Is(err, ringbuf.ErrClosed) {
                 logger.Info("ringbuf closed...")
diff --git a/pkg/loader/watcher.go b/pkg/loader/watcher.go
new file mode 100644
index 0000000..13efad5
--- /dev/null
+++ b/pkg/loader/watcher.go
@@ -0,0 +1,38 @@
+package loader
+
+type KvPair struct {
+    Key   map[string]string
+    Value string
+    Hash  uint64
+}
+
+type MapEntry struct {
+    Name  string
+    Entry KvPair
+}
+
+type MapWatcher interface {
+    NewRingBuf(name string, keys []string)
+    NewHashMap(name string, keys []string)
+    SendEntry(entry MapEntry)
+    Close()
+}
+
+type noopWatcher struct{}
+
+func (w *noopWatcher) NewRingBuf(name string, keys []string) {
+    // noop
+}
+func (w *noopWatcher) NewHashMap(name string, keys []string) {
+    // noop
+}
+func (w *noopWatcher) SendEntry(entry MapEntry) {
+    // noop
+}
+func (w *noopWatcher) Close() {
+    // noop
+}
+
+func NewNoopWatcher() *noopWatcher {
+    return &noopWatcher{}
+}
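`pkg/loader/watcher.go` now owns the `MapWatcher` contract: the TUI implements it for interactive runs, and `NewNoopWatcher()` satisfies it for `--no-tty`. Any other consumer could plug in its own implementation. Below is a sketch of a hypothetical watcher that just prints map activity; the `printWatcher` type and the sample entry are invented for illustration, only the `loader` types come from this change.

```go
// Sketch only: a hypothetical MapWatcher that prints map activity to stdout,
// illustrating the interface extracted into pkg/loader/watcher.go.
package main

import (
    "fmt"

    "github.com/solo-io/bumblebee/pkg/loader"
)

type printWatcher struct{}

// Compile-time check that printWatcher satisfies loader.MapWatcher.
var _ loader.MapWatcher = &printWatcher{}

func (p *printWatcher) NewRingBuf(name string, keys []string) {
    fmt.Printf("watching ring buffer %q (keys: %v)\n", name, keys)
}

func (p *printWatcher) NewHashMap(name string, keys []string) {
    fmt.Printf("watching hash map %q (keys: %v)\n", name, keys)
}

func (p *printWatcher) SendEntry(entry loader.MapEntry) {
    fmt.Printf("%s: key=%v value=%s\n", entry.Name, entry.Entry.Key, entry.Entry.Value)
}

func (p *printWatcher) Close() {
    fmt.Println("no more entries")
}

func main() {
    // In bee these calls are made by loader.Load via LoadOptions.Watcher;
    // they are invoked directly here only to show the call sequence.
    var w loader.MapWatcher = &printWatcher{}
    w.NewHashMap("events_hash", []string{"daddr"})
    w.SendEntry(loader.MapEntry{
        Name:  "events_hash",
        Entry: loader.KvPair{Key: map[string]string{"daddr": "1.1.1.1"}, Value: "1"},
    })
    w.Close()
}
```

`Close()` replaces the old `PreWatchHandler()` hook: instead of signaling before the watch starts, the loader now notifies the watcher (via the `defer opts.Watcher.Close()` above) when no more entries will be sent.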
diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go
index 80090e9..ae74c1b 100644
--- a/pkg/tui/tui.go
+++ b/pkg/tui/tui.go
@@ -3,7 +3,6 @@ package tui
 import (
     "context"
     "fmt"
-    "io"
     "regexp"
     "sort"
     "sync"
@@ -11,10 +10,11 @@ import (
     "github.com/cilium/ebpf"
     "github.com/gdamore/tcell/v2"
     "github.com/mitchellh/hashstructure/v2"
-    "github.com/pterm/pterm"
     "github.com/rivo/tview"
     "github.com/solo-io/bumblebee/pkg/loader"
     "github.com/solo-io/go-utils/contextutils"
+    "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
 )
 
 const titleText = `[aqua] __
@@ -47,30 +47,24 @@ type MapValue struct {
 }
 
 type AppOpts struct {
-    Loader       loader.Loader
     ProgLocation string
     Filter       map[string]Filter
     ParsedELF    *loader.ParsedELF
 }
 
 type App struct {
-    Entries   chan loader.MapEntry
-    CloseChan chan struct{}
+    Entries chan loader.MapEntry
     tviewApp  *tview.Application
     flex      *tview.Flex
-    loader       loader.Loader
     progLocation string
     filter       map[string]Filter
-    parsedELF    *loader.ParsedELF
 }
 
 func NewApp(opts *AppOpts) App {
     a := App{
-        loader:       opts.Loader,
         progLocation: opts.ProgLocation,
         filter:       opts.Filter,
-        parsedELF:    opts.ParsedELF,
     }
     return a
 }
@@ -78,26 +72,13 @@ func NewApp(opts *AppOpts) App {
 var mapOfMaps = make(map[string]MapValue)
 var mapMutex = sync.RWMutex{}
 var currentIndex int
-var preWatchChan = make(chan error, 1)
 
-func (a *App) Run(ctx context.Context, progReader io.ReaderAt) error {
-    logger := contextutils.LoggerFrom(ctx)
-    ctx, cancel := context.WithCancel(ctx)
-
-    var errToReturn error
-    closeChan := make(chan struct{}, 1)
+func buildTView(logger *zap.SugaredLogger, cancel context.CancelFunc, progLocation string) (*tview.Application, *tview.Flex) {
     app := tview.NewApplication()
     app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey {
         if event.Key() == tcell.KeyCtrlC || (event.Key() == tcell.KeyRune && event.Rune() == 'q') {
-            logger.Info("captured ctrl-c")
+            logger.Info("captured ctrl-c in tui, canceling context")
             cancel()
-            logger.Info("called cancel()")
-            // need to block here until the map watches from Load() are complete
-            // so that tview.App doesn't stop before they do, otherwise we get in a deadlock
-            // when Watch() attempts to Draw() to a stopped tview.App
-            <-closeChan
-            logger.Info("received from closeChan")
-            close(closeChan)
         }
         return event
     })
@@ -120,7 +101,7 @@ func (a *App) Run(ctx context.Context, progReader io.ReaderAt) error {
     fmt.Fprint(title, titleText)
 
     fetchText := tview.NewTextView().SetDynamicColors(true)
-    fmt.Fprintf(fetchText, "Program location: [aqua]%s", a.progLocation)
+    fmt.Fprintf(fetchText, "Program location: [aqua]%s", progLocation)
 
     help := tview.NewTextView().SetTextAlign(tview.AlignLeft).SetDynamicColors(true)
     fmt.Fprint(help, helpText)
@@ -138,74 +119,62 @@ func (a *App) Run(ctx context.Context, progReader io.ReaderAt) error {
     header.AddItem(rightMenu, 0, 1, 1, 1, 0, 0, false)
 
     flex.AddItem(header, 10, 0, false)
-    a.Entries = make(chan loader.MapEntry, 20)
+
+    return app, flex
+}
+
+func (a *App) Close() {
+    close(a.Entries)
+}
+
+func (a *App) Run(ctx context.Context, progLoader loader.Loader, loaderOpts *loader.LoadOptions) error {
+    logger := contextutils.LoggerFrom(ctx)
+
+    ctx, cancel := context.WithCancel(ctx)
+    app, flex := buildTView(logger, cancel, a.progLocation)
     a.tviewApp = app
     a.flex = flex
-    a.CloseChan = closeChan
-
-    loaderOptions := loader.LoadOptions{
-        ParsedELF: a.parsedELF,
-        Watcher:   a,
-    }
+    a.Entries = make(chan loader.MapEntry, 20)
 
-    go func() {
-        logger.Info("calling loader.Load()")
-        errToReturn = a.loader.Load(ctx, &loaderOptions)
-        logger.Infof("returned from Load() with err: %s", errToReturn)
-        // we have returned from Load(...) so we know the waitgroups on the map watches have completed
-        // let's close the Entries chan so a.Watch(...) will return and we will no longer call Draw() on
-        // the tview.App
-        close(a.Entries)
-        logger.Info("closed entries")
-        // send to the closeChan to signal in the case the TUI was closed via CtrlC that Load()
-        // has returned and no new updates will be sent to the tview.App
-        a.CloseChan <- struct{}{}
-        // send to the preWatchChan in case Load() returned from an error in the Load/Link phase
-        // i.e. not the Watch phase, so we don't block when loader.PreWatchHandler() hasn't been called
-        preWatchChan <- errToReturn
-        // call Stop() on the tview.App to handle the case where Load() returned with an error
-        // safe to call this more than once, i.e. it's ok that CtrlC handler will also Stop() the tview.App
-        a.tviewApp.Stop()
-        logger.Info("called stop")
-    }()
-
-    err := <-preWatchChan
-    logger.Infof("received from preWatchChan with err: %s", err)
-    if err != nil {
+    eg := errgroup.Group{}
+    eg.Go(func() error {
+        logger.Info("render tui")
+        err := a.tviewApp.SetRoot(a.flex, true).Run()
+        logger.Info("tui stopped")
         return err
-    }
+    })
 
-    // goroutine for updating the TUI data based on updates from loader watching maps
-    logger.Info("starting Watch()")
-    go a.watch(ctx)
+    eg.Go(func() error {
+        logger.Info("calling watch()")
+        a.watch(ctx)
+        logger.Info("returned from watch()")
+        return nil
+    })
 
-    pterm.Info.Println("Rendering TUI...")
-    logger.Info("render tui")
-    // begin rendering the TUI
-    if err := a.tviewApp.SetRoot(a.flex, true).Run(); err != nil {
+    eg.Go(func() error {
+        logger.Info("calling Load()")
+        err := progLoader.Load(ctx, loaderOpts)
+        logger.Info("returned from Load()")
         return err
-    }
-
-    logger.Infof("stopped app, errToReturn: %s", errToReturn)
-    return errToReturn
-}
+    })
 
-func (a *App) PreWatchHandler() {
-    preWatchChan <- nil
+    err := eg.Wait()
+    logger.Info("after tui waitgroup")
+    return err
 }
 
 func (a *App) watch(ctx context.Context) {
     logger := contextutils.LoggerFrom(ctx)
     logger.Info("beginning Watch() loop")
+    // a.Entries channel will be closed by the Loader
     for r := range a.Entries {
         if mapOfMaps[r.Name].Type == ebpf.Hash {
             a.renderHash(ctx, r)
         } else if mapOfMaps[r.Name].Type == ebpf.RingBuf {
             a.renderRingBuf(ctx, r)
         }
-        // update the screen if the UI is still running
-        // don't block here as we still want to process entries as they come in,
-        // let the tview.App handle the synchronization of updates
+        // we need to queue a UI update since tview app is running in a separate goroutine
+        // don't block here as we still want to process entries as they come in
         go a.tviewApp.QueueUpdateDraw(func() {})
     }
     logger.Info("no more entries, returning from Watch()")
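Taken together, the `run.go` and `loader` changes make the loader usable without any TUI: parse the ELF, supply a `MapWatcher`, and call `Load`. A sketch of embedding that no-TTY path in another Go program follows. The `loader.NewLoader(decoderFactory, metricsProvider)` constructor name is assumed from `run.go` context that is not fully shown in these hunks, and the local ELF path is invented; treat both as assumptions rather than confirmed API.

```go
// Sketch only: driving the loader with the no-op watcher, the same path
// `bee run --no-tty` takes. NewLoader's exact signature is assumed (see lead-in).
package main

import (
    "context"
    "log"
    "os"

    "github.com/solo-io/bumblebee/pkg/decoder"
    "github.com/solo-io/bumblebee/pkg/loader"
    "github.com/solo-io/bumblebee/pkg/stats"
)

func main() {
    ctx := context.Background()

    // A locally built BPF ELF, e.g. the output of `bee build` (hypothetical path).
    f, err := os.Open("tcpconnect.o")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    promProvider, err := stats.NewPrometheusMetricsProvider(ctx, &stats.PrometheusOpts{})
    if err != nil {
        log.Fatal(err)
    }

    progLoader := loader.NewLoader(decoder.NewDecoderFactory(), promProvider) // assumed constructor
    parsedELF, err := progLoader.Parse(ctx, f)
    if err != nil {
        log.Fatal(err)
    }

    // No TUI: the noop watcher simply discards map updates.
    opts := loader.LoadOptions{
        ParsedELF: parsedELF,
        Watcher:   loader.NewNoopWatcher(),
    }
    // As in run.go, loading requires sufficient privileges and a raised memlock rlimit.
    if err := progLoader.Load(ctx, &opts); err != nil {
        log.Fatal(err)
    }
}
```

Canceling the context passed to `Load` is how this shuts down, mirroring what `buildContext` does for the CLI and what the TUI's Ctrl-C handler does for interactive runs.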