diff --git a/CHANGELOG.md b/CHANGELOG.md index 8279c67197..e47e5ede53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ Main (unreleased) - Add three new stdlib functions to_base64, from_URLbase64 and to_URLbase64 (@ravishankar15) - Add `ignore_older_than` option for local.file_match (@ravishankar15) - Add livedebugging support for `discover.relabel` (@ravishankar15) +- Performance optimization for live debugging feature (@ravishankar15) - Use a forked `github.com/goccy/go-json` module which reduces the memory consumption of an Alloy instance by 20MB. If Alloy is running certain otelcol components, this reduction will not apply. (@ptodev) diff --git a/docs/sources/reference/cli/run.md b/docs/sources/reference/cli/run.md index 4cbe811690..d572700bfb 100644 --- a/docs/sources/reference/cli/run.md +++ b/docs/sources/reference/cli/run.md @@ -38,6 +38,7 @@ The following flags are supported: * `--server.http.memory-addr`: Address to listen for [in-memory HTTP traffic][] on (default `alloy.internal:12345`). * `--server.http.listen-addr`: Address to listen for HTTP traffic on (default `127.0.0.1:12345`). * `--server.http.ui-path-prefix`: Base path where the UI is exposed (default `/`). +* `--server.http.live-debugging-buffer-stream-size`: Buffer stream size used for buffering the live debugging entries (default `1000`). * `--storage.path`: Base directory where components can store data (default `data-alloy/`). * `--disable-reporting`: Disable [data collection][] (default `false`). * `--disable-support-bundle`: Disable [support bundle][] endpoint (default `false`). 
diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index 7ef521ed19..e89abfe577 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -52,18 +52,19 @@ import ( func runCommand() *cobra.Command { r := &alloyRun{ - inMemoryAddr: "alloy.internal:12345", - httpListenAddr: "127.0.0.1:12345", - storagePath: "data-alloy/", - minStability: featuregate.StabilityGenerallyAvailable, - uiPrefix: "/", - disableReporting: false, - enablePprof: true, - configFormat: "alloy", - clusterAdvInterfaces: advertise.DefaultInterfaces, - clusterMaxJoinPeers: 5, - clusterRejoinInterval: 60 * time.Second, - disableSupportBundle: false, + inMemoryAddr: "alloy.internal:12345", + httpListenAddr: "127.0.0.1:12345", + storagePath: "data-alloy/", + minStability: featuregate.StabilityGenerallyAvailable, + uiPrefix: "/", + disableReporting: false, + enablePprof: true, + configFormat: "alloy", + clusterAdvInterfaces: advertise.DefaultInterfaces, + clusterMaxJoinPeers: 5, + clusterRejoinInterval: 60 * time.Second, + disableSupportBundle: false, + liveDebuggingBufferStreamSize: 1000, } cmd := &cobra.Command{ @@ -113,6 +114,8 @@ depending on the nature of the reload error. BoolVar(&r.enablePprof, "server.http.enable-pprof", r.enablePprof, "Enable /debug/pprof profiling endpoints.") cmd.Flags(). BoolVar(&r.disableSupportBundle, "server.http.disable-support-bundle", r.disableSupportBundle, "Disable /-/support support bundle retrieval.") + cmd.Flags(). + IntVar(&r.liveDebuggingBufferStreamSize, "server.http.live-debugging-buffer-stream-size", r.liveDebuggingBufferStreamSize, "Buffer stream size used for buffering the live debugging entries") // Cluster flags cmd.Flags(). @@ -161,32 +164,33 @@ depending on the nature of the reload error. 
} type alloyRun struct { - inMemoryAddr string - httpListenAddr string - storagePath string - minStability featuregate.Stability - uiPrefix string - enablePprof bool - disableReporting bool - clusterEnabled bool - clusterNodeName string - clusterAdvAddr string - clusterJoinAddr string - clusterDiscoverPeers string - clusterAdvInterfaces []string - clusterRejoinInterval time.Duration - clusterMaxJoinPeers int - clusterName string - clusterEnableTLS bool - clusterTLSCAPath string - clusterTLSCertPath string - clusterTLSKeyPath string - clusterTLSServerName string - configFormat string - configBypassConversionErrors bool - configExtraArgs string - enableCommunityComps bool - disableSupportBundle bool + inMemoryAddr string + httpListenAddr string + storagePath string + minStability featuregate.Stability + uiPrefix string + enablePprof bool + disableReporting bool + clusterEnabled bool + clusterNodeName string + clusterAdvAddr string + clusterJoinAddr string + clusterDiscoverPeers string + clusterAdvInterfaces []string + clusterRejoinInterval time.Duration + clusterMaxJoinPeers int + clusterName string + clusterEnableTLS bool + clusterTLSCAPath string + clusterTLSCertPath string + clusterTLSKeyPath string + clusterTLSServerName string + configFormat string + configBypassConversionErrors bool + configExtraArgs string + enableCommunityComps bool + disableSupportBundle bool + liveDebuggingBufferStreamSize int } func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { @@ -316,8 +320,9 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error { liveDebuggingService := livedebugging.New() uiService := uiservice.New(uiservice.Options{ - UIPrefix: fr.uiPrefix, - CallbackManager: liveDebuggingService.Data().(livedebugging.CallbackManager), + UIPrefix: fr.uiPrefix, + CallbackManager: liveDebuggingService.Data().(livedebugging.CallbackManager), + LiveDebuggingBufferStreamSize: fr.liveDebuggingBufferStreamSize, }) otelService := otel_service.New(l) diff 
--git a/internal/service/ui/ui.go b/internal/service/ui/ui.go index ea69448a82..2683c1d878 100644 --- a/internal/service/ui/ui.go +++ b/internal/service/ui/ui.go @@ -23,8 +23,9 @@ const ServiceName = "ui" // Options are used to configure the UI service. Options are constant for the // lifetime of the UI service. type Options struct { - UIPrefix string // Path prefix to host the UI at. - CallbackManager livedebugging.CallbackManager // CallbackManager is used for live debugging in the UI. + UIPrefix string // Path prefix to host the UI at. + CallbackManager livedebugging.CallbackManager // CallbackManager is used for live debugging in the UI. + LiveDebuggingBufferStreamSize int // Buffer size for the live debugging stream channel } // Service implements the UI service. @@ -78,7 +79,7 @@ func (s *Service) Data() any { func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler) { r := mux.NewRouter() - fa := api.NewAlloyAPI(host, s.opts.CallbackManager) + fa := api.NewAlloyAPI(host, s.opts.CallbackManager, s.opts.LiveDebuggingBufferStreamSize) fa.RegisterRoutes(path.Join(s.opts.UIPrefix, "/api/v0/web"), r) ui.RegisterRoutes(s.opts.UIPrefix, r) diff --git a/internal/web/api/api.go b/internal/web/api/api.go index 53592b744a..8e365491e5 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -11,6 +11,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/google/uuid" "github.com/gorilla/mux" @@ -24,13 +25,14 @@ import ( // AlloyAPI is a wrapper around the component API. type AlloyAPI struct { - alloy service.Host - CallbackManager livedebugging.CallbackManager + alloy service.Host + CallbackManager livedebugging.CallbackManager + liveDebuggingBufferStreamSize int } // NewAlloyAPI instantiates a new Alloy API. 
-func NewAlloyAPI(alloy service.Host, CallbackManager livedebugging.CallbackManager) *AlloyAPI { - return &AlloyAPI{alloy: alloy, CallbackManager: CallbackManager} +func NewAlloyAPI(alloy service.Host, CallbackManager livedebugging.CallbackManager, liveDebuggingBufferStreamSize int) *AlloyAPI { + return &AlloyAPI{alloy: alloy, CallbackManager: CallbackManager, liveDebuggingBufferStreamSize: liveDebuggingBufferStreamSize} } // RegisterRoutes registers all the API's routes. @@ -49,7 +51,7 @@ func (a *AlloyAPI) RegisterRoutes(urlPrefix string, r *mux.Router) { r.Handle(path.Join(urlPrefix, "/remotecfg/components/{id:.+}"), httputil.CompressionHandler{Handler: getComponentHandlerRemoteCfg(a.alloy)}) r.Handle(path.Join(urlPrefix, "/peers"), httputil.CompressionHandler{Handler: getClusteringPeersHandler(a.alloy)}) - r.Handle(path.Join(urlPrefix, "/debug/{id:.+}"), liveDebugging(a.alloy, a.CallbackManager)) + r.Handle(path.Join(urlPrefix, "/debug/{id:.+}"), liveDebugging(a.alloy, a.CallbackManager, a.liveDebuggingBufferStreamSize)) } func listComponentsHandler(host service.Host) http.HandlerFunc { @@ -165,14 +167,12 @@ func getClusteringPeersHandler(host service.Host) http.HandlerFunc { } } -func liveDebugging(host service.Host, callbackManager livedebugging.CallbackManager) http.HandlerFunc { +func liveDebugging(_ service.Host, callbackManager livedebugging.CallbackManager, liveDebuggingBufferStreamSize int) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) componentID := livedebugging.ComponentID(vars["id"]) - // Buffer of 1000 entries to handle load spikes and prevent this functionality from eating up too much memory. 
- // TODO: in the future we may want to make this value configurable to handle heavy load - dataCh := make(chan string, 1000) + dataCh := make(chan string, liveDebuggingBufferStreamSize) ctx := r.Context() sampleProb := setSampleProb(w, r.URL.Query().Get("sampleProb")) @@ -200,9 +200,12 @@ func liveDebugging(host service.Host, callbackManager livedebugging.CallbackMana return } + flushTicker := time.NewTicker(time.Second) + defer func() { close(dataCh) callbackManager.DeleteCallback(id, componentID) + flushTicker.Stop() }() for { @@ -216,7 +219,7 @@ func liveDebugging(host service.Host, callbackManager livedebugging.CallbackMana if writeErr != nil { return } - // TODO: flushing at a regular interval might be better performance wise + case <-flushTicker.C: w.(http.Flusher).Flush() case <-ctx.Done(): return