Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ring for ingest-limits-frontend #15985

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,145 @@ ingest_limits_frontend:
# CLI flag: -ingest-limits-frontend.remote-timeout
[remote_timeout: <duration> | default = 1s]

lifecycler:
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# etcd, inmemory, memberlist, multi.
# CLI flag: -ingest-limits-frontend.store
[store: <string> | default = "consul"]

# The prefix for the keys in the store. Should end with a /.
# CLI flag: -ingest-limits-frontend.prefix
[prefix: <string> | default = "collectors/"]

# Configuration for a Consul client. Only applies if the selected
# kvstore is consul.
# The CLI flags prefix for this block configuration is:
# ingest-limits-frontend
[consul: <consul>]

# Configuration for an ETCD v3 client. Only applies if the selected
# kvstore is etcd.
# The CLI flags prefix for this block configuration is:
# ingest-limits-frontend
[etcd: <etcd>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -ingest-limits-frontend.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -ingest-limits-frontend.multi.secondary
[secondary: <string> | default = ""]

# Mirror writes to secondary store.
# CLI flag: -ingest-limits-frontend.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

# Timeout for storing value to secondary store.
# CLI flag: -ingest-limits-frontend.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# The heartbeat timeout after which ingesters are skipped for
# reads/writes. 0 = never (timeout disabled).
# CLI flag: -ingest-limits-frontend.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# The number of ingesters to write to and read from.
# CLI flag: -ingest-limits-frontend.distributor.replication-factor
[replication_factor: <int> | default = 3]

# True to enable the zone-awareness and replicate ingested samples across
# different availability zones.
# CLI flag: -ingest-limits-frontend.distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# Comma-separated list of zones to exclude from the ring. Instances in
# excluded zones will be filtered out from the ring.
# CLI flag: -ingest-limits-frontend.distributor.excluded-zones
[excluded_zones: <string> | default = ""]

# Number of tokens for each ingester.
# CLI flag: -ingest-limits-frontend.num-tokens
[num_tokens: <int> | default = 128]

# Period at which to heartbeat to consul. 0 = disabled.
# CLI flag: -ingest-limits-frontend.heartbeat-period
[heartbeat_period: <duration> | default = 5s]

# Heartbeat timeout after which instance is assumed to be unhealthy. 0 =
# disabled.
# CLI flag: -ingest-limits-frontend.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Observe tokens after generating to resolve collisions. Useful when using
# gossiping ring.
# CLI flag: -ingest-limits-frontend.observe-period
[observe_period: <duration> | default = 0s]

# Period to wait for a claim from another member; will join automatically
# after this.
# CLI flag: -ingest-limits-frontend.join-after
[join_after: <duration> | default = 0s]

# Minimum duration to wait after the internal readiness checks have passed
# but before succeeding the readiness endpoint. This is used to slowdown
# deployment controllers (eg. Kubernetes) after an instance is ready and
# before they proceed with a rolling update, to give the rest of the cluster
# instances enough time to receive ring updates.
# CLI flag: -ingest-limits-frontend.min-ready-duration
[min_ready_duration: <duration> | default = 15s]

# Name of network interface to read address from.
# CLI flag: -ingest-limits-frontend.lifecycler.interface
[interface_names: <list of strings> | default = [<private network interfaces>]]

# Enable IPv6 support. Required to make use of IP addresses from IPv6
# interfaces.
# CLI flag: -ingest-limits-frontend.enable-inet6
[enable_inet6: <boolean> | default = false]

# Duration to sleep for before exiting, to ensure metrics are scraped.
# CLI flag: -ingest-limits-frontend.final-sleep
[final_sleep: <duration> | default = 0s]

# File path where tokens are stored. If empty, tokens are not stored at
# shutdown and restored at startup.
# CLI flag: -ingest-limits-frontend.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The availability zone where this instance is running.
# CLI flag: -ingest-limits-frontend.availability-zone
[availability_zone: <string> | default = ""]

# Unregister from the ring upon clean shutdown. It can be useful to disable
# for rolling restarts with consistent naming in conjunction with
# -distributor.extend-writes=false.
# CLI flag: -ingest-limits-frontend.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]

# When enabled the readiness probe succeeds only after all instances are
# ACTIVE and healthy in the ring, otherwise only the instance itself is
# checked. This option should be disabled if in your cluster multiple
# instances can be rolled out simultaneously, otherwise rolling updates may
# be slowed down.
# CLI flag: -ingest-limits-frontend.readiness-check-ring-health
[readiness_check_ring_health: <boolean> | default = true]

# IP address to advertise in the ring.
# CLI flag: -ingest-limits-frontend.lifecycler.addr
[address: <string> | default = ""]

# port to advertise in consul (defaults to server.grpc-listen-port).
# CLI flag: -ingest-limits-frontend.lifecycler.port
[port: <int> | default = 0]

# ID to register in the ring.
# CLI flag: -ingest-limits-frontend.lifecycler.ID
[id: <string> | default = "<hostname>"]

# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
[runtime_config: <runtime_config>]
Expand Down Expand Up @@ -2315,6 +2454,7 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons
- `distributor.ring`
- `index-gateway.ring`
- `ingest-limits`
- `ingest-limits-frontend`
- `ingester.partition-ring`
- `pattern-ingester`
- `query-scheduler.ring`
Expand Down Expand Up @@ -2548,6 +2688,7 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et
- `distributor.ring`
- `index-gateway.ring`
- `ingest-limits`
- `ingest-limits-frontend`
- `ingester.partition-ring`
- `pattern-ingester`
- `query-scheduler.ring`
Expand Down
8 changes: 7 additions & 1 deletion pkg/limits/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ package frontend
import (
"flag"
"fmt"

"github.com/grafana/dskit/ring"

util_log "github.com/grafana/loki/v3/pkg/util/log"
)

type Config struct {
ClientConfig BackendClientConfig `yaml:"client_config"`
ClientConfig BackendClientConfig `yaml:"client_config"`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f)
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger)
grobinson-grafana marked this conversation as resolved.
Show resolved Hide resolved
}

func (cfg *Config) Validate() error {
Expand Down
52 changes: 47 additions & 5 deletions pkg/limits/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package frontend
import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/go-kit/log"
Expand All @@ -23,6 +24,11 @@ import (
"github.com/grafana/loki/v3/pkg/util"
)

const (
// RingKey is the ring key used for the ingest-limits-frontend ring. It is
// the value passed as the key to both ring.NewLifecycler and ring.New.
RingKey = "ingest-limits-frontend"
// RingName is the name of the ingest-limits-frontend ring. It is the value
// passed as the ring name to both ring.NewLifecycler and ring.New, and is
// included in the error messages produced when either of them fails.
RingName = "ingest-limits-frontend"
)

// Frontend is the limits-frontend service, and acts as a service wrapper for
// all components needed to run the limits-frontend.
type Frontend struct {
Expand All @@ -35,25 +41,38 @@ type Frontend struct {
subservicesWatcher *services.FailureWatcher

limits IngestLimitsService

lifecycler *ring.Lifecycler
grobinson-grafana marked this conversation as resolved.
Show resolved Hide resolved
lifecyclerWatcher *services.FailureWatcher
}

// New returns a new Frontend.
func New(cfg Config, ringName string, ring ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) {
func New(cfg Config, ringName string, readRing ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) {
var servs []services.Service

factory := ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
return NewIngestLimitsBackendClient(cfg.ClientConfig, addr)
})

pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, ring, factory, logger)
limitsSrv := NewRingIngestLimitsService(ring, pool, limits, logger, reg)
pool := NewIngestLimitsClientPool(ringName, cfg.ClientConfig.PoolConfig, readRing, factory, logger)
limitsSrv := NewRingIngestLimitsService(readRing, pool, limits, logger, reg)

f := &Frontend{
cfg: cfg,
logger: logger,
limits: limitsSrv,
}

var err error
f.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create %s lifecycler: %w", RingName, err)
}
// Watch the lifecycler
f.lifecyclerWatcher = services.NewFailureWatcher()
f.lifecyclerWatcher.WatchService(f.lifecycler)

servs = append(servs, f.lifecycler)
servs = append(servs, pool)
mgr, err := services.NewManager(servs...)
if err != nil {
Expand All @@ -68,9 +87,32 @@ func New(cfg Config, ringName string, ring ring.ReadRing, limits Limits, logger
return f, nil
}

// Flush implements ring.FlushTransferer. It is a no-op: the body is empty, so
// nothing is flushed or transferred when it is called.
func (f *Frontend) Flush() {}

// TransferOut implements ring.FlushTransferer. It is a no-op: the context is
// ignored, no state is transferred to another instance, and the returned
// error is always nil.
func (f *Frontend) TransferOut(_ context.Context) error {
return nil
}

// starting implements services.Service.
func (f *Frontend) starting(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, f.subservices)
// starting brings up the frontend's subservices and blocks until they report
// healthy. If startup fails, the subservices manager is stopped again before
// the wrapped error is returned.
func (f *Frontend) starting(ctx context.Context) (err error) {
	// The named result lets the deferred cleanup observe the final error.
	defer func() {
		if err != nil {
			// Use a fresh context: ctx may already be cancelled at this point.
			if stopErr := services.StopManagerAndAwaitStopped(context.Background(), f.subservices); stopErr != nil {
				level.Error(f.logger).Log("msg", "failed to stop subservices", "err", stopErr)
			}
		}
	}()

	level.Info(f.logger).Log("msg", "starting subservices")

	startErr := services.StartManagerAndAwaitHealthy(ctx, f.subservices)
	if startErr == nil {
		return nil
	}
	return fmt.Errorf("failed to start subservices: %w", startErr)
}

// running implements services.Service.
Expand Down
7 changes: 5 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ type Loki struct {
ingestLimits *limits.IngestLimits
ingestLimitsRing *ring.Ring
ingestLimitsFrontend *limits_frontend.Frontend
ingestLimitsFrontendRing *ring.Ring
Ingester ingester.Interface
PatternIngester *pattern.Ingester
PatternRingClient pattern.RingClient
Expand Down Expand Up @@ -692,6 +693,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule)
mm.RegisterModule(IngestLimitsRing, t.initIngestLimitsRing, modules.UserInvisibleModule)
mm.RegisterModule(IngestLimitsFrontendRing, t.initIngestLimitsFrontendRing, modules.UserInvisibleModule)
mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule)
Expand Down Expand Up @@ -740,10 +742,11 @@ func (t *Loki) setupModuleManager() error {
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing},
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, IngestLimitsFrontendRing},
IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV},
IngestLimits: {MemberlistKV, Server},
IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server},
IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV},
IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV},
Store: {Overrides, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing},
Querier: {Store, Ring, Server, IngesterQuerier, PatternRingClient, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing},
Expand Down
37 changes: 36 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const (
IngestLimits = "ingest-limits"
IngestLimitsRing = "ingest-limits-ring"
IngestLimitsFrontend = "ingest-limits-frontend"
IngestLimitsFrontendRing = "ingest-limits-frontend-ring"
Ingester = "ingester"
PatternIngester = "pattern-ingester"
PatternRingClient = "pattern-ring-client"
Expand Down Expand Up @@ -447,17 +448,50 @@ func (t *Loki) initIngestLimits() (services.Service, error) {
return ingestLimits, nil
}

// initIngestLimitsFrontendRing creates the ring for the ingest-limits-frontend
// and exposes it over HTTP. When ingest limits are disabled it returns a nil
// service and a nil error. Otherwise the ring is stored on t, registered as the
// GET/POST handler for /ingest-limits-frontend/ring on the main HTTP server
// (and on the internal server when that is enabled), and returned as the
// module's service.
func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) {
	const ringPath = "/ingest-limits-frontend/ring"

	if enabled := t.Cfg.IngestLimits.Enabled; !enabled {
		return nil, nil
	}

	// Ring metrics are registered under the configured metrics namespace.
	metricsPrefix := t.Cfg.MetricsNamespace + "_"
	reg := prometheus.WrapRegistererWithPrefix(metricsPrefix, prometheus.DefaultRegisterer)

	t.ingestLimitsFrontendRing, err = ring.New(
		t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig,
		limits_frontend.RingName,
		limits_frontend.RingKey,
		util_log.Logger,
		reg,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create %s ring: %w", limits_frontend.RingName, err)
	}

	// The ring itself serves as the HTTP handler for the ring page.
	t.Server.HTTP.
		Path(ringPath).
		Methods("GET", "POST").
		Handler(t.ingestLimitsFrontendRing)

	if t.Cfg.InternalServer.Enable {
		t.InternalServer.HTTP.
			Path(ringPath).
			Methods("GET", "POST").
			Handler(t.ingestLimitsFrontendRing)
	}

	return t.ingestLimitsFrontendRing, nil
}

func (t *Loki) initIngestLimitsFrontend() (services.Service, error) {
if !t.Cfg.IngestLimits.Enabled {
return nil, nil
}

// Members of the ring are expected to listen on their gRPC server port.
t.Cfg.IngestLimitsFrontend.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort

logger := log.With(util_log.Logger, "component", "ingest-limits-frontend")
ingestLimitsFrontend, err := limits_frontend.New(
t.Cfg.IngestLimitsFrontend,
limits.RingName,
t.ingestLimitsRing,
t.Overrides,
util_log.Logger,
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
Expand Down Expand Up @@ -1510,6 +1544,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IngestLimits.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Pattern.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
Expand Down
Loading