From 4f95fe338cd5c7899275a585188b04938991ca4d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 28 Dec 2023 14:53:07 +0000 Subject: [PATCH] feat: append a noop record after kafka topic initialization --- src/common/meta/src/error.rs | 23 +++++++++ .../meta/src/wal/kafka/topic_manager.rs | 48 +++++++++++++++++-- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index c120c8ba939d..323d922b9cda 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -321,6 +321,27 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display( + "Failed to build a Kafka partition client, topic: {}, partition: {}", + topic, + partition + ))] + BuildKafkaPartitionClient { + topic: String, + partition: i32, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))] + ProduceRecord { + topic: String, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + #[snafu(display("Failed to create a Kafka wal topic"))] CreateKafkaWalTopic { location: Location, @@ -368,6 +389,8 @@ impl ErrorExt for Error { | EncodeWalOptions { .. } | BuildKafkaClient { .. } | BuildKafkaCtrlClient { .. } + | BuildKafkaPartitionClient { .. } + | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } => StatusCode::Unexpected, diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 860192b97071..54a5a9ab85df 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -21,13 +21,16 @@ use common_telemetry::{debug, error, info}; use rskafka::client::controller::ControllerClient; use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::ProtocolError::TopicAlreadyExists; -use rskafka::client::ClientBuilder; +use rskafka::client::partition::{Compression, UnknownTopicHandling}; +use rskafka::client::{Client, ClientBuilder}; +use rskafka::record::Record; use rskafka::BackoffConfig; use snafu::{ensure, AsErrorSource, ResultExt}; use crate::error::{ - BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, - EncodeJsonSnafu, InvalidNumTopicsSnafu, Result, + BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, + CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, + ProduceRecordSnafu, Result, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; @@ -37,6 +40,10 @@ use crate::wal::kafka::KafkaConfig; const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/"; +// Each topic only has one partition for now. +// The `DEFAULT_PARTITION` refers to the index of the partition. +const DEFAULT_PARTITION: i32 = 0; + /// Manages topic initialization and selection. pub struct TopicManager { config: KafkaConfig, @@ -117,14 +124,20 @@ impl TopicManager { .await .with_context(|_| BuildKafkaClientSnafu { broker_endpoints: self.config.broker_endpoints.clone(), - })? + })?; + + let control_client = client .controller_client() .context(BuildKafkaCtrlClientSnafu)?; // Try to create missing topics. let tasks = to_be_created .iter() - .map(|i| self.try_create_topic(&topics[*i], &client)) + .map(|i| async { + self.try_create_topic(&topics[*i], &control_client).await?; + self.try_create_noop_record(&topics[*i], &client).await?; + Ok(()) + }) .collect::>(); futures::future::try_join_all(tasks).await.map(|_| ()) } @@ -141,6 +154,31 @@ impl TopicManager { .collect() } + async fn try_create_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> { + let partition_client = client + .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + .await + .context(BuildKafkaPartitionClientSnafu { + topic, + partition: DEFAULT_PARTITION, + })?; + + partition_client + .produce( + vec![Record { + key: None, + value: None, + timestamp: rskafka::chrono::Utc::now(), + headers: Default::default(), + }], + Compression::NoCompression, + ) + .await + .context(ProduceRecordSnafu { topic })?; + + Ok(()) + } + async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> { match client .create_topic(