
Commit

fix test compile
wenym1 committed Jul 19, 2024
1 parent a79e22f commit f013ba3
Showing 4 changed files with 37 additions and 32 deletions.
16 changes: 7 additions & 9 deletions src/meta/src/barrier/rpc.rs
@@ -179,20 +179,18 @@ impl ControlStreamManager {
pub(super) async fn next_complete_barrier_response(
&mut self,
) -> (WorkerId, MetaResult<BarrierCompleteResponse>) {
loop {
{
let (worker_id, result) = pending_on_none(self.next_response()).await;
match result {
Ok(resp) => match resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => {
assert_eq!(worker_id, resp.worker_id);
return (worker_id, Ok(resp));
}
resp => {
break (
worker_id,
Err(anyhow!("get unexpected resp: {:?}", resp).into()),
);
(worker_id, Ok(resp))
}
resp => (
worker_id,
Err(anyhow!("get unexpected resp: {:?}", resp).into()),
),
},
Err(err) => {
let node = self
@@ -203,7 +201,7 @@ impl ControlStreamManager {
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
break (worker_id, Err(err));
(worker_id, Err(err))
}
}
}
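Note: the change above drops the `loop`/`break`/`return` plumbing and lets the `match` itself produce the function's return value. A minimal standalone sketch of the same pattern, with placeholder names rather than the real control-stream types:

```rust
// Placeholder enum standing in for the control-stream response.
enum Response {
    Complete(u64),
    Other(String),
}

fn fetch() -> Result<Response, String> {
    Ok(Response::Complete(42))
}

fn next_response() -> (u32, Result<u64, String>) {
    let worker_id = 1;
    // The whole `match` evaluates to the tuple that the function returns,
    // so no `loop { ... break (..) }` or explicit `return` is needed.
    match fetch() {
        Ok(Response::Complete(id)) => (worker_id, Ok(id)),
        Ok(Response::Other(other)) => (worker_id, Err(format!("unexpected resp: {other}"))),
        Err(err) => (worker_id, Err(err)),
    }
}

fn main() {
    let (worker, result) = next_response();
    println!("worker {worker}: {result:?}");
}
```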
14 changes: 10 additions & 4 deletions src/stream/src/task/barrier_manager.rs
@@ -1039,14 +1039,15 @@ impl LocalBarrierManager {

#[cfg(test)]
pub(crate) mod barrier_test_utils {
use std::collections::HashMap;
use std::sync::Arc;

use assert_matches::assert_matches;
use futures::StreamExt;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, InjectBarrierRequest,
StreamingControlStreamRequest, StreamingControlStreamResponse,
PbPartialGraphInfo, StreamingControlStreamRequest, StreamingControlStreamResponse,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -1108,9 +1109,14 @@ pub(crate) mod barrier_test_utils {
InjectBarrierRequest {
request_id: "".to_string(),
barrier: Some(barrier.to_protobuf()),
actor_ids_to_send: actor_to_send.into_iter().collect(),
actor_ids_to_collect: actor_to_collect.into_iter().collect(),
table_ids_to_sync: vec![],
graph_info: HashMap::from_iter([(
u32::MAX,
PbPartialGraphInfo {
actor_ids_to_send: actor_to_send.into_iter().collect(),
actor_ids_to_collect: actor_to_collect.into_iter().collect(),
table_ids_to_sync: vec![],
},
)]),
},
)),
}))
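Note: the test utility now nests the actor and table fields inside a one-entry `graph_info` map keyed by the sentinel id `u32::MAX`. A self-contained sketch of that construction, using a stand-in struct in place of the generated `PbPartialGraphInfo`:

```rust
use std::collections::HashMap;

// Stand-in for the generated PbPartialGraphInfo message; field names mirror the diff.
#[derive(Debug)]
struct PartialGraphInfo {
    actor_ids_to_send: Vec<u32>,
    actor_ids_to_collect: Vec<u32>,
    table_ids_to_sync: Vec<u32>,
}

fn main() {
    let actor_to_send = [1u32, 2];
    let actor_to_collect = [1u32, 2, 3];

    // Single-entry map: the test pins everything to one sentinel partial graph id.
    let graph_info: HashMap<u32, PartialGraphInfo> = HashMap::from_iter([(
        u32::MAX,
        PartialGraphInfo {
            actor_ids_to_send: actor_to_send.into_iter().collect(),
            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
            table_ids_to_sync: vec![],
        },
    )]);

    assert_eq!(graph_info.len(), 1);
    println!("{graph_info:?}");
}
```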
33 changes: 17 additions & 16 deletions src/stream/src/task/barrier_manager/managed_state.rs
@@ -244,6 +244,16 @@ impl PartialGraphManagedBarrierState {
barrier_await_tree_reg,
}
}

#[cfg(test)]
pub(crate) fn for_test() -> Self {
Self::new(
true,
StateStoreImpl::for_test(),
Arc::new(StreamingMetrics::unused()),
None,
)
}
}

pub(super) struct ManagedBarrierState {
@@ -260,15 +270,6 @@ pub(super) struct ManagedBarrierState {
}

impl ManagedBarrierState {
#[cfg(test)]
pub(crate) fn for_test() -> Self {
Self::new(
StateStoreImpl::for_test(),
Arc::new(StreamingMetrics::unused()),
None,
)
}

/// Create a barrier manager state. This will be called only once.
pub(super) fn new(
state_store: StateStoreImpl,
@@ -451,20 +452,20 @@ impl ManagedBarrierState {
})
}

pub(super) fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
let actor_states = self.actor_states.get_mut(&actor_id).expect("should exist");
let (prev_epoch, partial_graph_id) = actor_states
.inflight_barriers
.pop_first()
.expect("should not be empty");
assert_eq!(prev_epoch, barrier.epoch.prev);
assert_eq!(prev_epoch, epoch.prev);
if actor_states.is_stopped && actor_states.inflight_barriers.is_empty() {
self.actor_states.remove(&actor_id);
}
self.graph_states
.get_mut(&partial_graph_id)
.expect("should exist")
.collect(actor_id, barrier);
.collect(actor_id, epoch);
}
}
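Note: `collect` now takes an `EpochPair` instead of borrowing the whole `Barrier`; call sites presumably pass `barrier.epoch` rather than `&barrier`. A rough sketch of that call-site shape under that assumption, with simplified placeholder types:

```rust
// Simplified placeholders; the real types live in the stream crate.
#[derive(Clone, Copy, Debug, PartialEq)]
struct EpochPair {
    prev: u64,
    curr: u64,
}

struct Barrier {
    epoch: EpochPair,
}

struct ManagedState {
    inflight_prev_epochs: Vec<u64>,
}

impl ManagedState {
    // Only the epoch pair is needed for bookkeeping, so the signature no longer
    // borrows the whole Barrier.
    fn collect(&mut self, actor_id: u32, epoch: EpochPair) {
        let prev = self.inflight_prev_epochs.remove(0);
        assert_eq!(prev, epoch.prev, "actor {actor_id} collected out of order");
    }
}

fn main() {
    let barrier = Barrier {
        epoch: EpochPair { prev: 1, curr: 2 },
    };
    let mut state = ManagedState {
        inflight_prev_epochs: vec![1],
    };
    // Call site passes the copyable EpochPair instead of `&barrier`.
    state.collect(42, barrier.epoch);
}
```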

@@ -783,7 +784,7 @@ impl PartialGraphManagedBarrierState {

#[cfg(test)]
async fn pop_next_completed_epoch(&mut self) -> u64 {
let epoch = self.next_completed_epoch().await;
let epoch = poll_fn(|cx| self.poll_next_completed_epoch(cx)).await;
let _ = self.pop_completed_epoch(epoch).unwrap().unwrap();
epoch
}
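Note: the test helper now adapts the poll-style `poll_next_completed_epoch` into a future with `poll_fn` instead of calling a dedicated async method. A minimal illustration of the `std::future::poll_fn` pattern, with a toy poll method standing in for the real one (assumes a tokio runtime for `main`):

```rust
use std::future::poll_fn;
use std::task::{Context, Poll};

struct State {
    completed: Vec<u64>,
}

impl State {
    // Toy poll-style method: ready as soon as an epoch is queued.
    fn poll_next_completed_epoch(&mut self, _cx: &mut Context<'_>) -> Poll<u64> {
        match self.completed.pop() {
            Some(epoch) => Poll::Ready(epoch),
            None => Poll::Pending,
        }
    }

    // `poll_fn` wraps the poll-style method into an awaitable future,
    // which is the shape the updated test helper uses.
    async fn next_completed_epoch(&mut self) -> u64 {
        poll_fn(|cx| self.poll_next_completed_epoch(cx)).await
    }
}

#[tokio::main]
async fn main() {
    let mut state = State { completed: vec![7] };
    assert_eq!(state.next_completed_epoch().await, 7);
}
```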
@@ -796,11 +797,11 @@ mod tests {
use risingwave_common::util::epoch::test_epoch;

use crate::executor::Barrier;
use crate::task::barrier_manager::managed_state::ManagedBarrierState;
use crate::task::barrier_manager::managed_state::PartialGraphManagedBarrierState;

#[tokio::test]
async fn test_managed_state_add_actor() {
let mut managed_barrier_state = ManagedBarrierState::for_test();
let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
let barrier1 = Barrier::new_test_barrier(test_epoch(1));
let barrier2 = Barrier::new_test_barrier(test_epoch(2));
let barrier3 = Barrier::new_test_barrier(test_epoch(3));
Expand Down Expand Up @@ -850,7 +851,7 @@ mod tests {

#[tokio::test]
async fn test_managed_state_stop_actor() {
let mut managed_barrier_state = ManagedBarrierState::for_test();
let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
let barrier1 = Barrier::new_test_barrier(test_epoch(1));
let barrier2 = Barrier::new_test_barrier(test_epoch(2));
let barrier3 = Barrier::new_test_barrier(test_epoch(3));
6 changes: 3 additions & 3 deletions src/stream/src/task/mod.rs
@@ -49,9 +49,9 @@ impl PartialGraphId {
}
}

impl Into<u32> for PartialGraphId {
fn into(self) -> u32 {
self.0
impl From<PartialGraphId> for u32 {
fn from(val: PartialGraphId) -> u32 {
val.0
}
}
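Note: replacing the manual `Into<u32>` impl with `From<PartialGraphId> for u32` follows the usual Rust guideline: implement `From`, and the standard library's blanket impl provides `Into` for free. A quick self-contained demonstration with a stand-in newtype:

```rust
// Stand-in newtype mirroring PartialGraphId's shape.
#[derive(Clone, Copy, Debug)]
struct GraphId(u32);

// Implementing From in this direction is preferred: the blanket
// `impl<T, U> Into<U> for T where U: From<T>` supplies Into<u32> automatically.
impl From<GraphId> for u32 {
    fn from(val: GraphId) -> u32 {
        val.0
    }
}

fn main() {
    let id = GraphId(7);
    let a: u32 = u32::from(id); // via From
    let b: u32 = id.into();     // via the blanket Into impl
    assert_eq!(a, b);
    println!("{a}");
}
```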
