refactor: use downgrading the region instead of closing region
WenyXu committed Dec 4, 2023
1 parent 806400c commit 3223b72
Showing 5 changed files with 84 additions and 113 deletions.
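In short: before this commit, a region whose lease expired was closed outright (with retries); after it, the region is only downgraded to readonly and the countdown keeps running, so a later lease renewal can promote it back. A minimal sketch of the behavioral difference — the types and methods here are hypothetical stand-ins, not the actual GreptimeDB API:

```rust
// Sketch only; `RegionServer` and its methods are hypothetical stand-ins.
struct RegionServer;

impl RegionServer {
    fn close(&self, region_id: u64) {
        println!("region {region_id} closed: it can no longer serve anything");
    }
    fn set_writable(&self, region_id: u64, writable: bool) {
        println!("region {region_id} writable = {writable}: reads still work");
    }
}

fn on_lease_expired(server: &RegionServer, region_id: u64) {
    // Old behavior: server.close(region_id);
    // New behavior: downgrade instead of close.
    server.set_writable(region_id, false);
}

fn main() {
    on_lease_expired(&RegionServer, 42);
}
```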
2 changes: 1 addition & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -87,7 +87,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2b3ae45740a49ec6a0830d71fc09c3093aeb5fe7" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4377b4377cba3cd5881aa17c41030ffbfcab1b9b" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
2 changes: 2 additions & 0 deletions src/common/meta/src/rpc/ddl.rs
@@ -118,6 +118,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
// TODO(weny): https://github.com/GreptimeTeam/greptimedb/pull/2859
drop_if_exists: false,
}),
}),
DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask {
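The `drop_if_exists: false` line fills a field newly added to the protobuf message; until the follow-up change wires the real flag through, defaulting it at the conversion boundary preserves the old "error if the table is missing" semantics. A sketch of that pattern with illustrative types (not the actual greptime-proto definitions):

```rust
// Illustrative stand-ins for the task and protobuf types.
struct DropTableTask {
    catalog: String,
    schema: String,
    table: String,
}

struct PbDropTableExpr {
    catalog_name: String,
    schema_name: String,
    table_name: String,
    drop_if_exists: bool, // newly added field in the proto
}

impl From<DropTableTask> for PbDropTableExpr {
    fn from(task: DropTableTask) -> Self {
        Self {
            catalog_name: task.catalog,
            schema_name: task.schema,
            table_name: task.table,
            // Hard-coded until the caller can express it, matching the TODO above.
            drop_if_exists: false,
        }
    }
}

fn main() {
    let expr = PbDropTableExpr::from(DropTableTask {
        catalog: "greptime".into(),
        schema: "public".into(),
        table: "t".into(),
    });
    assert!(!expr.drop_if_exists);
    let _ = (expr.catalog_name, expr.schema_name, expr.table_name);
}
```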
173 changes: 63 additions & 110 deletions src/datanode/src/alive_keeper.rs
@@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

@@ -40,34 +39,29 @@ use crate::error::{self, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::region_server::RegionServer;

const MAX_CLOSE_RETRY_TIMES: usize = 10;

/// [RegionAliveKeeper] manages all `CountdownTaskHandles`.
///
/// [RegionAliveKeeper] starts a `CountdownTask` for each region. When deadline is reached,
/// the region will be closed.
/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s.
///
/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its
/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this
/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to
/// countdown.
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When the deadline is reached,
/// the region's status is set to "readonly", which ensures there are no side effects in the entire system.
///
/// On each lease extension, [RegionAliveKeeper] will reset the deadline to the corresponding time, and
/// set region's status to "writable".
/// The deadline is controlled by the meta server. The Datanode sends the info of its opened regions
/// to the meta server via heartbeats. If the meta server decides a region should reside on this
/// Datanode, it renews the region's lease, and the deadline of the corresponding [CountdownTask] is reset.
pub struct RegionAliveKeeper {
region_server: RegionServer,
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
heartbeat_interval_millis: u64,
started: Arc<AtomicBool>,

/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
/// non-decreasing). The heartbeat request will carry the duration since this epoch, and the
/// elapsed time when submitting heartbeats to the meta server (because [Instant] is monotonically
/// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
/// duration acts like an "invariant point" for the region's keep-alive lease.
epoch: Instant,
}
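The struct above is, at its core, a `RegionId → countdown-handle` map behind an async mutex: registration is idempotent, and deregistration drops the handle (which cancels the task in the real code). A condensed sketch of that bookkeeping, with a simplified handle type:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

type RegionId = u64;
struct CountdownTaskHandle; // simplified stand-in

#[derive(Default)]
struct Keeper {
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
}

impl Keeper {
    async fn register_region(&self, region_id: RegionId) {
        let mut tasks = self.tasks.lock().await;
        // Idempotent: keep an existing countdown rather than restarting it.
        tasks
            .entry(region_id)
            .or_insert_with(|| Arc::new(CountdownTaskHandle));
    }

    async fn deregister_region(&self, region_id: RegionId) {
        // Dropping the last Arc cancels the underlying task in the real code.
        let _ = self.tasks.lock().await.remove(&region_id);
    }
}

#[tokio::main]
async fn main() {
    let keeper = Keeper::default();
    keeper.register_region(1).await;
    keeper.register_region(1).await; // no-op
    keeper.deregister_region(1).await;
}
```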

impl RegionAliveKeeper {
/// Returns an empty [RegionAliveKeeper].
pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
Self {
region_server,
@@ -82,26 +76,16 @@ impl RegionAliveKeeper {
self.tasks.lock().await.get(&region_id).cloned()
}

/// Adds a countdown task for a specific region.
/// The request is ignored if a task for the region already exists.
pub async fn register_region(&self, region_id: RegionId) {
if self.find_handle(region_id).await.is_some() {
return;
}

let tasks = Arc::downgrade(&self.tasks);
let on_task_finished = async move {
if let Some(x) = tasks.upgrade() {
let _ = x.lock().await.remove(&region_id);
} // Else the countdown task handles map could be dropped because the keeper is dropped.
};
let handle = Arc::new(CountdownTaskHandle::new(
self.region_server.clone(),
region_id,
move |result: Option<bool>| {
info!(
"Deregister region: {region_id} after countdown task finished, result: {result:?}",
);
on_task_finished
},
));

let mut handles = self.tasks.lock().await;
@@ -118,13 +102,15 @@
}
}

/// Removes the countdown task for a specific region.
pub async fn deregister_region(&self, region_id: RegionId) {
if self.tasks.lock().await.remove(&region_id).is_some() {
info!("Deregister alive countdown for region {region_id}")
}
}

async fn keep_lived(&self, regions: &[GrantedRegion], deadline: Instant) {
/// Renews the leases of `regions` until `deadline`.
async fn new_region_leases(&self, regions: &[GrantedRegion], deadline: Instant) {
for region in regions {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
@@ -138,6 +124,25 @@
}
}

async fn close_staled_region(&self, region_id: RegionId) {
let request = RegionRequest::Close(RegionCloseRequest {});
if let Err(e) = self.region_server.handle_request(region_id, request).await {
if e.status_code() != StatusCode::RegionNotFound {
// TODO(weny): Consider setting the region to readonly instead.
error!(e; "Failed to close stale region {}", region_id);
}
}
}

/// Closes stale regions.
async fn close_staled_regions(&self, regions: &[u64]) {
info!("Closing stale regions: {regions:?}");
for region_id in regions {
self.close_staled_region(RegionId::from_u64(*region_id))
.await;
}
}
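Note the error filtering above: a `RegionNotFound` status while closing a stale region means the region is already gone, which is exactly the desired end state, so only other errors are logged. The same "not found counts as success" shape, reduced to a standalone sketch with a hypothetical error type:

```rust
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum StatusCode {
    RegionNotFound,
    Internal,
}

#[derive(Debug)]
struct Error {
    code: StatusCode,
}

// Hypothetical close operation standing in for RegionServer::handle_request.
fn close_region(region_exists: bool) -> Result<(), Error> {
    if region_exists {
        Ok(())
    } else {
        Err(Error { code: StatusCode::RegionNotFound })
    }
}

fn close_stale_region(region_exists: bool) {
    if let Err(e) = close_region(region_exists) {
        // Already closed elsewhere: not a failure for our purposes.
        if e.code != StatusCode::RegionNotFound {
            eprintln!("failed to close stale region: {e:?}");
        }
    }
}

fn main() {
    close_stale_region(true);  // closes normally
    close_stale_region(false); // RegionNotFound is tolerated silently
}
```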

#[cfg(test)]
async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
let mut deadline = None;
@@ -243,7 +248,11 @@ impl HeartbeatResponseHandler for RegionAliveKeeper {
let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);

self.keep_lived(&region_lease.regions, deadline).await;
self.new_region_leases(&region_lease.regions, deadline)
.await;
self.close_staled_regions(&region_lease.closeable_region_ids)
.await;

Ok(HandleControl::Continue)
}
}
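The handler above re-derives an absolute deadline from purely relative quantities: the Datanode reports `now - epoch` in its heartbeat, the meta server echoes that duration back together with `lease_seconds`, and the handler anchors both on the same `epoch`. Since [Instant] is monotonic, the reconstructed `start_instant` can never move backwards. A small worked sketch of the round trip:

```rust
use std::time::{Duration, Instant};

fn main() {
    // Fixed once, when the keeper is created.
    let epoch = Instant::now();

    // Datanode side: the heartbeat carries this relative duration.
    let duration_since_epoch = epoch.elapsed();

    // Meta-server side (modeled): echo the duration and grant a lease.
    let lease_seconds: u64 = 20;

    // Handler side: re-anchor on the same epoch to get an absolute deadline.
    let start_instant = epoch + duration_since_epoch;
    let deadline = start_instant + Duration::from_secs(lease_seconds);

    assert!(deadline > Instant::now());
    println!("lease deadline in {:?}", deadline - Instant::now());
}
```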
@@ -269,19 +278,7 @@ struct CountdownTaskHandle {

impl CountdownTaskHandle {
/// Creates a new [CountdownTaskHandle] and starts the countdown task.
/// # Params
/// - `on_task_finished`: a callback to be invoked when the task is finished. Note that it will not
/// be invoked if the task is cancelled (by dropping the handle). This is because we want something
/// meaningful to be done when the task is finished, e.g. deregister the handle from the map.
/// While dropping the handle does not necessarily mean the task is finished.
fn new<Fut>(
region_server: RegionServer,
region_id: RegionId,
on_task_finished: impl FnOnce(Option<bool>) -> Fut + Send + 'static,
) -> Self
where
Fut: Future<Output = ()> + Send,
{
fn new(region_server: RegionServer, region_id: RegionId) -> Self {
let (tx, rx) = mpsc::channel(1024);

let mut countdown_task = CountdownTask {
@@ -290,8 +287,7 @@
rx,
};
let handler = common_runtime::spawn_bg(async move {
let result = countdown_task.run().await;
on_task_finished(result).await;
countdown_task.run().await;
});

Self {
@@ -354,8 +350,8 @@ struct CountdownTask {
}

impl CountdownTask {
// returns true if region closed successfully
async fn run(&mut self) -> Option<bool> {
/// Runs the countdown task until its command channel is closed.
async fn run(&mut self) {
// 30 years. See `Instant::far_future`.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

@@ -370,8 +366,8 @@
command = self.rx.recv() => {
match command {
Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
// Set first deadline in 4 heartbeats (roughly after 20 seconds from now if heartbeat
// interval is set to default 5 seconds), to make Datanode and Metasrv more tolerable to
// Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
// interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
// network or other jitters during startup.
let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
countdown.set(tokio::time::sleep_until(first_deadline));
@@ -384,7 +380,7 @@

if countdown.deadline() < deadline {
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later",
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
);
countdown.set(tokio::time::sleep_until(deadline));
Expand All @@ -409,38 +405,14 @@ impl CountdownTask {
}
}
() = &mut countdown => {
let result = self.close_region().await;
info!(
"Region {region_id} is closed, result: {result:?}. \
RegionAliveKeeper out.",
);
return Some(result);
info!("The region lease expired, set region {region_id} to readonly.");
let _ = self.region_server.set_writable(self.region_id, false);
// Reset the countdown to the far future so the task keeps running until the next command.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
}
}
}
None
}

/// Returns if the region is closed successfully.
async fn close_region(&self) -> bool {
for retry in 0..MAX_CLOSE_RETRY_TIMES {
let request = RegionRequest::Close(RegionCloseRequest {});
match self
.region_server
.handle_request(self.region_id, request)
.await
{
Ok(_) => return true,
Err(e) if e.status_code() == StatusCode::RegionNotFound => return true,
// If region is failed to close, immediately retry. Maybe we should panic instead?
Err(e) => error!(e;
"Retry {retry}, failed to close region {}. \
For the integrity of data, retry closing and retry without wait.",
self.region_id,
),
}
}
false
}
}
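The run loop multiplexes commands and the countdown with `tokio::select!`; on expiry it downgrades the region and parks the timer at a far-future sentinel instead of returning, so the task stays alive for later renewals. A trimmed, self-contained sketch of that loop shape (the command set and downgrade action are simplified):

```rust
use std::time::Duration;
use tokio::{sync::mpsc, time::Instant};

enum Command {
    Reset(Instant),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Command>(8);
    // 30-year sentinel, mirroring `Instant::far_future`.
    let far_future = || Instant::now() + Duration::from_secs(86400 * 365 * 30);

    let task = tokio::spawn(async move {
        let countdown = tokio::time::sleep_until(far_future());
        tokio::pin!(countdown);
        loop {
            tokio::select! {
                cmd = rx.recv() => match cmd {
                    Some(Command::Reset(deadline)) => {
                        // The real code only accepts a farther deadline.
                        countdown.as_mut().reset(deadline);
                    }
                    None => break, // handle dropped: the task ends
                },
                () = &mut countdown => {
                    println!("lease expired: downgrade region to readonly");
                    // Park again instead of exiting, awaiting the next renewal.
                    countdown.as_mut().reset(far_future());
                }
            }
        }
    });

    tx.send(Command::Reset(Instant::now() + Duration::from_millis(50)))
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    drop(tx);
    task.await.unwrap();
}
```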

@@ -457,19 +429,19 @@ mod test {
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
let region_id = RegionId::new(1, 2);

// register a region before starting
// Register a region before starting.
alive_keeper.register_region(region_id).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());

alive_keeper.start(None).await.unwrap();

// started alive keeper should assign deadline to this region
// The started alive keeper should assign deadline to this region.
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());

// extend lease then sleep
// Renew lease then sleep.
alive_keeper
.keep_lived(
.new_region_leases(
&[GrantedRegion {
region_id: region_id.as_u64(),
role: RegionRole::Leader.into(),
@@ -482,42 +454,34 @@
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());

// sleep to wait lease expired
// Sleep to wait lease expired.
tokio::time::sleep(Duration::from_millis(1000)).await;
assert!(alive_keeper.find_handle(region_id).await.is_none());
assert!(alive_keeper.find_handle(region_id).await.is_some());
}
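These tests drive real timers on `multi_thread` runtimes with short sleeps. As an aside (not part of this commit), tokio's paused clock, enabled by the `test-util` feature, can make such timing assertions deterministic on a `current_thread` runtime:

```rust
// Sketch: requires the tokio "test-util" feature.
#[tokio::test(start_paused = true)]
async fn paused_clock_example() {
    use std::time::Duration;
    use tokio::time::{advance, Instant};

    let deadline = Instant::now() + Duration::from_secs(20);
    // No wall-clock waiting: jump past the deadline instantly.
    advance(Duration::from_secs(21)).await;
    assert!(Instant::now() > deadline);
}
```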

#[tokio::test(flavor = "multi_thread")]
async fn countdown_task() {
let region_server = mock_region_server();

let (tx, rx) = oneshot::channel();
let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2));

let countdown_handle = CountdownTaskHandle::new(
region_server,
RegionId::new(9999, 2),
|result: Option<bool>| async move {
tx.send((Instant::now(), result)).unwrap();
},
);

// if countdown task is not started, its deadline is set to far future
// If countdown task is not started, its deadline is set to far future.
assert!(
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_secs(86400 * 365 * 29)
);

// the first deadline should be set to 4 * heartbeat_interval_millis
// we assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test
// The first deadline should be set to 4 * heartbeat_interval_millis.
// We assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test.
let heartbeat_interval_millis = 100;
countdown_handle.start(heartbeat_interval_millis).await;
assert!(
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
);

// reset deadline
// a nearer deadline will be ignored
// Reset deadline.
// A nearer deadline will be ignored.
countdown_handle
.reset_deadline(
RegionRole::Leader.into(),
@@ -529,7 +493,7 @@
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
);

// only a farther deadline will be accepted
// Only a farther deadline will be accepted.
countdown_handle
.reset_deadline(
RegionRole::Leader.into(),
Expand All @@ -540,16 +504,5 @@ mod test {
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
);

// wait for countdown task to finish
let before_await = Instant::now();
let (finish_instant, result) = rx.await.unwrap();
// it returns `RegionNotFound`
assert_eq!(result, Some(true));
// this task should be finished after 5 * heartbeat_interval_millis
// we assert 4 times here
assert!(
finish_instant > before_await + Duration::from_millis(heartbeat_interval_millis * 4)
);
}
}