chore: rewrite encdec of record
niebayes committed Jan 4, 2024
1 parent 5a3fbee commit 33ea9af
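
This commit splits the old Record struct into a RecordMeta header plus a raw payload. On the produce path only the metadata is JSON-encoded, into the Kafka record key; the payload bytes ride along untouched as the record value, and decoding reverses the mapping. A test for producing an entry larger than the max record size is added.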
Showing 1 changed file with 113 additions and 53 deletions.
src/log-store/src/kafka/util/record.rs
@@ -59,16 +59,13 @@ impl From<RecordType> for i32 {
     }
 }
 
-/// The minimal storage unit in the Kafka log store.
-/// Our own Record will be converted into a Kafka record during producing.
+/// The metadata of a record.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-pub struct Record {
+pub struct RecordMeta {
     /// The version of the record. Used for backward compatibility.
     version: u32,
     /// The type of the record.
     pub tp: RecordType,
-    /// The payload of the record.
-    data: Vec<u8>,
     /// The id of the entry the record is associated with.
     pub entry_id: EntryId,
     /// The namespace of the entry the record is associated with.
@@ -77,14 +74,24 @@ pub struct Record {
     checksum: u32,
 }
 
-impl TryInto<KafkaRecord> for Record {
+/// The minimal storage unit in the Kafka log store.
+/// Our own Record will be converted into a Kafka record during producing.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct Record {
+    /// The metadata of the record.
+    meta: RecordMeta,
+    /// The payload of the record.
+    data: Vec<u8>,
+}
+
+impl TryFrom<Record> for KafkaRecord {
     type Error = crate::error::Error;
 
-    fn try_into(self) -> Result<KafkaRecord> {
-        let value = serde_json::to_vec(&self).context(EncodeJsonSnafu)?;
+    fn try_from(record: Record) -> Result<Self> {
+        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
         Ok(KafkaRecord {
-            key: None,
-            value: Some(value),
+            key: Some(key),
+            value: Some(record.data),
             timestamp: rskafka::chrono::Utc::now(),
             headers: Default::default(),
         })
@@ -95,8 +102,12 @@ impl TryFrom<KafkaRecord> for Record {
     type Error = crate::error::Error;
 
     fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
-        let value = kafka_record.value.context(MissingValueSnafu)?;
-        serde_json::from_slice(&value).context(DecodeJsonSnafu)
+        // Propagate failures instead of unwrapping; `MissingKeySnafu` is assumed
+        // to exist in the error type alongside `MissingValueSnafu`.
+        let key = kafka_record.key.context(MissingKeySnafu)?;
+        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
+        let data = kafka_record.value.context(MissingValueSnafu)?;
+        Ok(Self { meta, data })
     }
 }

@@ -106,8 +117,8 @@ impl TryFrom<Vec<Record>> for EntryImpl {
     fn try_from(records: Vec<Record>) -> Result<Self> {
         check_records(&records)?;
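+        // check_records rejects inconsistent batches, so all records below share
+        // a single entry id and namespace.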

-        let entry_id = records[0].entry_id;
-        let ns = records[0].ns.clone();
+        let entry_id = records[0].meta.entry_id;
+        let ns = records[0].meta.ns.clone();
         let data = records.into_iter().flat_map(|record| record.data).collect();
         Ok(EntryImpl {
             data,
@@ -197,10 +208,10 @@ fn check_records(records: &[Record]) -> Result<()> {
     let mut namespaces = HashSet::with_capacity(len);
     let mut checksum_matched = HashSet::with_capacity(len);
     for record in records {
-        sequence.push(i32::from(record.tp));
-        entry_ids.insert(record.entry_id);
-        namespaces.insert(&record.ns);
-        checksum_matched.insert(record.checksum == crc32fast::hash(&record.data));
+        sequence.push(i32::from(record.meta.tp));
+        entry_ids.insert(record.meta.entry_id);
+        namespaces.insert(&record.meta.ns);
+        checksum_matched.insert(record.meta.checksum == crc32fast::hash(&record.data));
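+        // Note: checksum_matched collapses to a single `true` only when every
+        // record's checksum matches its payload.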
     }
 
     ensure!(
@@ -259,12 +270,14 @@ fn record_type(seq: usize, num_records: usize) -> RecordType {
 fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec<Record> {
     if entry.data.len() <= max_record_size {
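         // The whole entry fits into one record, so a single Full record suffices.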
         let record = Record {
-            version: VERSION,
-            tp: RecordType::Full,
-            checksum: crc32fast::hash(&entry.data),
+            meta: RecordMeta {
+                version: VERSION,
+                tp: RecordType::Full,
+                entry_id: entry.id,
+                ns: entry.ns,
+                checksum: crc32fast::hash(&entry.data),
+            },
             data: entry.data,
-            entry_id: entry.id,
-            ns: entry.ns,
         };
         return vec![record];
     }
@@ -274,12 +287,14 @@ fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec<Record> {
     chunks
         .enumerate()
         .map(|(i, chunk)| Record {
-            version: VERSION,
-            tp: record_type(i, num_chunks),
+            meta: RecordMeta {
+                version: VERSION,
+                tp: record_type(i, num_chunks),
+                entry_id: entry.id,
+                ns: entry.ns.clone(),
+                checksum: crc32fast::hash(chunk),
+            },
             data: chunk.to_vec(),
-            entry_id: entry.id,
-            ns: entry.ns.clone(),
-            checksum: crc32fast::hash(chunk),
         })
         .collect()
 }
@@ -290,15 +305,15 @@ pub fn maybe_emit_entry(
     entry_records: &mut HashMap<EntryId, Vec<Record>>,
 ) -> Result<Option<EntryImpl>> {
     let mut entry = None;
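     // A Full record forms an entry on its own; a Last record drains the buffered
     // prefix for its entry id; First and Middle records are buffered until then.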
-    match record.tp {
+    match record.meta.tp {
         RecordType::Full => {
             entry = Some(EntryImpl::try_from(vec![record])?);
         }
         RecordType::Last => {
             // There must be a sequence prefix before a Last record is read.
             let mut records =
                 entry_records
-                    .remove(&record.entry_id)
+                    .remove(&record.meta.entry_id)
                     .context(IllegalSequenceSnafu {
                         error: "Missing sequence prefix",
                     })?;
@@ -307,7 +322,7 @@ pub fn maybe_emit_entry(
         }
         _ => {
             entry_records
-                .entry(record.entry_id)
+                .entry(record.meta.entry_id)
                 .or_default()
                 .push(record);
         }
@@ -324,29 +339,44 @@ pub fn maybe_emit_entry(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
+    use common_base::readable_size::ReadableSize;
+    use common_config::wal::KafkaConfig;
+    use rand::Rng;
+
     use super::*;
+    use crate::kafka::client_manager::ClientManager;
 
     // Implements some utility methods for testing.
     impl Default for Record {
         fn default() -> Self {
             Self {
-                version: VERSION,
-                tp: RecordType::Full,
-                ns: NamespaceImpl {
-                    region_id: 0,
-                    topic: "greptimedb_wal_topic".to_string(),
+                meta: RecordMeta {
+                    version: VERSION,
+                    tp: RecordType::Full,
+                    ns: NamespaceImpl {
+                        region_id: 0,
+                        topic: "greptimedb_wal_topic".to_string(),
+                    },
+                    entry_id: 0,
+                    checksum: 0,
                 },
                 data: Vec::new(),
-                entry_id: 0,
-                checksum: 0,
             }
         }
     }
 
     impl Record {
         /// Overrides tp.
         fn with_tp(&self, tp: RecordType) -> Self {
-            Self { tp, ..self.clone() }
+            Self {
+                meta: RecordMeta {
+                    tp,
+                    ..self.meta.clone()
+                },
+                ..self.clone()
+            }
         }
 
         /// Overrides data with the given data.
@@ -360,26 +390,34 @@ mod tests {
         /// Overrides entry id.
         fn with_entry_id(&self, entry_id: EntryId) -> Self {
             Self {
-                entry_id,
+                meta: RecordMeta {
+                    entry_id,
+                    ..self.meta.clone()
+                },
                 ..self.clone()
             }
         }
 
         /// Overrides namespace.
        fn with_ns(&self, ns: NamespaceImpl) -> Self {
-            Self { ns, ..self.clone() }
+            Self {
+                meta: RecordMeta { ns, ..self.meta },
+                ..self.clone()
+            }
        }
 
         /// Overrides checksum.
         fn with_checksum(&self, checksum: u32) -> Self {
             Self {
-                checksum,
+                meta: RecordMeta {
+                    checksum,
+                    ..self.meta.clone()
+                },
                 ..self.clone()
             }
         }
     }
 
-    #[allow(unused)]
     fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
         EntryImpl {
             data: data.as_ref().to_vec(),
@@ -557,15 +595,17 @@ mod tests {
     #[test]
     fn test_record_conversion() {
         let record = Record {
-            version: VERSION,
-            tp: RecordType::Full,
-            data: b"12345".to_vec(),
-            entry_id: 1,
-            ns: NamespaceImpl {
-                region_id: 1,
-                topic: "greptimedb_wal_topic".to_string(),
+            meta: RecordMeta {
+                version: VERSION,
+                tp: RecordType::Full,
+                entry_id: 1,
+                ns: NamespaceImpl {
+                    region_id: 1,
+                    topic: "greptimedb_wal_topic".to_string(),
+                },
+                checksum: crc32fast::hash(b"12345".as_slice()),
             },
-            checksum: crc32fast::hash(b"12345".as_slice()),
+            data: b"12345".to_vec(),
         };
         let kafka_record: KafkaRecord = record.clone().try_into().unwrap();
         let got = Record::try_from(kafka_record).unwrap();
@@ -591,8 +631,8 @@ mod tests {
                 .with_tp(RecordType::Last),
         ];
         let entry = EntryImpl::try_from(records.clone()).unwrap();
-        assert_eq!(records[0].entry_id, entry.id);
-        assert_eq!(records[0].ns, entry.ns);
+        assert_eq!(records[0].meta.entry_id, entry.id);
+        assert_eq!(records[0].meta.ns, entry.ns);
         assert_eq!(
             entry.data,
             records
@@ -666,4 +706,24 @@
         assert!(entry_records.contains_key(&1));
         assert!(entry_records.contains_key(&2));
     }

+    // NOTE: this test requires a Kafka broker reachable at localhost:9092.
+    #[tokio::test]
+    async fn test_produce_large_entry() {
+        let max_record_size = 1024;
+        let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::<usize>());
+        let ns = NamespaceImpl {
+            region_id: 1,
+            topic,
+        };
+        let entry = new_test_entry([b'1'; 4096], 0, ns.clone());
+        let producer = RecordProducer::new(ns.clone(), max_record_size).with_entries(vec![entry]);
+
+        let config = KafkaConfig {
+            broker_endpoints: vec!["localhost:9092".to_string()],
+            max_batch_size: ReadableSize::kb(1),
+            ..Default::default()
+        };
+        let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
+        producer.produce(&manager).await.unwrap();
+    }
 }
