Skip to content

Commit

Permalink
[TaskCenter] minor followups (#2363)
Browse files Browse the repository at this point in the history
Minor follow ups from code review
  • Loading branch information
AhmedSoliman authored Nov 26, 2024
1 parent 2ac41ac commit 07882c3
Show file tree
Hide file tree
Showing 13 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub fn spawn_restate(config: Configuration) -> task_center::Handle {
.options(config.common.clone())
.build()
.expect("task_center builds")
.to_handle();
.into_handle();
restate_types::config::set_current_config(config.clone());
let updateable_config = Configuration::updateable();

Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn spawn_environment(
.options(config.common.clone())
.build()
.expect("task_center builds")
.to_handle();
.into_handle();

async {
restate_types::config::set_current_config(config.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/core/derive/src/tc_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,8 @@ fn parse_knobs(mut input: ItemFn, config: FinalConfig) -> TokenStream {
.build()
.expect("Failed building task-center");

let ret = rt.block_on(#body_ident.in_tc(&task_center.handle()));
rt.block_on(task_center.to_handle().shutdown_node("completed", 0));
let ret = rt.block_on(#body_ident.in_tc(&task_center.to_handle()));
rt.block_on(task_center.into_handle().shutdown_node("completed", 0));
ret
}
};
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ mod tests {
F: Fn(&Metadata) -> Version,
S: Fn(&mut T, Version),
{
let tc = TaskCenterBuilder::default().build()?.to_handle();
let tc = TaskCenterBuilder::default().build()?.into_handle();
tc.block_on(async move {
let metadata_builder = MetadataBuilder::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
Expand Down Expand Up @@ -682,7 +682,7 @@ mod tests {
F: Fn(&Metadata) -> Version,
I: Fn(&mut T),
{
let tc = TaskCenterBuilder::default().build()?.to_handle();
let tc = TaskCenterBuilder::default().build()?.into_handle();
tc.block_on(async move {
let metadata_builder = MetadataBuilder::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
Expand Down
5 changes: 1 addition & 4 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,6 @@ impl TaskCenterInner {
std::boxed::Box<dyn std::any::Any + std::marker::Send>,
>,
) {
//let inner = self.inner.clone();
// Remove our entry from the tasks map.
let Some(task) = self.managed_tasks.lock().remove(&task_id) else {
// This can happen if the task ownership was taken by calling take_task(id);
Expand Down Expand Up @@ -788,7 +787,6 @@ impl TaskCenterInner {
}

async fn shutdown_node(self: &Arc<Self>, reason: &str, exit_code: i32) {
//let inner = self.inner.clone();
if self
.shutdown_requested
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
Expand Down Expand Up @@ -861,7 +859,6 @@ impl TaskCenterInner {
kind: Option<TaskKind>,
partition_id: Option<PartitionId>,
) {
//let inner = self.inner.clone();
let mut victims = Vec::new();

{
Expand Down Expand Up @@ -1037,7 +1034,7 @@ mod tests {
.default_runtime_handle(tokio::runtime::Handle::current())
.ingress_runtime_handle(tokio::runtime::Handle::current())
.build()?
.to_handle();
.into_handle();
let start = tokio::time::Instant::now();
tc.spawn(TaskKind::RoleRunner, "worker-role", async {
info!("Hello async");
Expand Down
14 changes: 12 additions & 2 deletions crates/core/src/task_center/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,23 @@ impl OwnedHandle {
}
}

pub fn handle(&self) -> Handle {
pub fn to_handle(&self) -> Handle {
Handle::new(&self.inner)
}

pub fn to_handle(self) -> Handle {
pub fn into_handle(self) -> Handle {
Handle { inner: self.inner }
}

/// Sets the current task_center but doesn't create a task. Use this when you need to run a
/// future within task_center scope.
pub fn block_on<F, O>(&self, future: F) -> O
where
F: Future<Output = O>,
{
self.inner.block_on(future)
}

/// The exit code that the process should exit with.
pub fn exit_code(&self) -> i32 {
self.inner.current_exit_code.load(Ordering::Relaxed)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/task_center/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl TaskContext {
where
F: FnOnce(&Self) -> R,
{
TASK_CONTEXT.try_with(|tc| f(tc)).ok()
TASK_CONTEXT.try_with(|ctx| f(ctx)).ok()
}

/// Access to current task-center task context
Expand Down
2 changes: 1 addition & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false
[features]
default = []
memory-loglet = ["restate-bifrost/memory-loglet", "restate-admin/memory-loglet"]
replicated-loglet = ["restate-bifrost/replicated-loglet"]
replicated-loglet = ["restate-bifrost/replicated-loglet", "restate-admin/replicated-loglet"]
options_schema = [
"dep:schemars",
"restate-admin/options_schema",
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/benches/basic_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn basic_writing_reading_benchmark(c: &mut Criterion) {
.default_runtime_handle(rt.handle().clone())
.build()
.expect("task_center builds")
.to_handle();
.into_handle();

let worker_options = WorkerOptions::default();
tc.run_sync(|| RocksDbManager::init(Constant::new(CommonOptions::default())));
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ fn main() {
.options(Configuration::pinned().common.clone())
.build()
.expect("task_center builds");
tc.handle().block_on({
tc.block_on({
async move {
// Apply tracing config globally
// We need to apply this first to log correctly
Expand Down
2 changes: 1 addition & 1 deletion tools/bifrost-benchpress/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fn spawn_environment(config: Live<Configuration>, num_logs: u16) -> (task_center
.options(config.pinned().common.clone())
.build()
.expect("task_center builds")
.to_handle();
.into_handle();

let bifrost = tc.block_on(async move {
let metadata_builder = MetadataBuilder::default();
Expand Down
2 changes: 1 addition & 1 deletion tools/restatectl/src/environment/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
.options(config.common.clone())
.build()
.expect("task_center builds")
.to_handle();
.into_handle();

let result = task_center.run_sync(|| fn_body(config)).await;

Expand Down
2 changes: 1 addition & 1 deletion tools/xtask/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async fn main() -> anyhow::Result<()> {
let tc = TaskCenterBuilder::default_for_tests()
.build()
.expect("building task-center should not fail")
.to_handle();
.into_handle();
let task = env::args().nth(1);
match task {
None => print_help(),
Expand Down

0 comments on commit 07882c3

Please sign in to comment.