Skip to content

Commit

Permalink
fix(kafka): overwrite the EntryId with Offset while consuming records (
Browse files Browse the repository at this point in the history
…#3148)

* fix(kafka): overwrite the EntryId with Offset while consuming the KafkaRecords

* fix: temporarily workaround of incorrect entry Id
  • Loading branch information
WenyXu authored Jan 12, 2024
1 parent c1190ba commit 430ffe0
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ impl LogStore for KafkaLogStore {
}

// Tries to construct an entry from records consumed so far.
if let Some(entry) = maybe_emit_entry(record, &mut entry_records)? {
if let Some(mut entry) = maybe_emit_entry(record, &mut entry_records)? {
// We don't rely on the EntryId generated by mito2.
// Instead, we use the offset return from Kafka as EntryId.
// Therefore, we MUST overwrite the EntryId with RecordOffset.
entry.id = offset as u64;
yield Ok(vec![entry]);
}

Expand Down Expand Up @@ -423,17 +427,20 @@ mod tests {

// Reads entries for regions and checks for each region that the gotten entries are identical with the expected ones.
for region_id in which {
let ctx = &region_contexts[&region_id];
let ctx = region_contexts.get_mut(&region_id).unwrap();
let stream = logstore
.read(&ctx.ns, ctx.flushed_entry_id + 1)
.await
.unwrap();
let got = stream
let mut got = stream
.collect::<Vec<_>>()
.await
.into_iter()
.flat_map(|x| x.unwrap())
.collect::<Vec<_>>();
//FIXME(weny): https://github.com/GreptimeTeam/greptimedb/issues/3152
ctx.expected.iter_mut().for_each(|entry| entry.id = 0);
got.iter_mut().for_each(|entry| entry.id = 0);
assert_eq!(ctx.expected, got);
}

Expand Down

0 comments on commit 430ffe0

Please sign in to comment.