Skip to content

Commit

Permalink
implement kafka log store
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Nov 29, 2023
1 parent ff80f77 commit dd4cee5
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 29 deletions.
1 change: 1 addition & 0 deletions src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize};

pub type KafkaTopic = String;
pub const TOPIC_NAME_PREFIX: &str = "greptime_topic";
pub const TOPIC_KEY: &str = "kafka_topic";

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Compression {
Expand Down
56 changes: 40 additions & 16 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::num::TryFromIntError;

use common_config::wal::kafka::KafkaTopic;
use common_error::ext::ErrorExt;
Expand All @@ -21,7 +22,8 @@ use common_runtime::error::Error as RuntimeError;
use rskafka::client::error::Error as RsKafkaError;
use snafu::{Location, Snafu};
use store_api::logstore::entry::Id as EntryId;
use store_api::storage::RegionId;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::RegionWalOptions;

#[derive(Snafu)]
#[snafu(visibility(pub))]
Expand Down Expand Up @@ -113,36 +115,58 @@ pub enum Error {
error: RsKafkaError,
},

#[snafu(display(
"Failed to get a Kafka topic client, topic: {}, region id: {}",
topic,
region_id
))]
#[snafu(display("Failed to get a Kafka topic client, topic: {}", topic))]
GetKafkaTopicClient {
topic: KafkaTopic,
region_id: u64,
location: Location,
},

#[snafu(display(
"Failed to write entries to Kafka, topic: {}, region id: {}",
topic,
region_id
))]
#[snafu(display("Failed to write entries to Kafka, topic: {}", topic))]
WriteEntriesToKafka {
topic: KafkaTopic,
region_id: RegionId,
location: Location,
#[snafu(source)]
error: rskafka::client::producer::Error,
},

#[snafu(display("Empty log entries provided"))]
EmptyLogEntries { location: Location },

#[snafu(display("Returned Kafka offsets are empty"))]
EmptyKafkaOffsets { location: Location },

#[snafu(display(
"Failed to convert an rskafka offset to entry offset, rskafka_offset: {}",
rskafka_offset
))]
ConvertRsKafkaOffsetToEntryOffset {
rskafka_offset: i64,
location: Location,
#[snafu(source)]
error: TryFromIntError,
},

#[snafu(display(
"Missing required entry offset, entry_id: {}, region_id: {}, topic: {}",
entry_id,
region_id,
topic
))]
MissingEntryOffset {
entry_id: EntryId,
region_id: u64,
topic: KafkaTopic,
location: Location,
},

#[snafu(display(
"Missing required kafka topic, ns_id: {}, region_wal_options: {:?}",
ns_id,
region_wal_options,
))]
MissingKafkaTopic {
ns_id: NamespaceId,
region_wal_options: RegionWalOptions,
location: Location,
},

#[snafu(display("Failed to serialize an entry meta"))]
SerEntryMeta {
location: Location,
Expand Down
Loading

0 comments on commit dd4cee5

Please sign in to comment.