feat(remote_wal): impl kafka log store #2971

Merged
Merged 25 commits into develop from feat/impl_kafka_log_store on Dec 25, 2023

Changes from 9 commits

Commits (25)
594db45  feat: introduce client manager (niebayes, Dec 21, 2023)
ea3a077  chore: add errors for client manager (niebayes, Dec 21, 2023)
071b974  chore: add record utils (niebayes, Dec 21, 2023)
3b779b9  chore: impl kafka log store (niebayes, Dec 21, 2023)
eb597ce  chore: build kafka log store upon starting datanode (niebayes, Dec 21, 2023)
eed0def  chore: update comments for kafka log store (niebayes, Dec 21, 2023)
047941b  chore: add a todo for getting entry offset (niebayes, Dec 21, 2023)
27181d6  fix: typo (niebayes, Dec 21, 2023)
2260cf9  chore: remove unused (niebayes, Dec 21, 2023)
66f7d9c  chore: update comments (niebayes, Dec 21, 2023)
45ac518  fix: typo (niebayes, Dec 21, 2023)
8a5dab8  fix: resolve some review conversations (niebayes, Dec 21, 2023)
7241994  Merge branch 'develop' into feat/impl_kafka_log_store (niebayes, Dec 22, 2023)
bc7938d  chore: move commonly referenced crates to workspace Cargo.toml (niebayes, Dec 22, 2023)
d19178d  fix: style (niebayes, Dec 22, 2023)
f8307a3  Merge branch 'develop' into feat/impl_kafka_log_store (niebayes, Dec 22, 2023)
ffa4c4a  fix: style (niebayes, Dec 22, 2023)
4bc1cc1  chore: unify topic name prefix (niebayes, Dec 22, 2023)
94c53a4  chore: make backoff config configurable by users (niebayes, Dec 22, 2023)
ba21a0e  chore: properly use backoff config in wal config (niebayes, Dec 22, 2023)
a718a9a  refactor: read/write of kafka log store (niebayes, Dec 23, 2023)
ed4d2cf  fix: typo (niebayes, Dec 23, 2023)
62a8991  fix: typo (niebayes, Dec 23, 2023)
af419f0  fix: resolve conflicts and review conversations (niebayes, Dec 25, 2023)
0f4ef3e  fix: resolve review conversations (niebayes, Dec 25, 2023)
4 changes: 4 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

7 changes: 5 additions & 2 deletions src/datanode/src/datanode.rs
@@ -493,8 +493,11 @@ impl DatanodeBuilder {

     /// Builds [KafkaLogStore].
     async fn build_kafka_log_store(config: &KafkaConfig) -> Result<Arc<KafkaLogStore>> {
-        let _ = config;
-        todo!()
+        KafkaLogStore::try_new(config)
+            .await
+            .map_err(Box::new)
+            .context(OpenLogStoreSnafu)
+            .map(Arc::new)
     }
 
     /// Builds [ObjectStoreManager]
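For orientation, here is a minimal sketch of the constructor the datanode now delegates to. The real `KafkaLogStore::try_new` lives in `src/log-store/src/kafka/log_store.rs`, which this 9-commit diff view does not show, so the struct fields below are assumptions built from the `ClientManager` introduced later in this diff:

```rust
// Hypothetical sketch, not the actual implementation in this PR.
use std::sync::Arc;

use common_config::wal::KafkaConfig;

pub struct KafkaLogStore {
    config: KafkaConfig,
    /// One rskafka client per topic, created lazily (assumed field).
    client_manager: ClientManagerRef,
}

impl KafkaLogStore {
    pub async fn try_new(config: &KafkaConfig) -> Result<Self> {
        Ok(Self {
            client_manager: Arc::new(ClientManager::try_new(config).await?),
            config: config.clone(),
        })
    }
}
```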
4 changes: 4 additions & 0 deletions src/log-store/Cargo.toml
@@ -21,10 +21,14 @@
common-macro.workspace = true
common-meta.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
dashmap = "5.4"
futures-util.workspace = true
futures.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rskafka = "0.5"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
tokio-util.workspace = true
92 changes: 92 additions & 0 deletions src/log-store/src/error.rs
@@ -14,6 +14,7 @@

use std::any::Any;

use common_config::wal::KafkaWalTopic;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
@@ -84,6 +85,97 @@ pub enum Error {
        attempt_index: u64,
        location: Location,
    },

    #[snafu(display(
        "Failed to build an rskafka client, broker endpoints: {:?}",
        broker_endpoints
    ))]
    BuildClient {
        broker_endpoints: Vec<String>,
        location: Location,
        #[snafu(source)]
        error: rskafka::client::error::Error,
    },

    #[snafu(display(
        "Failed to build an rskafka partition client, topic: {}, partition: {}",
        topic,
        partition
    ))]
    BuildPartitionClient {
        topic: String,
        partition: i32,
        location: Location,
        #[snafu(source)]
        error: rskafka::client::error::Error,
    },

    #[snafu(display(
        "Failed to get a Kafka topic client, topic: {}, source: {}",
        topic,
        error
    ))]
    GetClient {
        topic: KafkaWalTopic,
        location: Location,
        error: String,
    },

    #[snafu(display("Failed to encode a record key, key: {}", key))]
    EncodeKey {
        key: String,
        location: Location,
        #[snafu(source)]
        error: serde_json::Error,
    },

    #[snafu(display("Failed to decode a record key"))]
    DecodeKey {
        location: Location,
        #[snafu(source)]
        error: serde_json::Error,
    },

    #[snafu(display("Missing required key in a record"))]
    MissingKey { location: Location },

    #[snafu(display("Missing required value in a record"))]
    MissingValue { location: Location },

    #[snafu(display("Failed to produce entries to Kafka, topic: {}", topic))]
    ProduceEntries {
        topic: KafkaWalTopic,
        location: Location,
        #[snafu(source)]
        error: rskafka::client::producer::Error,
    },

    #[snafu(display("The produce tasks return an empty offset vector, topic: {}", topic))]
    EmptyOffsets {
        topic: KafkaWalTopic,
        location: Location,
    },

    #[snafu(display(
        "Failed to cast an rskafka offset to entry offset, rskafka_offset: {}",
        offset
    ))]
    CastOffset { offset: i64, location: Location },

    #[snafu(display(
        "Failed to read a record from Kafka, offset {}, topic: {}, region id: {}",
        offset,
        topic,
        region_id,
    ))]
    ConsumeRecord {
        offset: i64,
        topic: String,
        region_id: u64,
        location: Location,
        #[snafu(source)]
        error: rskafka::client::error::Error,
    },
}

impl ErrorExt for Error {
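Each variant above generates a snafu context selector (`BuildClientSnafu`, `EncodeKeySnafu`, ...) that call sites attach via `ResultExt`. As a hedged sketch of how the key-codec variants are meant to be used, with the `RecordKey` type being an assumption (the actual type lives in the record_utils module, which is not shown in this diff view):

```rust
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

// Hypothetical key type; the real one is defined in kafka/record_utils.rs.
#[derive(Serialize, Deserialize)]
struct RecordKey {
    topic: String,
    region_id: u64,
}

// Encode the key for embedding into a Kafka record; a serde_json failure is
// wrapped into Error::EncodeKey together with a printable form of the key.
fn encode_key(key: &RecordKey) -> Result<String> {
    serde_json::to_string(key).with_context(|_| EncodeKeySnafu {
        key: format!("{}/{}", key.topic, key.region_id),
    })
}
```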
10 changes: 2 additions & 8 deletions src/log-store/src/kafka.rs
@@ -12,7 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod client_manager;
 pub mod log_store;
+mod record_utils;
 
 use common_meta::wal::KafkaWalTopic as Topic;
 use store_api::logstore::entry::{Entry, Id as EntryId};
@@ -31,14 +33,6 @@ impl NamespaceImpl {
     fn new(region_id: u64, topic: Topic) -> Self {
         Self { region_id, topic }
     }
-
-    fn region_id(&self) -> u64 {
-        self.region_id
-    }
-
-    fn topic(&self) -> &Topic {
-        &self.topic
-    }
 }
 
 impl Namespace for NamespaceImpl {
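The namespace is how the log store names a WAL stream: one Kafka topic can carry entries from many regions, so the (topic, region id) pair identifies a stream. An illustrative sketch, assuming `KafkaWalTopic` is a `String` alias and using made-up values:

```rust
// Illustrative values only; NamespaceImpl is crate-internal in this PR.
let topic: Topic = "greptimedb_wal_topic_0".to_string();
let ns = NamespaceImpl::new(4242, topic);
assert_eq!(ns.region_id, 4242);
```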
130 changes: 130 additions & 0 deletions src/log-store/src/kafka/client_manager.rs
@@ -0,0 +1,130 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;

/// Arc wrapper of Client.
pub(super) type ClientRef = Arc<Client>;
/// Arc wrapper of ClientManager.
pub(super) type ClientManagerRef = Arc<ClientManager>;

/// A client through which to contact the Kafka cluster. Each client is associated with one partition of a topic.
/// Since a topic only has one partition in our design, the mapping between clients and topics is one-to-one.
#[derive(Debug)]
pub(super) struct Client {
    /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic.
    pub(super) raw_client: Arc<PartitionClient>,
    /// A producer used to buffer log entries for a specific topic before sending them in a batching manner.
    pub(super) producer: Arc<BatchProducer<RecordAggregator>>,
}

impl Client {
    /// Creates a Client from the raw client.
    pub(super) fn new(raw_client: Arc<PartitionClient>, config: &KafkaConfig) -> Self {
        let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize);
        let batch_producer = BatchProducerBuilder::new(raw_client.clone())
            .with_compression(config.compression)
            .with_linger(config.linger)
            .build(record_aggregator);

        Self {
            raw_client,
            producer: Arc::new(batch_producer),
        }
    }
}

/// Manages client construction and accesses.
#[derive(Debug)]
pub(super) struct ClientManager {
    config: KafkaConfig,
    /// Top-level client in rskafka. All clients are constructed by this client.
    client_factory: RsKafkaClient,
    /// A pool maintaining a collection of clients.
    /// Key: a topic. Value: the associated client of the topic.
    client_pool: DashMap<Topic, ClientRef>,
}

impl ClientManager {
    /// Tries to create a ClientManager.
    pub(super) async fn try_new(config: &KafkaConfig) -> Result<Self> {
        // Sets backoff config for the top-level rskafka client and all clients constructed by it.
        let backoff_config = BackoffConfig {
            init_backoff: Duration::from_millis(500),
            max_backoff: Duration::from_secs(10),
            base: 2.,
            // Stop reconnecting if the total wait time reaches the deadline.
            deadline: Some(Duration::from_secs(60 * 5)),
        };
        let client = ClientBuilder::new(config.broker_endpoints.clone())
            .backoff_config(backoff_config)
            .build()
            .await
            .with_context(|_| BuildClientSnafu {
                broker_endpoints: config.broker_endpoints.clone(),
            })?;

        Ok(Self {
            config: config.clone(),
            client_factory: client,
            client_pool: DashMap::with_capacity(config.num_topics),
        })
    }

    /// Gets the client associated with the topic. If the client does not exist, a new one will
    /// be created and returned.
    pub(super) async fn get_or_insert(&self, topic: &Topic) -> Result<ClientRef> {
        match self.client_pool.entry(topic.to_string()) {
            DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
            DashMapEntry::Vacant(entry) => {
                let topic_client = self.try_create_client(topic).await?;
                Ok(entry.insert(topic_client).clone())
            }
        }
    }

    async fn try_create_client(&self, topic: &Topic) -> Result<ClientRef> {
        // Sets to Retry to retry connecting if the Kafka cluster replies with an UnknownTopic error.
        // That's because the topic is believed to exist, as the metasrv is expected to create required topics upon start.
        // The retries won't stop until they succeed or a different error is returned.
        let raw_client = self
            .client_factory
            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
            .await
            .context(BuildPartitionClientSnafu {
                topic,
                partition: DEFAULT_PARTITION,
            })
            .map(Arc::new)?;

        Ok(Arc::new(Client::new(raw_client, &self.config)))
    }
}
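A hedged usage sketch of the manager plus the per-topic batch producer. The topic name and payload below are assumptions, the `Record` shape follows rskafka 0.5, and error handling is elided:

```rust
use std::collections::BTreeMap;

use chrono::Utc;
use common_config::wal::KafkaConfig;
use rskafka::record::Record;

// Sketch: append one WAL payload to a topic through the pooled client.
async fn append_one(config: &KafkaConfig, topic: &Topic) -> Result<i64> {
    let manager = ClientManager::try_new(config).await?;

    // First access builds the partition client (retrying while metasrv may
    // still be creating the topic); later accesses hit the DashMap pool.
    let client = manager.get_or_insert(topic).await?;

    // The batch producer buffers records until `max_batch_size` bytes or the
    // `linger` window is hit, then flushes to the topic's single partition.
    // `produce` resolves to the record's offset once its batch lands.
    let offset = client
        .producer
        .produce(Record {
            key: None,
            value: Some(b"serialized wal entry".to_vec()),
            headers: BTreeMap::new(),
            timestamp: Utc::now(),
        })
        .await
        .expect("produce should succeed in this sketch");
    Ok(offset)
}
```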