
bug: kafka consumer lag size metrics are not dropped after related actors are dropped #16064

Open
MrCroxx opened this issue Apr 2, 2024 · 5 comments
Assignees
Labels
type/bug Something isn't working
Milestone

Comments

@MrCroxx
Contributor

MrCroxx commented Apr 2, 2024

Describe the bug

Some source-related metrics (e.g. source_latest_message_id, source_kafka_high_watermark, and possibly more) are not dropped after the related actors are dropped. Use LabelGuardedMetrics instead so that the metrics are dropped automatically.
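To illustrate the intended drop behavior, here is a minimal, self-contained sketch of the guarded-metric idea (toy types only — not RisingWave's actual LabelGuardedMetrics API; the names `GaugeVec` and `LabelGuardedGauge` are hypothetical). The guard owns its label values and removes the labeled series when it is dropped:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Toy stand-in for a labeled gauge family (e.g. an IntGaugeVec).
#[derive(Clone, Default)]
pub struct GaugeVec {
    inner: Arc<Mutex<HashMap<Vec<String>, i64>>>,
}

impl GaugeVec {
    /// Register a labeled child and return a guard that owns the labels.
    pub fn with_guarded_labels(&self, labels: &[&str]) -> LabelGuardedGauge {
        let labels: Vec<String> = labels.iter().map(|s| s.to_string()).collect();
        self.inner.lock().unwrap().insert(labels.clone(), 0);
        LabelGuardedGauge {
            vec: self.clone(),
            labels,
        }
    }

    /// Number of live labeled series.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}

/// Owns one labeled child; the series is removed when the owner
/// (e.g. an actor) drops this guard.
pub struct LabelGuardedGauge {
    vec: GaugeVec,
    labels: Vec<String>,
}

impl LabelGuardedGauge {
    pub fn set(&self, v: i64) {
        self.vec.inner.lock().unwrap().insert(self.labels.clone(), v);
    }
}

impl Drop for LabelGuardedGauge {
    fn drop(&mut self) {
        // The series disappears together with its owner, so stale
        // per-actor metrics are never left behind.
        self.vec.inner.lock().unwrap().remove(&self.labels);
    }
}
```

With this pattern, an actor that holds the guard cleans up its series automatically: dropping the actor drops the guard, which removes the labeled child.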

Error message/log

No response

To Reproduce

No response

Expected behavior

Source-related metrics should be dropped after the related actors are dropped.

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

@MrCroxx MrCroxx added the type/bug Something isn't working label Apr 2, 2024
@MrCroxx MrCroxx self-assigned this Apr 2, 2024
@github-actions github-actions bot added this to the release-1.8 milestone Apr 2, 2024
@MrCroxx
Contributor Author

MrCroxx commented Apr 2, 2024

The difficulty is that some labels share the owner's lifetime, while others are data-dependent and have no clear lifetime relationship to the owner. 🤔

e.g.

For KafkaSplitReader, the first two labels are already known when the reader is created, so they can be bound at construction time.

impl KafkaSplitReader {
    fn report_latest_message_id(&self, split_id: &str, offset: i64) {
        self.source_ctx
            .metrics
            .latest_message_id
            .with_label_values(&[
                // source name is not available here
                &self.source_ctx.source_id.to_string(),
                &self.source_ctx.actor_id.to_string(),
                split_id,
            ])
            .set(offset);
    }
}

But the last label (the split id) is only known from the received messages.

        #[for_await]
        'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
            let msgs: Vec<_> = msgs
                .into_iter()
                .collect::<std::result::Result<_, KafkaError>>()?;

            // ... ...

            for (partition, offset) in split_msg_offsets {
                let split_id = partition.to_string();
                self.report_latest_message_id(&split_id, offset);
            }

cc @tabVersion @wenym1

@MrCroxx
Contributor Author

MrCroxx commented Apr 2, 2024

@wangrunji0408 may be facing the same problem.

@MrCroxx
Contributor Author

MrCroxx commented Apr 2, 2024

Should we introduce a new metrics guard that holds a label map internally?

@wenym1
Contributor

wenym1 commented Apr 8, 2024

Should we introduce a new metrics guard that holds a label map internally?

For simplicity, could we maintain a map in KafkaSplitReader to cache the labeled metrics? We could look up the cached labeled metric first and, if it is not found, create a new one for the new split id.
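A minimal sketch of that caching idea (toy types only; `SplitMetricsCache` and its fields are hypothetical names, not the real KafkaSplitReader API). Because the cache lives in the reader, every cached entry — which in the real implementation would be a guarded metric handle — is dropped together with the reader:

```rust
use std::collections::HashMap;

/// Toy gauge; stands in for a real (guarded) labeled metric handle.
struct Gauge {
    value: i64,
}

/// Hypothetical per-reader cache keyed by split id. In practice this
/// would be a field of KafkaSplitReader, so all cached metrics share
/// the reader's lifetime.
struct SplitMetricsCache {
    by_split: HashMap<String, Gauge>,
}

impl SplitMetricsCache {
    fn new() -> Self {
        Self {
            by_split: HashMap::new(),
        }
    }

    /// Look up the cached metric for this split; create it lazily the
    /// first time a message from a new split is seen.
    fn report_latest_message_id(&mut self, split_id: &str, offset: i64) {
        self.by_split
            .entry(split_id.to_string())
            .or_insert(Gauge { value: 0 })
            .value = offset;
    }

    fn latest(&self, split_id: &str) -> Option<i64> {
        self.by_split.get(split_id).map(|g| g.value)
    }
}
```

The `entry(..).or_insert(..)` lookup creates the labeled child only once per split, so repeated reports for the same split reuse the cached handle instead of re-resolving labels on every message.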

Contributor

github-actions bot commented Jun 8, 2024

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
