
[TaskCenter] Replace TestCoreEnv with TestCoreEnv2 #2358

Merged (9 commits) on Nov 26, 2024
benchmarks/src/lib.rs: 6 additions & 6 deletions
@@ -18,7 +18,7 @@ use futures_util::{future, TryFutureExt};
 use http::header::CONTENT_TYPE;
 use http::Uri;
 use pprof::flamegraph::Options;
-use restate_core::{TaskCenter, TaskCenterBuilder, TaskKind};
+use restate_core::{task_center, TaskCenter, TaskCenterBuilder, TaskKind};
 use restate_node::Node;
 use restate_rocksdb::RocksDbManager;
 use restate_types::config::{
@@ -86,27 +86,27 @@ pub fn discover_deployment(current_thread_rt: &Runtime, address: Uri) {
         .is_success(),);
 }
 
-pub fn spawn_restate(config: Configuration) -> TaskCenter {
+pub fn spawn_restate(config: Configuration) -> task_center::Handle {
     if rlimit::increase_nofile_limit(u64::MAX).is_err() {
         warn!("Failed to increase the number of open file descriptors limit.");
     }
 
     let tc = TaskCenterBuilder::default()
         .options(config.common.clone())
         .build()
-        .expect("task_center builds");
-    let cloned_tc = tc.clone();
+        .expect("task_center builds")
+        .to_handle();
     restate_types::config::set_current_config(config.clone());
     let updateable_config = Configuration::updateable();
 
     tc.block_on(async {
         RocksDbManager::init(Constant::new(config.common));
 
-        tc.spawn(TaskKind::SystemBoot, "restate", None, async move {
+        TaskCenter::spawn(TaskKind::SystemBoot, "restate", async move {
             let node = Node::create(updateable_config)
                 .await
                 .expect("Restate node must build");
-            cloned_tc.run_in_scope("startup", None, node.start()).await
+            node.start().await
         })
         .unwrap();
     });
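Taken together, the hunks above show the shape of the migration: instead of keeping a cloned `TaskCenter` and calling `tc.spawn(..., None, ...)` plus `run_in_scope(...)`, the builder is turned into a `task_center::Handle` and tasks are spawned through the `TaskCenter::spawn` associated function, which resolves the current task center implicitly. A minimal sketch of the new pattern, using the `restate_core` calls exactly as they appear in this diff (the helper name and the `Ok(())` body are placeholders, and the sketch assumes spawned futures return `anyhow::Result<()>` the way `node.start()` is used above):

```rust
use restate_core::{task_center, TaskCenter, TaskCenterBuilder, TaskKind};
use restate_types::config::Configuration;

// Hypothetical helper mirroring the new shape of spawn_restate() above.
fn start_in_task_center(config: Configuration) -> task_center::Handle {
    // Build the task center, then keep only a cloneable handle to it.
    let tc = TaskCenterBuilder::default()
        .options(config.common.clone())
        .build()
        .expect("task_center builds")
        .to_handle();

    tc.block_on(async {
        // Inside block_on we are already running inside the task center, so
        // the associated function resolves it implicitly; no cloned TaskCenter
        // value and no run_in_scope("startup", ...) wrapper are needed.
        TaskCenter::spawn(TaskKind::SystemBoot, "restate", async move {
            // ... create and start the node here, as in the hunk above ...
            Ok(())
        })
        .unwrap();
    });

    tc
}
```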
crates/admin/src/cluster_controller/cluster_state_refresher.rs: 3 additions & 11 deletions
@@ -30,7 +30,6 @@ use restate_types::time::MillisSinceEpoch;
 use restate_types::Version;
 
 pub struct ClusterStateRefresher<T> {
-    metadata: Metadata,
     network_sender: Networking<T>,
     get_state_router: RpcRouter<GetNodeState>,
     in_flight_refresh: Option<TaskHandle<anyhow::Result<()>>>,
@@ -39,11 +38,7 @@ pub struct ClusterStateRefresher<T> {
 }
 
 impl<T: TransportConnect> ClusterStateRefresher<T> {
-    pub fn new(
-        metadata: Metadata,
-        network_sender: Networking<T>,
-        router_builder: &mut MessageRouterBuilder,
-    ) -> Self {
+    pub fn new(network_sender: Networking<T>, router_builder: &mut MessageRouterBuilder) -> Self {
         let get_state_router = RpcRouter::new(router_builder);
 
         let initial_state = ClusterState {
@@ -57,7 +52,6 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
             watch::channel(Arc::from(initial_state));
 
         Self {
-            metadata,
             network_sender,
             get_state_router,
             in_flight_refresh: None,
@@ -99,7 +93,6 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
             self.get_state_router.clone(),
             self.network_sender.clone(),
             Arc::clone(&self.cluster_state_update_tx),
-            self.metadata.clone(),
         )?;
 
         Ok(())
@@ -109,10 +102,10 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
         get_state_router: RpcRouter<GetNodeState>,
         network_sender: Networking<T>,
         cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
-        metadata: Metadata,
     ) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
         let refresh = async move {
             let last_state = Arc::clone(&cluster_state_tx.borrow());
+            let metadata = Metadata::current();
             // make sure we have a partition table that equals or newer than last refresh
             let partition_table_version = metadata
                 .wait_for_version(
@@ -228,10 +221,9 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
             Ok(())
         };
 
-        let handle = TaskCenter::current().spawn_unmanaged(
+        let handle = TaskCenter::spawn_unmanaged(
             restate_core::TaskKind::Disposable,
             "cluster-state-refresh",
-            None,
             refresh,
         )?;
 
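The refresher changes follow the same idea: `Metadata` no longer has to be threaded through the constructor and the spawn call, because code running inside the task center can resolve it on demand via `Metadata::current()`, and `TaskCenter::spawn_unmanaged` drops the extra `None` (partition id) argument. A condensed sketch, assuming the calls exactly as they appear in this diff (the function body and import paths are illustrative):

```rust
use restate_core::{Metadata, ShutdownError, TaskCenter, TaskHandle, TaskKind};

// Condensed, hypothetical version of the refresh spawn: no Metadata parameter;
// the task resolves it lazily from the ambient task-center context.
fn spawn_refresh() -> Result<TaskHandle<anyhow::Result<()>>, ShutdownError> {
    let refresh = async move {
        // Replaces the `metadata: Metadata` field/argument that used to be
        // cloned into the refresher at construction time.
        let metadata = Metadata::current();
        let _nodes_config = metadata.nodes_config_ref();
        // ... send GetNodeState RPCs and publish the refreshed ClusterState ...
        Ok(())
    };

    // The old call was TaskCenter::current().spawn_unmanaged(kind, name, None, fut);
    // the explicit current() lookup and the partition-id argument are gone.
    TaskCenter::spawn_unmanaged(TaskKind::Disposable, "cluster-state-refresh", refresh)
}
```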
crates/admin/src/cluster_controller/logs_controller.rs: 15 additions & 24 deletions
@@ -28,7 +28,7 @@ use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
 use restate_core::metadata_store::{
     retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
 };
-use restate_core::{metadata, task_center, Metadata, MetadataWriter, ShutdownError};
+use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
 use restate_types::config::Configuration;
 use restate_types::errors::GenericError;
 use restate_types::identifiers::PartitionId;
@@ -324,7 +324,7 @@ fn try_provisioning(
         #[cfg(feature = "replicated-loglet")]
         ProviderKind::Replicated => build_new_replicated_loglet_configuration(
             ReplicatedLogletId::new(log_id, SegmentIndex::OLDEST),
-            metadata().nodes_config_ref().as_ref(),
+            &Metadata::with_current(|m| m.nodes_config_ref()),
             observed_cluster_state,
             None,
             node_set_selector_hints.preferred_sequencer(&log_id),
@@ -494,7 +494,7 @@ impl LogletConfiguration {
             LogletConfiguration::Replicated(configuration) => {
                 build_new_replicated_loglet_configuration(
                     configuration.loglet_id.next(),
-                    &metadata().nodes_config_ref(),
+                    &Metadata::with_current(|m| m.nodes_config_ref()),
                     observed_cluster_state,
                     Some(configuration),
                     preferred_sequencer,
@@ -621,7 +621,7 @@ struct LogsControllerInner {
     logs_state: HashMap<LogId, LogState, Xxh3Builder>,
     logs_write_in_progress: Option<Version>,
 
-    // We are storing the logs explicitly (not relying on metadata()) because we need a fixed
+    // We are storing the logs explicitly (not relying on Metadata::current()) because we need a fixed
     // snapshot to keep logs_state in sync.
     current_logs: Arc<Logs>,
     retry_policy: RetryPolicy,
@@ -1024,15 +1024,11 @@ impl LogsController {
             Event::LogsTailUpdates { updates }
         };
 
-        let tc = task_center();
-        self.async_operations.spawn(async move {
-            tc.run_in_scope(
-                "log-controller-refresh-tail",
-                None,
-                find_tail.instrument(trace_span!("scheduled-find-tail")),
-            )
-            .await
-        });
+        self.async_operations.spawn(
+            find_tail
+                .instrument(trace_span!("scheduled-find-tail"))
+                .in_current_tc(),
+        );
     }
 
     pub fn on_observed_cluster_state_update(
@@ -1093,12 +1089,10 @@ impl LogsController {
         logs: Arc<Logs>,
         mut debounce: Option<RetryIter<'static>>,
     ) {
-        let tc = task_center().clone();
         let metadata_store_client = self.metadata_store_client.clone();
         let metadata_writer = self.metadata_writer.clone();
 
         self.async_operations.spawn(async move {
-            tc.run_in_scope("logs-controller-write-logs", None, async {
             if let Some(debounce) = &mut debounce {
                 let delay = debounce.next().unwrap_or(FALLBACK_MAX_RETRY_DELAY);
                 debug!(?delay, %previous_version, "Wait before attempting to write logs");
@@ -1153,9 +1147,7 @@
 
             let version = logs.version();
             Event::WriteLogsSucceeded(version)
-            })
-            .await
-        });
+        }.in_current_tc());
     }
 
     fn seal_log(
@@ -1164,13 +1156,12 @@
         segment_index: SegmentIndex,
         mut debounce: Option<RetryIter<'static>>,
     ) {
-        let tc = task_center().clone();
         let bifrost = self.bifrost.clone();
         let metadata_store_client = self.metadata_store_client.clone();
         let metadata_writer = self.metadata_writer.clone();
 
-        self.async_operations.spawn(async move {
-            tc.run_in_scope("logs-controller-seal-log", None, async {
+        self.async_operations.spawn(
+            async move {
             if let Some(debounce) = &mut debounce {
                 let delay = debounce.next().unwrap_or(FALLBACK_MAX_RETRY_DELAY);
                 debug!(?delay, %log_id, %segment_index, "Wait before attempting to seal log");
@@ -1205,9 +1196,9 @@
                 }
             }
         }
-            })
-            .await
-        });
+            }
+            .in_current_tc(),
+        );
     }
 
     pub async fn run_async_operations(&mut self) -> Result<Never> {
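The logs-controller hunks all reduce to one replacement: futures pushed onto the controller's `async_operations` set no longer capture a `task_center()` and wrap themselves in `run_in_scope(...)`; they are wrapped with `in_current_tc()` from `TaskCenterFutureExt`, which attaches the current task-center context to the future itself. A minimal sketch of the pattern, assuming the extension trait behaves as used above (the `JoinSet` and `Event` type here are stand-ins for the controller's own):

```rust
use restate_core::TaskCenterFutureExt;
use tokio::task::JoinSet;

// Stand-in for the controller's Event enum.
#[derive(Debug)]
enum Event {
    WriteLogsSucceeded(u32),
}

// Must be called from inside a task center (and a tokio runtime), which is
// the case for the controller methods shown above.
fn enqueue(async_operations: &mut JoinSet<Event>) {
    let work = async move {
        // ... write the new logs configuration, or seal a segment ...
        Event::WriteLogsSucceeded(1)
    };

    // Before: spawn(async move { tc.run_in_scope("logs-controller-write-logs", None, ...).await });
    // after: the future carries the task-center context with it.
    async_operations.spawn(work.in_current_tc());
}
```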