feat(kafka): tenant topics #15977
Conversation
docs/sources/shared/configuration.md
@@ -2368,6 +2368,31 @@ otlp_config:
# Enable writes to Ingesters during Push requests. Defaults to true.
# CLI flag: -distributor.ingester-writes-enabled
[ingester_writes_enabled: <boolean> | default = true]

tenant_topic:
  # Enable the tenant topic tee
You should probably define what a "tee" is here.
docs/sources/shared/configuration.md
# Maximum size of a single Kafka record in bytes
# CLI flag: -distributor.tenant-topic-tee.max-record-size-bytes
[maxrecordsizebytes: <int> | default = 15MiB249KiB]
Is the default a typo? Or is this some developer magic I just haven't encountered before?
This is a derived value based on some existing constants in the code, which unfortunately creates a less than pleasant reading experience for the default.
// ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records.
ProducerBatchMaxBytes = 16_000_000
// MaxProducerRecordDataBytesLimit is the max allowed size of a single record data. Given we have a limit
// on the max batch size (ProducerBatchMaxBytes), a Kafka record data can't be bigger than the batch size
// minus some overhead required to serialise the batch and the record itself. We use 16KB as such overhead
// in the worst case scenario, which is expected to be way above the actual one.
MaxProducerRecordDataBytesLimit = ProducerBatchMaxBytes - 16384
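(For reference, that default works out to 16,000,000 - 16,384 = 15,983,616 bytes, which is exactly 15 MiB + 249 KiB: 15 × 1,048,576 + 249 × 1,024 = 15,983,616.)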
docs/sources/shared/configuration.md
# Prefix to prepend to tenant IDs to form the final Kafka topic name
# CLI flag: -distributor.tenant-topic-tee.topic-prefix
[topicprefix: <string> | default = "loki.tenant"]

# Maximum number of bytes that can be buffered before producing to Kafka
# CLI flag: -distributor.tenant-topic-tee.max-buffered-bytes
[maxbufferedbytes: <int> | default = 100MiB]

# Maximum size of a single Kafka record in bytes
# CLI flag: -distributor.tenant-topic-tee.max-record-size-bytes
[maxrecordsizebytes: <int> | default = 15MiB249KiB]
nit: should these names be snake_cased?
// ParseStrategy converts a string to a Strategy
func ParseStrategy(s string) (Strategy, error) {
	switch s {
	case "simple":
		return SimpleStrategy, nil
	case "automatic":
		return AutomaticStrategy, nil
	default:
		return SimpleStrategy, fmt.Errorf("invalid strategy %q, must be either 'simple' or 'automatic'", s)
	}
}
nit: Would implementing encoding.TextUnmarshaler be more idiomatic?
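For illustration, a minimal sketch of that idea, reusing the ParseStrategy helper above (the receiver and method placement are assumptions, not code from this PR):

// UnmarshalText implements encoding.TextUnmarshaler, letting text-based
// decoders construct a Strategy directly from its string form.
func (s *Strategy) UnmarshalText(text []byte) error {
	parsed, err := ParseStrategy(string(text))
	if err != nil {
		return err
	}
	*s = parsed
	return nil
}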
if len(streams) == 0 {
	return
}
tiniest nit: this condition could be moved to TenantTopicWrite.Duplicate to avoid spawning a goroutine when there's no work to do
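A rough sketch of the suggestion, with the empty-input check hoisted into Duplicate (the signature, types, and helper name are illustrative guesses, not the PR's actual code):

// Duplicate tees a tenant's streams to the tenant topic writer. Returning
// early on empty input avoids spawning a goroutine when there is no work to do.
func (t *TenantTopicWrite) Duplicate(tenant string, streams []KeyedStream) {
	if len(streams) == 0 {
		return
	}
	go t.write(tenant, streams) // assumed internal helper that does the actual produce
}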
// 1. Dynamic Scaling:
//    - Topics are created in the form "<prefix>.<tenant>.<shard>"
//    - Each topic has exactly one partition, with shards serving the same purpose
//      as traditional partitions
//    - Unlike traditional partitions which can only be increased but never decreased,
//      this approach allows for both scaling up and down
//    - When volume decreases, we can stop writing to higher-numbered shards
//    - Old shards will be automatically cleaned up through normal retention policies
Using the topic name for sharding is neat; I'm interested in seeing how it goes.
If we ever need to, another solution would be to support "scale down" by adding a generation number to the topic and writing to the newest generation (with some background discovery on a timer).
That would come with its own set of problems, probably, but it would allow us to rely on Kafka's built-in partitioning more heavily.
Either way, I don't think something like that needs to be implemented right now; I just wanted to openly share the idea. This looks good 👍
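For concreteness, the naming scheme described above amounts to something like the following helper (illustrative only, not the PR's actual code):

// tenantShardTopic builds the per-tenant, per-shard topic name, e.g.
// "loki.tenant.team-a.3" for prefix "loki.tenant", tenant "team-a", shard 3.
func tenantShardTopic(prefix, tenant string, shard int) string {
	return fmt.Sprintf("%s.%s.%d", prefix, tenant, shard)
}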
Configurable Topic Partitioning Strategies for Kafka
This PR introduces configurable topic partitioning strategies for the Kafka tenant topic writer. Users can now choose between two strategies:
- Simple Strategy (default)
- Automatic Strategy, which shards tenant data across topics of the form <prefix>.<tenant>.<shard>
Configuration
The strategy can be configured via YAML or flags:
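A rough sketch of what the YAML might look like, based on the fields in the docs diff above (the strategy field name and exact nesting are assumptions, and the other field names may still change per the snake_case comment):

tenant_topic:
  # Enable the tenant topic tee
  enabled: true
  # Partitioning strategy: "simple" or "automatic" (field name assumed)
  strategy: automatic
  # Prefix to prepend to tenant IDs to form the final Kafka topic name
  topicprefix: "loki.tenant"
  # Maximum number of bytes that can be buffered before producing to Kafka
  maxbufferedbytes: 100MiB
  # Maximum size of a single Kafka record in bytes
  maxrecordsizebytes: 15MiB249KiB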