
feat(kafka): tenant topics #15977

Merged: 17 commits merged into grafana:main on Jan 30, 2025

Conversation

@owen-d (Member) commented Jan 28, 2025

Configurable Topic Partitioning Strategies for Kafka

This PR introduces configurable topic partitioning strategies for the Kafka tenant topic writer. Users can now choose between two strategies:

Simple Strategy (Default)

  • Creates one topic per tenant with multiple partitions
  • Uses hash-based partitioning for log distribution
  • Suitable for basic use cases where tenant data volume is predictable

Automatic Strategy

  • Creates single-partition topics in the format <prefix>.<tenant>.<shard>
  • Dynamically scales by creating new shards as needed
  • Allows both scaling up and down (by stopping writes to higher-numbered shards)
  • Better suited for tenants with varying data volumes
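The automatic strategy's topic naming can be sketched as a simple string format. The `<prefix>.<tenant>.<shard>` layout is from the PR description; the helper name below is hypothetical, not the PR's actual code:

```go
package main

import "fmt"

// shardTopic sketches how the automatic strategy derives a topic name.
// The "<prefix>.<tenant>.<shard>" format is from the PR description; this
// function name and signature are illustrative assumptions.
func shardTopic(prefix, tenant string, shard int) string {
	return fmt.Sprintf("%s.%s.%d", prefix, tenant, shard)
}

func main() {
	// With the default prefix "loki.tenant", shard 0 for tenant "team-a":
	fmt.Println(shardTopic("loki.tenant", "team-a", 0)) // loki.tenant.team-a.0
}
```

Because each such topic has exactly one partition, scaling down is just a matter of no longer producing to higher-numbered shards; retention eventually cleans them up.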

Configuration

The strategy can be configured via YAML or flags:

tenant_topic:
  enabled: true
  strategy: "simple" # or "automatic"
  # ... other config options

@github-actions bot added the type/docs label on Jan 28, 2025
@owen-d owen-d marked this pull request as ready for review January 29, 2025 20:43
@owen-d owen-d requested a review from a team as a code owner January 29, 2025 20:43
@@ -2368,6 +2368,31 @@ otlp_config:
# Enable writes to Ingesters during Push requests. Defaults to true.
# CLI flag: -distributor.ingester-writes-enabled
[ingester_writes_enabled: <boolean> | default = true]

tenant_topic:
# Enable the tenant topic tee
Contributor:
You should probably define what a "tee" is here.


# Maximum size of a single Kafka record in bytes
# CLI flag: -distributor.tenant-topic-tee.max-record-size-bytes
[maxrecordsizebytes: <int> | default = 15MiB249KiB]
Contributor:
Is the default a typo? Or is this some developer magic I just haven't encountered before?

@owen-d (Member, Author) replied Jan 29, 2025:

This is a derived value based on some existing constants in the code, which unfortunately creates a less than pleasant reading experience for the default.

	// ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records.
	ProducerBatchMaxBytes = 16_000_000

	// MaxProducerRecordDataBytesLimit is the max allowed size of a single record data. Given we have a limit
	// on the max batch size (ProducerBatchMaxBytes), a Kafka record data can't be bigger than the batch size
	// minus some overhead required to serialise the batch and the record itself. We use 16KB as such overhead
	// in the worst case scenario, which is expected to be way above the actual one.
	MaxProducerRecordDataBytesLimit = ProducerBatchMaxBytes - 16384
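The odd-looking `15MiB249KiB` default checks out arithmetically: 16,000,000 − 16,384 = 15,983,616 bytes, which is exactly 15 MiB + 249 KiB. A small sketch (the helper name is illustrative, not from the PR):

```go
package main

import "fmt"

// Constants from the discussion above: a 16 MB batch cap minus 16 KiB of
// assumed serialization overhead.
const producerBatchMaxBytes = 16_000_000
const maxRecordDataBytes = producerBatchMaxBytes - 16384 // 15,983,616 bytes

// recordLimitString is a hypothetical helper that renders a byte count as
// whole MiB plus whole KiB, the way the config docs display the default.
func recordLimitString(n int) string {
	mib := n / (1024 * 1024)
	kib := (n % (1024 * 1024)) / 1024
	return fmt.Sprintf("%dMiB%dKiB", mib, kib)
}

func main() {
	// 15,983,616 = 15*1,048,576 + 249*1,024 with no remainder.
	fmt.Println(recordLimitString(maxRecordDataBytes)) // prints 15MiB249KiB
}
```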

Comment on lines 2378 to 2388
# Prefix to prepend to tenant IDs to form the final Kafka topic name
# CLI flag: -distributor.tenant-topic-tee.topic-prefix
[topicprefix: <string> | default = "loki.tenant"]

# Maximum number of bytes that can be buffered before producing to Kafka
# CLI flag: -distributor.tenant-topic-tee.max-buffered-bytes
[maxbufferedbytes: <int> | default = 100MiB]

# Maximum size of a single Kafka record in bytes
# CLI flag: -distributor.tenant-topic-tee.max-record-size-bytes
[maxrecordsizebytes: <int> | default = 15MiB249KiB]
Member:
nit: should these names be snake_cased?

Comment on lines +43 to +53
// ParseStrategy converts a string to a Strategy
func ParseStrategy(s string) (Strategy, error) {
switch s {
case "simple":
return SimpleStrategy, nil
case "automatic":
return AutomaticStrategy, nil
default:
return SimpleStrategy, fmt.Errorf("invalid strategy %q, must be either 'simple' or 'automatic'", s)
}
}
Member:
nit: Would implementing encoding.TextUnmarshaler be more idiomatic?
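For context, the reviewer's suggestion could look like the sketch below: implementing `encoding.TextUnmarshaler` lets YAML decoders and flag parsers convert the string directly, without an explicit `ParseStrategy` call at every site. The `Strategy` type and constant names mirror the diff; this `UnmarshalText` body is only an illustration of the idea, not code from the PR:

```go
package main

import "fmt"

// Strategy mirrors the type in the diff; values are illustrative.
type Strategy int

const (
	SimpleStrategy Strategy = iota
	AutomaticStrategy
)

// UnmarshalText implements encoding.TextUnmarshaler, so config decoders can
// populate a Strategy field from "simple" or "automatic" automatically.
func (s *Strategy) UnmarshalText(text []byte) error {
	switch string(text) {
	case "simple":
		*s = SimpleStrategy
	case "automatic":
		*s = AutomaticStrategy
	default:
		return fmt.Errorf("invalid strategy %q, must be either 'simple' or 'automatic'", text)
	}
	return nil
}

func main() {
	var s Strategy
	if err := s.UnmarshalText([]byte("automatic")); err != nil {
		panic(err)
	}
	fmt.Println(s == AutomaticStrategy) // true
}
```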

Comment on lines +329 to +331
if len(streams) == 0 {
return
}
Member:
tiniest nit: this condition could be moved to TenantTopicWrite.Duplicate to avoid spawning a goroutine when there's no work to do

Comment on lines +129 to +136
// 1. Dynamic Scaling:
// - Topics are created in the form "<prefix>.<tenant>.<shard>"
// - Each topic has exactly one partition, with shards serving the same purpose
// as traditional partitions
// - Unlike traditional partitions which can only be increased but never decreased,
// this approach allows for both scaling up and down
// - When volume decreases, we can stop writing to higher-numbered shards
// - Old shards will be automatically cleaned up through normal retention policies
Member:
Using the topic name for sharding is neat, I'm interested in seeing how it goes.

If we ever need to, another solution would be to support "scale down" by adding a generation number to the topic, and writing to the newest generation (with some background discovery on a timer).

That would come with its own set of problems, probably, but it would allow us to rely on Kafka's builtin partitioning more heavily.

Either way, I don't think something like that needs to be implemented right now and I just wanted to openly share the idea. This looks good 👍
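The generation idea floated above could be sketched as a naming variant. Nothing below exists in the PR; the format and helper are purely hypothetical illustrations of the reviewer's suggestion:

```go
package main

import "fmt"

// generationTopic sketches the alternative scale-down scheme: encode a
// generation number in the topic name and always write to the newest
// generation, letting Kafka's builtin partitioning work within it.
// Entirely hypothetical; not part of this PR.
func generationTopic(prefix, tenant string, generation int) string {
	return fmt.Sprintf("%s.%s.g%d", prefix, tenant, generation)
}

func main() {
	fmt.Println(generationTopic("loki.tenant", "team-a", 2)) // loki.tenant.team-a.g2
}
```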

@owen-d owen-d enabled auto-merge (squash) January 30, 2025 17:40
@owen-d owen-d merged commit c258419 into grafana:main Jan 30, 2025
60 checks passed
Labels: size/L, type/docs