Skip to content

Commit

Permalink
Add additional bookkeeping to support incremental rebalancing
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Oct 24, 2024
1 parent df2cf01 commit 9261674
Showing 1 changed file with 40 additions and 15 deletions.
55 changes: 40 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rdkafka::{
};
use std::{
cmp,
collections::HashMap,
collections::{BTreeSet, HashMap},
fmt::Debug,
future::Future,
mem::take,
Expand Down Expand Up @@ -41,7 +41,10 @@ pub mod reducers;
pub async fn start_consumer(
topics: &[&str],
kafka_client_config: &ClientConfig,
spawn_actors: impl FnMut(Arc<StreamConsumer<KafkaContext>>, &[(String, i32)]) -> ActorHandles,
spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
&BTreeSet<(String, i32)>,
) -> ActorHandles,
) -> Result<(), Error> {
let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel();
let (event_sender, event_receiver) = unbounded_channel();
Expand Down Expand Up @@ -151,8 +154,8 @@ impl ConsumerContext for KafkaContext {

#[derive(Debug)]
pub enum Event {
Assign(Vec<(String, i32)>),
Revoke(Vec<(String, i32)>),
Assign(BTreeSet<(String, i32)>),
Revoke(BTreeSet<(String, i32)>),
Shutdown,
}

Expand Down Expand Up @@ -195,7 +198,7 @@ macro_rules! processing_strategy {
}
) => {{
|consumer: Arc<rdkafka::consumer::StreamConsumer<$crate::KafkaContext>>,
tpl: &[(String, i32)]|
tpl: &std::collections::BTreeSet<(String, i32)>|
-> $crate::ActorHandles {
let start = std::time::Instant::now();

Expand Down Expand Up @@ -261,7 +264,7 @@ macro_rules! processing_strategy {
#[derive(Debug)]
enum ConsumerState {
Ready,
Consuming(ActorHandles),
Consuming(ActorHandles, BTreeSet<(String, i32)>),
Stopped,
}

Expand All @@ -270,7 +273,10 @@ pub async fn handle_events(
consumer: Arc<StreamConsumer<KafkaContext>>,
events: UnboundedReceiver<(Event, SyncSender<()>)>,
shutdown_client: oneshot::Sender<()>,
mut spawn_actors: impl FnMut(Arc<StreamConsumer<KafkaContext>>, &[(String, i32)]) -> ActorHandles,
mut spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
&BTreeSet<(String, i32)>,
) -> ActorHandles,
) -> Result<(), anyhow::Error> {
const CALLBACK_DURATION: Duration = Duration::from_secs(1);

Expand All @@ -280,27 +286,46 @@ pub async fn handle_events(
let mut state = ConsumerState::Ready;

while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state {
let Some((event, _rendezvous_guard)) = &mut events_stream.next().await else {
let Some((event, _rendezvous_guard)) = events_stream.next().await else {
unreachable!("Unexpected end to event stream")
};
info!("Recieved event: {:?}", event);
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
ConsumerState::Consuming(spawn_actors(consumer.clone(), tpl))
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
unreachable!("Got partition revocation before the consumer has started")
}
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
(ConsumerState::Consuming { .. }, Event::Assign(_)) => {
unreachable!("Got partition assignment after consumer has started")
// Additional partitions were assigned while already consuming: merge them into
// the tracked set and respawn the actors over the combined assignment.
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Assign(mut assigned_tpl)) => {
    assert!(
        tpl.is_disjoint(&assigned_tpl),
        "Newly assigned TPL should be disjoint from TPL we're consuming from"
    );
    // `BTreeSet::append` moves every element out of `assigned_tpl`, leaving it
    // empty, so the count has to be captured before the merge; reading `len()`
    // afterwards would always log 0.
    let newly_assigned = assigned_tpl.len();
    tpl.append(&mut assigned_tpl);
    debug!(
        "{} additional topic partitions added after assignment",
        newly_assigned
    );
    // Shut down the existing actor handles before spawning a new set for the
    // merged partition list.
    actor_handles.shutdown(CALLBACK_DURATION).await;
    ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Consuming(actor_handles), Event::Revoke(_)) => {
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Revoke(revoked_tpl)) => {
// `a.is_subset(&b)` checks that every element of `a` is in `b`, so the revoked
// set must be the receiver. With the operands swapped, any partial revocation
// (some partitions still held afterwards) would trip this assertion.
assert!(
    revoked_tpl.is_subset(&tpl),
    "Revoked TPL should be a subset of TPL we're consuming from"
);
tpl.retain(|e| !revoked_tpl.contains(e));
debug!("{} topic partitions remaining after revocation", tpl.len());
actor_handles.shutdown(CALLBACK_DURATION).await;
debug!("Transitioning consumer state to Ready");
ConsumerState::Ready
if tpl.is_empty() {
ConsumerState::Ready
} else {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
}
(ConsumerState::Consuming(actor_handles), Event::Shutdown) => {
(ConsumerState::Consuming(actor_handles, _), Event::Shutdown) => {
actor_handles.shutdown(CALLBACK_DURATION).await;
debug!("Signaling shutdown to client...");
shutdown_client.take();
Expand Down

0 comments on commit 9261674

Please sign in to comment.