Skip to content

Commit

Permalink
[TaskCenter][7/n] Migration of worker crate
Browse files Browse the repository at this point in the history
- Removes explicit task-center/metadata from worker
- Migrates worker tests to restate_worker
  • Loading branch information
AhmedSoliman committed Nov 25, 2024
1 parent 9a422ca commit 7b07c93
Show file tree
Hide file tree
Showing 13 changed files with 378 additions and 512 deletions.
3 changes: 1 addition & 2 deletions crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,12 @@ where
/// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
pub fn start(
self,
task_center: TaskCenter,
name: &'static str,
partition_id: Option<PartitionId>,
) -> Result<AppenderHandle<T>, ShutdownError> {
let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity);

let handle = task_center.spawn_unmanaged(
let handle = TaskCenter::current().spawn_unmanaged(
restate_core::TaskKind::BifrostAppender,
name,
partition_id,
Expand Down
6 changes: 4 additions & 2 deletions crates/wal-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use restate_bifrost::Bifrost;
use restate_core::{metadata, ShutdownError};
use restate_core::{Metadata, ShutdownError};
use restate_storage_api::deduplication_table::DedupInformation;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
use restate_types::invocation::{
Expand Down Expand Up @@ -237,7 +237,9 @@ pub async fn append_envelope_to_bifrost(
) -> Result<(LogId, Lsn), Error> {
let partition_id = {
// make sure we drop pinned partition table before awaiting
let partition_table = metadata().wait_for_partition_table(Version::MIN).await?;
let partition_table = Metadata::current()
.wait_for_partition_table(Version::MIN)
.await?;
partition_table.find_partition_id(envelope.partition_key())?
};

Expand Down
2 changes: 0 additions & 2 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ impl Worker {
.await?;

let partition_processor_manager = PartitionProcessorManager::new(
task_center(),
health_status,
updateable_config.clone(),
metadata.clone(),
metadata_store_client,
partition_store_manager.clone(),
router_builder,
Expand Down
163 changes: 77 additions & 86 deletions crates/worker/src/partition/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@ use tokio::time::MissedTickBehavior;
use tracing::{debug, instrument, warn};

use restate_bifrost::Bifrost;
use restate_core::cancellation_watcher;
use restate_core::{cancellation_watcher, Metadata};
use restate_storage_api::invocation_status_table::{
InvocationStatus, ReadOnlyInvocationStatusTable,
};
use restate_types::identifiers::WithPartitionKey;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::invocation::PurgeInvocationRequest;
use restate_types::GenerationalNodeId;
use restate_wal_protocol::{
append_envelope_to_bifrost, Command, Destination, Envelope, Header, Source,
};

pub(super) struct Cleaner<Storage> {
partition_id: PartitionId,
leader_epoch: LeaderEpoch,
node_id: GenerationalNodeId,
partition_key_range: RangeInclusive<PartitionKey>,
storage: Storage,
bifrost: Bifrost,
Expand All @@ -47,7 +45,6 @@ where
pub(super) fn new(
partition_id: PartitionId,
leader_epoch: LeaderEpoch,
node_id: GenerationalNodeId,
storage: Storage,
bifrost: Bifrost,
partition_key_range: RangeInclusive<PartitionKey>,
Expand All @@ -56,33 +53,32 @@ where
Self {
partition_id,
leader_epoch,
node_id,
partition_key_range,
storage,
bifrost,
cleanup_interval,
}
}

#[instrument(skip_all, fields(restate.node = %self.node_id, restate.partition.id = %self.partition_id))]
#[instrument(skip_all, fields(restate.partition.id = %self.partition_id))]
pub(super) async fn run(self) -> anyhow::Result<()> {
let Self {
partition_id,
leader_epoch,
node_id,
partition_key_range,
storage,
bifrost,
cleanup_interval,
} = self;
debug!("Running cleaner");

let my_node_id = Metadata::with_current(|m| m.my_node_id());
let bifrost_envelope_source = Source::Processor {
partition_id,
partition_key: None,
leader_epoch,
node_id: node_id.as_plain(),
generational_node_id: Some(node_id),
node_id: my_node_id.as_plain(),
generational_node_id: Some(my_node_id),
};

let mut interval = tokio::time::interval(cleanup_interval);
Expand Down Expand Up @@ -172,7 +168,7 @@ mod tests {

use futures::{stream, Stream};
use googletest::prelude::*;
use restate_core::{TaskCenter, TaskCenterFutureExt, TaskKind, TestCoreEnvBuilder};
use restate_core::{Metadata, TaskCenter, TaskKind, TestCoreEnvBuilder2};
use restate_storage_api::invocation_status_table::{
CompletedInvocation, InFlightInvocationMetadata, InvocationStatus,
};
Expand Down Expand Up @@ -215,97 +211,92 @@ mod tests {
}

// Start paused makes sure the timer is immediately fired
#[test(tokio::test(start_paused = true))]
#[test(restate_core::test(start_paused = true))]
pub async fn cleanup_works() {
let env = TestCoreEnvBuilder::with_incoming_only_connector()
let _env = TestCoreEnvBuilder2::with_incoming_only_connector()
.set_partition_table(PartitionTable::with_equally_sized_partitions(
Version::MIN,
1,
))
.build()
.await;
async {
let bifrost = Bifrost::init_in_memory().await;
let bifrost = Bifrost::init_in_memory().await;

let expired_invocation =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_expired_invocation_1 =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_expired_invocation_2 =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_completed_invocation =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let expired_invocation =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_expired_invocation_1 =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_expired_invocation_2 =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
let not_completed_invocation =
InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());

let mock_storage = MockInvocationStatusReader(vec![
(
expired_invocation,
InvocationStatus::Completed(CompletedInvocation {
completion_retention_duration: Duration::ZERO,
..CompletedInvocation::mock_neo()
}),
),
(
not_expired_invocation_1,
InvocationStatus::Completed(CompletedInvocation {
completion_retention_duration: Duration::MAX,
..CompletedInvocation::mock_neo()
}),
),
(
not_expired_invocation_2,
// Old status invocations are still processed with the cleanup timer in the PP
InvocationStatus::Completed(CompletedInvocation::mock_old()),
),
(
not_completed_invocation,
InvocationStatus::Invoked(InFlightInvocationMetadata::mock()),
),
]);
let mock_storage = MockInvocationStatusReader(vec![
(
expired_invocation,
InvocationStatus::Completed(CompletedInvocation {
completion_retention_duration: Duration::ZERO,
..CompletedInvocation::mock_neo()
}),
),
(
not_expired_invocation_1,
InvocationStatus::Completed(CompletedInvocation {
completion_retention_duration: Duration::MAX,
..CompletedInvocation::mock_neo()
}),
),
(
not_expired_invocation_2,
// Old status invocations are still processed with the cleanup timer in the PP
InvocationStatus::Completed(CompletedInvocation::mock_old()),
),
(
not_completed_invocation,
InvocationStatus::Invoked(InFlightInvocationMetadata::mock()),
),
]);

TaskCenter::current()
.spawn(
TaskKind::Cleaner,
"cleaner",
Some(PartitionId::MIN),
Cleaner::new(
PartitionId::MIN,
LeaderEpoch::INITIAL,
GenerationalNodeId::new(1, 1),
mock_storage,
bifrost.clone(),
RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX),
Duration::from_secs(1),
)
.run(),
TaskCenter::current()
.spawn(
TaskKind::Cleaner,
"cleaner",
Some(PartitionId::MIN),
Cleaner::new(
PartitionId::MIN,
LeaderEpoch::INITIAL,
mock_storage,
bifrost.clone(),
RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX),
Duration::from_secs(1),
)
.unwrap();
.run(),
)
.unwrap();

// By yielding once we let the cleaner task run, and perform the cleanup
tokio::task::yield_now().await;
// By yielding once we let the cleaner task run, and perform the cleanup
tokio::task::yield_now().await;

// All the invocation ids were created with same partition keys, hence same partition id.
let partition_id = env
.metadata
.partition_table_snapshot()
// All the invocation ids were created with same partition keys, hence same partition id.
let partition_id = Metadata::with_current(|m| {
m.partition_table_snapshot()
.find_partition_id(expired_invocation.partition_key())
.unwrap();
})
.unwrap();

let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap();
let bifrost_message = log_entries
.remove(0)
.try_decode::<Envelope>()
.unwrap()
.unwrap();
let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap();
let bifrost_message = log_entries
.remove(0)
.try_decode::<Envelope>()
.unwrap()
.unwrap();

assert_that!(
bifrost_message.command,
pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest {
invocation_id: eq(expired_invocation)
})))
);
assert_that!(log_entries, empty());
}
.in_tc(&env.tc)
.await;
assert_that!(
bifrost_message.command,
pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest {
invocation_id: eq(expired_invocation)
})))
);
assert_that!(log_entries, empty());
}
}
Loading

0 comments on commit 7b07c93

Please sign in to comment.