From 6a57cd135d6a04f767137d3b6de02f39699c8dae Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 28 Jan 2025 16:53:27 +0000 Subject: [PATCH 01/15] feat: add ring for ingest-limits-frontend --- docs/sources/shared/configuration.md | 141 +++++++++++++++++++++++++++ pkg/limits/frontend/config.go | 8 +- pkg/limits/frontend/frontend.go | 34 ++++++- pkg/loki/loki.go | 5 +- pkg/loki/modules.go | 37 ++++++- 5 files changed, 218 insertions(+), 7 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index f3bebb20947d9..9a04d2c38badd 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1069,6 +1069,145 @@ ingest_limits_frontend: # CLI flag: -ingest-limits-frontend.remote-timeout [remote_timeout: | default = 1s] + lifecycler: + ring: + kvstore: + # Backend storage to use for the ring. Supported values are: consul, + # etcd, inmemory, memberlist, multi. + # CLI flag: -ingest-limits-frontend.store + [store: | default = "consul"] + + # The prefix for the keys in the store. Should end with a /. + # CLI flag: -ingest-limits-frontend.prefix + [prefix: | default = "collectors/"] + + # Configuration for a Consul client. Only applies if the selected + # kvstore is consul. + # The CLI flags prefix for this block configuration is: + # ingest-limits-frontend + [consul: ] + + # Configuration for an ETCD v3 client. Only applies if the selected + # kvstore is etcd. + # The CLI flags prefix for this block configuration is: + # ingest-limits-frontend + [etcd: ] + + multi: + # Primary backend storage used by multi-client. + # CLI flag: -ingest-limits-frontend.multi.primary + [primary: | default = ""] + + # Secondary backend storage used by multi-client. + # CLI flag: -ingest-limits-frontend.multi.secondary + [secondary: | default = ""] + + # Mirror writes to secondary store. + # CLI flag: -ingest-limits-frontend.multi.mirror-enabled + [mirror_enabled: | default = false] + + # Timeout for storing value to secondary store. + # CLI flag: -ingest-limits-frontend.multi.mirror-timeout + [mirror_timeout: | default = 2s] + + # The heartbeat timeout after which ingesters are skipped for + # reads/writes. 0 = never (timeout disabled). + # CLI flag: -ingest-limits-frontend.ring.heartbeat-timeout + [heartbeat_timeout: | default = 1m] + + # The number of ingesters to write to and read from. + # CLI flag: -ingest-limits-frontend.distributor.replication-factor + [replication_factor: | default = 3] + + # True to enable the zone-awareness and replicate ingested samples across + # different availability zones. + # CLI flag: -ingest-limits-frontend.distributor.zone-awareness-enabled + [zone_awareness_enabled: | default = false] + + # Comma-separated list of zones to exclude from the ring. Instances in + # excluded zones will be filtered out from the ring. + # CLI flag: -ingest-limits-frontend.distributor.excluded-zones + [excluded_zones: | default = ""] + + # Number of tokens for each ingester. + # CLI flag: -ingest-limits-frontend.num-tokens + [num_tokens: | default = 128] + + # Period at which to heartbeat to consul. 0 = disabled. + # CLI flag: -ingest-limits-frontend.heartbeat-period + [heartbeat_period: | default = 5s] + + # Heartbeat timeout after which instance is assumed to be unhealthy. 0 = + # disabled. + # CLI flag: -ingest-limits-frontend.heartbeat-timeout + [heartbeat_timeout: | default = 1m] + + # Observe tokens after generating to resolve collisions. Useful when using + # gossiping ring. + # CLI flag: -ingest-limits-frontend.observe-period + [observe_period: | default = 0s] + + # Period to wait for a claim from another member; will join automatically + # after this. + # CLI flag: -ingest-limits-frontend.join-after + [join_after: | default = 0s] + + # Minimum duration to wait after the internal readiness checks have passed + # but before succeeding the readiness endpoint. This is used to slowdown + # deployment controllers (eg. Kubernetes) after an instance is ready and + # before they proceed with a rolling update, to give the rest of the cluster + # instances enough time to receive ring updates. + # CLI flag: -ingest-limits-frontend.min-ready-duration + [min_ready_duration: | default = 15s] + + # Name of network interface to read address from. + # CLI flag: -ingest-limits-frontend.lifecycler.interface + [interface_names: | default = []] + + # Enable IPv6 support. Required to make use of IP addresses from IPv6 + # interfaces. + # CLI flag: -ingest-limits-frontend.enable-inet6 + [enable_inet6: | default = false] + + # Duration to sleep for before exiting, to ensure metrics are scraped. + # CLI flag: -ingest-limits-frontend.final-sleep + [final_sleep: | default = 0s] + + # File path where tokens are stored. If empty, tokens are not stored at + # shutdown and restored at startup. + # CLI flag: -ingest-limits-frontend.tokens-file-path + [tokens_file_path: | default = ""] + + # The availability zone where this instance is running. + # CLI flag: -ingest-limits-frontend.availability-zone + [availability_zone: | default = ""] + + # Unregister from the ring upon clean shutdown. It can be useful to disable + # for rolling restarts with consistent naming in conjunction with + # -distributor.extend-writes=false. + # CLI flag: -ingest-limits-frontend.unregister-on-shutdown + [unregister_on_shutdown: | default = true] + + # When enabled the readiness probe succeeds only after all instances are + # ACTIVE and healthy in the ring, otherwise only the instance itself is + # checked. This option should be disabled if in your cluster multiple + # instances can be rolled out simultaneously, otherwise rolling updates may + # be slowed down. + # CLI flag: -ingest-limits-frontend.readiness-check-ring-health + [readiness_check_ring_health: | default = true] + + # IP address to advertise in the ring. + # CLI flag: -ingest-limits-frontend.lifecycler.addr + [address: | default = ""] + + # port to advertise in consul (defaults to server.grpc-listen-port). + # CLI flag: -ingest-limits-frontend.lifecycler.port + [port: | default = 0] + + # ID to register in the ring. + # CLI flag: -ingest-limits-frontend.lifecycler.ID + [id: | default = ""] + # Configuration for 'runtime config' module, responsible for reloading runtime # configuration file. [runtime_config: ] @@ -2315,6 +2454,7 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons - `distributor.ring` - `index-gateway.ring` - `ingest-limits` +- `ingest-limits-frontend` - `ingester.partition-ring` - `pattern-ingester` - `query-scheduler.ring` @@ -2548,6 +2688,7 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et - `distributor.ring` - `index-gateway.ring` - `ingest-limits` +- `ingest-limits-frontend` - `ingester.partition-ring` - `pattern-ingester` - `query-scheduler.ring` diff --git a/pkg/limits/frontend/config.go b/pkg/limits/frontend/config.go index b120856bc9591..2641f687bf4e4 100644 --- a/pkg/limits/frontend/config.go +++ b/pkg/limits/frontend/config.go @@ -3,14 +3,20 @@ package frontend import ( "flag" "fmt" + + "github.com/grafana/dskit/ring" + + util_log "github.com/grafana/loki/v3/pkg/util/log" ) type Config struct { - ClientConfig BackendClientConfig `yaml:"client_config"` + ClientConfig BackendClientConfig `yaml:"client_config"` + LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f) + cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger) } func (cfg *Config) Validate() error { diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index a4ef88686bd4b..7f459d378b40e 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -7,6 +7,7 @@ package frontend import ( "context" + "fmt" "encoding/json" "net/http" @@ -16,6 +17,7 @@ import ( ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +25,11 @@ import ( "github.com/grafana/loki/v3/pkg/util" ) +const ( + RingKey = "ingest-limits-frontend" + RingName = "ingest-limits-frontend" +) + // Frontend is the limits-frontend service, and acts a service wrapper for // all components needed to run the limits-frontend. type Frontend struct { @@ -35,18 +42,21 @@ type Frontend struct { subservicesWatcher *services.FailureWatcher limits IngestLimitsService + + lifecycler *ring.Lifecycler + lifecyclerWatcher *services.FailureWatcher } // New returns a new Frontend. -func New(cfg Config, ringName string, ring ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) { +func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) { var servs []services.Service factory := ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) { return NewIngestLimitsBackendClient(cfg.ClientConfig, addr) }) - pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, ring, factory, logger) - limitsSrv := NewRingIngestLimitsService(ring, pool, limits, logger, reg) + pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, readRing, factory, logger) + limitsSrv := NewRingIngestLimitsService(readRing, pool, limits, logger) f := &Frontend{ cfg: cfg, @@ -54,6 +64,16 @@ func New(cfg Config, ringName string, ring ring.ReadRing, limits Limits, logger limits: limitsSrv, } + var err error + f.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg) + if err != nil { + return nil, fmt.Errorf("failed to create %s lifecycler: %w", RingName, err) + } + // Watch the lifecycler + f.lifecyclerWatcher = services.NewFailureWatcher() + f.lifecyclerWatcher.WatchService(f.lifecycler) + + servs = append(servs, f.lifecycler) servs = append(servs, pool) mgr, err := services.NewManager(servs...) if err != nil { @@ -68,6 +88,14 @@ func New(cfg Config, ringName string, ring ring.ReadRing, limits Limits, logger return f, nil } +// Flush implements ring.FlushTransferer. It transfers state to another ingest limits frontend instance. +func (s *Frontend) Flush() {} + +// TransferOut implements ring.FlushTransferer. It transfers state to another ingest limits frontend instance. +func (s *Frontend) TransferOut(_ context.Context) error { + return nil +} + // starting implements services.Service. func (f *Frontend) starting(ctx context.Context) error { return services.StartManagerAndAwaitHealthy(ctx, f.subservices) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 989b326811077..7419700223a68 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -373,6 +373,7 @@ type Loki struct { ingestLimits *limits.IngestLimits ingestLimitsRing *ring.Ring ingestLimitsFrontend *limits_frontend.Frontend + ingestLimitsFrontendRing *ring.Ring Ingester ingester.Interface PatternIngester *pattern.Ingester PatternRingClient pattern.RingClient @@ -692,6 +693,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule) mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule) mm.RegisterModule(IngestLimitsRing, t.initIngestLimitsRing, modules.UserInvisibleModule) + mm.RegisterModule(IngestLimitsFrontendRing, t.initIngestLimitsFrontendRing, modules.UserInvisibleModule) mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule) mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule) @@ -743,7 +745,8 @@ func (t *Loki) setupModuleManager() error { Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing}, IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV}, IngestLimits: {MemberlistKV, Server}, - IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server}, + IngestLimitsFrontend: {IngestLimitsFrontendRing, IngestLimitsRing, Overrides, Server}, + IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV}, Store: {Overrides, IndexGatewayRing}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing}, Querier: {Store, Ring, Server, IngesterQuerier, PatternRingClient, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a87a928c42a10..951eadb391256 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -112,6 +112,7 @@ const ( IngestLimits = "ingest-limits" IngestLimitsRing = "ingest-limits-ring" IngestLimitsFrontend = "ingest-limits-frontend" + IngestLimitsFrontendRing = "ingest-limits-frontend-ring" Ingester = "ingester" PatternIngester = "pattern-ingester" PatternRingClient = "pattern-ring-client" @@ -404,8 +405,8 @@ func (t *Loki) initIngestLimitsRing() (_ services.Service, err error) { t.ingestLimitsRing, err = ring.New( t.Cfg.IngestLimits.LifecyclerConfig.RingConfig, - limits.RingName, - limits.RingKey, + limits_frontend.RingName, + limits_frontend.RingKey, util_log.Logger, reg, ) @@ -447,6 +448,38 @@ func (t *Loki) initIngestLimits() (services.Service, error) { return ingestLimits, nil } +func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) { + if !t.Cfg.IngestLimits.Enabled { + return nil, nil + } + + // Members of the ring are expected to listen on their gRPC server port. + t.Cfg.IngestLimitsFrontend.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort + + reg := prometheus.WrapRegistererWithPrefix(t.Cfg.MetricsNamespace+"_", prometheus.DefaultRegisterer) + + if t.ingestLimitsFrontendRing, err = ring.New( + t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig, + limits_frontend.RingName, + limits_frontend.RingKey, + util_log.Logger, + reg, + ); err != nil { + return nil, fmt.Errorf("failed to create %s ring: %w", limits_frontend.RingName, err) + } + + t.Server.HTTP.Path("/ingest-limits-frontend/ring"). + Methods("GET", "POST"). + Handler(t.ingestLimitsFrontendRing) + if t.Cfg.InternalServer.Enable { + t.InternalServer.HTTP.Path("/ingest-limits-frontend/ring"). + Methods("GET", "POST"). + Handler(t.ingestLimitsFrontendRing) + } + + return t.ingestLimitsFrontendRing, nil +} + func (t *Loki) initIngestLimitsFrontend() (services.Service, error) { if !t.Cfg.IngestLimits.Enabled { return nil, nil From 83700a6750c2314de0b5947035c8e9650acbb6ba Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 28 Jan 2025 17:32:32 +0000 Subject: [PATCH 02/15] Fix module initialization --- pkg/loki/loki.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 7419700223a68..eed738c7546c7 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -742,10 +742,10 @@ func (t *Loki) setupModuleManager() error { Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing}, + Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, IngestLimitsFrontendRing}, IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV}, IngestLimits: {MemberlistKV, Server}, - IngestLimitsFrontend: {IngestLimitsFrontendRing, IngestLimitsRing, Overrides, Server}, + IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV}, IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV}, Store: {Overrides, IndexGatewayRing}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing}, From 3dfcd66bdae4203293427a02b8f16038705ebfeb Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 28 Jan 2025 17:37:59 +0000 Subject: [PATCH 03/15] Fix lint --- pkg/loki/modules.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 951eadb391256..5a9fdb5887f03 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -469,8 +469,8 @@ func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) { } t.Server.HTTP.Path("/ingest-limits-frontend/ring"). - Methods("GET", "POST"). - Handler(t.ingestLimitsFrontendRing) + Methods("GET", "POST"). + Handler(t.ingestLimitsFrontendRing) if t.Cfg.InternalServer.Enable { t.InternalServer.HTTP.Path("/ingest-limits-frontend/ring"). Methods("GET", "POST"). From 4fbc95cf35b899e0b0d209997a113445d0f21fce Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 28 Jan 2025 17:59:36 +0000 Subject: [PATCH 04/15] Fix lint again --- pkg/limits/frontend/frontend.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index 7f459d378b40e..9c9a246404a09 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -7,8 +7,8 @@ package frontend import ( "context" - "fmt" "encoding/json" + "fmt" "net/http" "github.com/go-kit/log" @@ -17,7 +17,6 @@ import ( ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" From 27b80c4097945dd3cfa2c1ad280b98284d07fff5 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 11:03:54 +0000 Subject: [PATCH 05/15] Fix duplicate metrics collection registration --- pkg/loki/modules.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5a9fdb5887f03..a4656d9d69e12 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -405,8 +405,8 @@ func (t *Loki) initIngestLimitsRing() (_ services.Service, err error) { t.ingestLimitsRing, err = ring.New( t.Cfg.IngestLimits.LifecyclerConfig.RingConfig, - limits_frontend.RingName, - limits_frontend.RingKey, + limits.RingName, + limits.RingKey, util_log.Logger, reg, ) From 04049e7332699908acbbcd1a8fede5ce3db872e8 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 11:06:42 +0000 Subject: [PATCH 06/15] Fix missing argument --- pkg/limits/frontend/frontend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index 9c9a246404a09..7eea4236c5da6 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -55,7 +55,7 @@ func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, log }) pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, readRing, factory, logger) - limitsSrv := NewRingIngestLimitsService(readRing, pool, limits, logger) + limitsSrv := NewRingIngestLimitsService(readRing, pool, limits, logger, reg) f := &Frontend{ cfg: cfg, From d72fa5b58f49e5854d3bea34b10c508125de3bc1 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 11:14:26 +0000 Subject: [PATCH 07/15] Fix lint --- pkg/limits/frontend/frontend.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index 7eea4236c5da6..f3362b03bb074 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -88,10 +88,10 @@ func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, log } // Flush implements ring.FlushTransferer. It transfers state to another ingest limits frontend instance. -func (s *Frontend) Flush() {} +func (f *Frontend) Flush() {} // TransferOut implements ring.FlushTransferer. It transfers state to another ingest limits frontend instance. -func (s *Frontend) TransferOut(_ context.Context) error { +func (f *Frontend) TransferOut(_ context.Context) error { return nil } From ae381ac02d1878f2a2eeaf9e5bb9d7815024df41 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 13:43:40 +0000 Subject: [PATCH 08/15] test --- pkg/limits/frontend/frontend.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index f3362b03bb074..147eaeef3d6b1 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -72,7 +72,7 @@ func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, log f.lifecyclerWatcher = services.NewFailureWatcher() f.lifecyclerWatcher.WatchService(f.lifecycler) - servs = append(servs, f.lifecycler) + // servs = append(servs, f.lifecycler) servs = append(servs, pool) mgr, err := services.NewManager(servs...) if err != nil { @@ -96,7 +96,26 @@ func (f *Frontend) TransferOut(_ context.Context) error { } // starting implements services.Service. -func (f *Frontend) starting(ctx context.Context) error { +func (f *Frontend) starting(ctx context.Context) (err error) { + defer func() { + if err != nil { + // if starting() fails for any reason (e.g., context canceled), + // the lifecycler must be stopped. + _ = services.StopAndAwaitTerminated(context.Background(), f.lifecycler) + } + }() + + // pass new context to lifecycler, so that it doesn't stop automatically when IngestLimits's service context is done + err = f.lifecycler.StartAsync(context.Background()) + if err != nil { + return err + } + + err = f.lifecycler.AwaitRunning(ctx) + if err != nil { + return err + } + return services.StartManagerAndAwaitHealthy(ctx, f.subservices) } From f06a8c6b8781c8d9631e7b2db1c622054e66c947 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 14:54:58 +0000 Subject: [PATCH 09/15] Fix missing MemberlistKV config for IngestLimitsFrontend --- pkg/loki/modules.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a4656d9d69e12..11da465d88c64 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1543,6 +1543,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.IngestLimits.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Pattern.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV From 139fcd68ef65d18f90058f6b2239540a383e1c40 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 15:13:24 +0000 Subject: [PATCH 10/15] Add component log label for ingest-limits-frontend --- pkg/loki/modules.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 11da465d88c64..97a467de094bf 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -485,12 +485,13 @@ func (t *Loki) initIngestLimitsFrontend() (services.Service, error) { return nil, nil } + logger := log.With(util_log.Logger, "component", "ingest-limits-frontend") ingestLimitsFrontend, err := limits_frontend.New( t.Cfg.IngestLimitsFrontend, limits.RingName, t.ingestLimitsRing, t.Overrides, - util_log.Logger, + logger, prometheus.DefaultRegisterer, ) if err != nil { From 07e9c9b0e2cef1feea9aad637e4592a0ddb0d9fd Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 15:14:01 +0000 Subject: [PATCH 11/15] Can we use subservices? --- pkg/limits/frontend/frontend.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index 147eaeef3d6b1..54a6b0296a98a 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -72,7 +72,7 @@ func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, log f.lifecyclerWatcher = services.NewFailureWatcher() f.lifecyclerWatcher.WatchService(f.lifecycler) - // servs = append(servs, f.lifecycler) + servs = append(servs, f.lifecycler) servs = append(servs, pool) mgr, err := services.NewManager(servs...) if err != nil { @@ -98,25 +98,21 @@ func (f *Frontend) TransferOut(_ context.Context) error { // starting implements services.Service. func (f *Frontend) starting(ctx context.Context) (err error) { defer func() { - if err != nil { - // if starting() fails for any reason (e.g., context canceled), - // the lifecycler must be stopped. - _ = services.StopAndAwaitTerminated(context.Background(), f.lifecycler) + if err == nil { + return + } + stopErr := services.StopManagerAndAwaitStopped(context.Background(), f.subservices) + if stopErr != nil { + level.Error(f.logger).Log("msg", "failed to stop ingest-limits-frontend subservices", "err", stopErr) } }() - // pass new context to lifecycler, so that it doesn't stop automatically when IngestLimits's service context is done - err = f.lifecycler.StartAsync(context.Background()) - if err != nil { - return err - } - - err = f.lifecycler.AwaitRunning(ctx) - if err != nil { - return err + level.Info(f.logger).Log("msg", "starting ingest-limits-frontend subservices") + if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil { + return fmt.Errorf("failed to start ingest-limits-frontend subservices: %w", err) } - return services.StartManagerAndAwaitHealthy(ctx, f.subservices) + return nil } // running implements services.Service. From 43748459898ac57979b26208cbb3aedbbfb9f480 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 15:20:11 +0000 Subject: [PATCH 12/15] Don't need name as we have component label --- pkg/limits/frontend/frontend.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index 54a6b0296a98a..1db6187d4a277 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -103,13 +103,13 @@ func (f *Frontend) starting(ctx context.Context) (err error) { } stopErr := services.StopManagerAndAwaitStopped(context.Background(), f.subservices) if stopErr != nil { - level.Error(f.logger).Log("msg", "failed to stop ingest-limits-frontend subservices", "err", stopErr) + level.Error(f.logger).Log("msg", "failed to stop subservices", "err", stopErr) } }() - level.Info(f.logger).Log("msg", "starting ingest-limits-frontend subservices") + level.Info(f.logger).Log("msg", "starting subservices") if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil { - return fmt.Errorf("failed to start ingest-limits-frontend subservices: %w", err) + return fmt.Errorf("failed to start subservices: %w", err) } return nil From efe7a7f8a8cda4c6c5678fa6454a1f2750b70783 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 15:43:28 +0000 Subject: [PATCH 13/15] Move listen port to initIngestLimitsFrontend --- pkg/loki/modules.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 97a467de094bf..01a7c402040c1 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -453,9 +453,6 @@ func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) { return nil, nil } - // Members of the ring are expected to listen on their gRPC server port. - t.Cfg.IngestLimitsFrontend.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort - reg := prometheus.WrapRegistererWithPrefix(t.Cfg.MetricsNamespace+"_", prometheus.DefaultRegisterer) if t.ingestLimitsFrontendRing, err = ring.New( @@ -485,6 +482,9 @@ func (t *Loki) initIngestLimitsFrontend() (services.Service, error) { return nil, nil } + // Members of the ring are expected to listen on their gRPC server port. + t.Cfg.IngestLimits.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort + logger := log.With(util_log.Logger, "component", "ingest-limits-frontend") ingestLimitsFrontend, err := limits_frontend.New( t.Cfg.IngestLimitsFrontend, From 2dbd6f75bf730b8c696fe1eaa5d09bdee8a4af0c Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 16:00:44 +0000 Subject: [PATCH 14/15] Perhaps it needs to be both? --- pkg/loki/modules.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 01a7c402040c1..15a3ab200e195 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -453,6 +453,9 @@ func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) { return nil, nil } + // Members of the ring are expected to listen on their gRPC server port. + t.Cfg.IngestLimits.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort + reg := prometheus.WrapRegistererWithPrefix(t.Cfg.MetricsNamespace+"_", prometheus.DefaultRegisterer) if t.ingestLimitsFrontendRing, err = ring.New( From d50ca07b3e2c26566a13cb27c044e386cc0c01e7 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Thu, 30 Jan 2025 16:18:47 +0000 Subject: [PATCH 15/15] Another attempt? --- pkg/loki/modules.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 15a3ab200e195..fc5c5febe7761 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -453,9 +453,6 @@ func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) { return nil, nil } - // Members of the ring are expected to listen on their gRPC server port. - t.Cfg.IngestLimits.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort - reg := prometheus.WrapRegistererWithPrefix(t.Cfg.MetricsNamespace+"_", prometheus.DefaultRegisterer) if t.ingestLimitsFrontendRing, err = ring.New( @@ -486,7 +483,7 @@ func (t *Loki) initIngestLimitsFrontend() (services.Service, error) { } // Members of the ring are expected to listen on their gRPC server port. - t.Cfg.IngestLimits.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.IngestLimitsFrontend.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort logger := log.With(util_log.Logger, "component", "ingest-limits-frontend") ingestLimitsFrontend, err := limits_frontend.New(