Skip to content

Commit

Permalink
feat: sync regions between RegionServer and RegionAliveKeeper (#2417)
Browse files Browse the repository at this point in the history
* feat: sync regions between RegionServer and RegionAliveKeeper

* Apply suggestions from code review

Co-authored-by: JeremyHi <[email protected]>

* refactor: rename event name

* chore: apply suggestions

---------

Co-authored-by: JeremyHi <[email protected]>
  • Loading branch information
WenyXu and fengjiachun authored Sep 17, 2023
1 parent 98a40ba commit 5b08e03
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 9 deletions.
36 changes: 31 additions & 5 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ use crate::error::{
MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu,
ShutdownInstanceSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::{new_metasrv_client, HeartbeatTask};
use crate::region_server::RegionServer;
Expand All @@ -63,6 +67,7 @@ pub struct Datanode {
opts: DatanodeOptions,
services: Option<Services>,
heartbeat_task: Option<HeartbeatTask>,
region_event_receiver: Option<RegionServerEventReceiver>,
region_server: RegionServer,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}
Expand All @@ -77,9 +82,11 @@ impl Datanode {
self.start_services().await
}

pub async fn start_heartbeat(&self) -> Result<()> {
pub async fn start_heartbeat(&mut self) -> Result<()> {
if let Some(task) = &self.heartbeat_task {
task.start().await?;
// Safety: The event_receiver must exist.
let receiver = self.region_event_receiver.take().unwrap();
task.start(receiver).await?;
}
Ok(())
}
Expand Down Expand Up @@ -185,8 +192,24 @@ impl DatanodeBuilder {

// build and initialize region server
let log_store = Self::build_log_store(&self.opts).await?;
let region_server =
Self::new_region_server(&self.opts, self.plugins.clone(), log_store).await?;
let (region_event_listener, region_event_receiver) = match mode {
Mode::Distributed => {
let (tx, rx) = new_region_server_event_channel();
(Box::new(tx) as RegionServerEventListenerRef, Some(rx))
}
Mode::Standalone => (
Box::new(NoopRegionServerEventListener) as RegionServerEventListenerRef,
None,
),
};

let region_server = Self::new_region_server(
&self.opts,
self.plugins.clone(),
log_store,
region_event_listener,
)
.await?;
self.initialize_region_server(&region_server, kv_backend, matches!(mode, Mode::Standalone))
.await?;

Expand Down Expand Up @@ -219,6 +242,7 @@ impl DatanodeBuilder {
heartbeat_task,
region_server,
greptimedb_telemetry_task,
region_event_receiver,
})
}

Expand Down Expand Up @@ -276,6 +300,7 @@ impl DatanodeBuilder {
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
log_store: Arc<RaftEngineLogStore>,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
Expand All @@ -294,7 +319,8 @@ impl DatanodeBuilder {
.context(RuntimeResourceSnafu)?,
);

let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
let mut region_server =
RegionServer::new(query_engine.clone(), runtime.clone(), event_listener);
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
for engine in engines {
Expand Down
67 changes: 67 additions & 0 deletions src/datanode/src/event_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::error;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

/// A change in the set of regions, as reported through a
/// `RegionServerEventListener` and carried over the event channel.
pub enum RegionServerEvent {
/// The region with the given id was registered.
Registered(RegionId),
/// The region with the given id was deregistered.
Deregistered(RegionId),
}

/// Callbacks invoked when a region is registered or deregistered.
///
/// Both callbacks have empty default bodies, so an implementor only needs to
/// override the notifications it is interested in. Implementors must be
/// `Sync + Send`.
pub trait RegionServerEventListener: Sync + Send {
/// Called *after* a new region was created/opened.
///
/// The default implementation ignores `_region_id` and does nothing.
fn on_region_registered(&self, _region_id: RegionId) {}

/// Called *after* a region was closed.
///
/// The default implementation ignores `_region_id` and does nothing.
fn on_region_deregistered(&self, _region_id: RegionId) {}
}

/// An owned, dynamically dispatched [`RegionServerEventListener`].
pub type RegionServerEventListenerRef = Box<dyn RegionServerEventListener>;

/// A [`RegionServerEventListener`] that ignores every notification.
pub struct NoopRegionServerEventListener;

// No overrides: both callbacks fall back to the trait's empty default bodies.
impl RegionServerEventListener for NoopRegionServerEventListener {}

/// Sending half of the region server event channel.
///
/// Wraps a tokio unbounded mpsc sender of [`RegionServerEvent`]s; the inner
/// sender is accessible only within this crate.
#[derive(Debug, Clone)]
pub struct RegionServerEventSender(pub(crate) UnboundedSender<RegionServerEvent>);

/// Forwards listener callbacks into the wrapped channel as
/// [`RegionServerEvent`]s.
///
/// Sending is best-effort: when the channel rejects an event, the failure is
/// logged at error level and the callback returns normally.
impl RegionServerEventListener for RegionServerEventSender {
    /// Enqueues a [`RegionServerEvent::Registered`] event for `region_id`.
    fn on_region_registered(&self, region_id: RegionId) {
        let event = RegionServerEvent::Registered(region_id);
        match self.0.send(event) {
            Ok(()) => {}
            Err(err) => {
                error!(
                    "Failed to send registering region: {region_id} event, source: {}",
                    err
                );
            }
        }
    }

    /// Enqueues a [`RegionServerEvent::Deregistered`] event for `region_id`.
    fn on_region_deregistered(&self, region_id: RegionId) {
        let event = RegionServerEvent::Deregistered(region_id);
        match self.0.send(event) {
            Ok(()) => {}
            Err(err) => {
                error!(
                    "Failed to send deregistering region: {region_id} event, source: {}",
                    err
                );
            }
        }
    }
}

/// Receiving half of the region server event channel.
///
/// Wraps a tokio unbounded mpsc receiver of [`RegionServerEvent`]s; the inner
/// receiver is accessible only within this crate.
pub struct RegionServerEventReceiver(pub(crate) UnboundedReceiver<RegionServerEvent>);

pub fn new_region_server_event_channel() -> (RegionServerEventSender, RegionServerEventReceiver) {
let (tx, rx) = mpsc::unbounded_channel();

(RegionServerEventSender(tx), RegionServerEventReceiver(rx))
}
29 changes: 28 additions & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::region_server::RegionServer;

pub(crate) mod handler;
Expand Down Expand Up @@ -136,7 +137,7 @@ impl HeartbeatTask {
}

/// Start heartbeat task, spawn background task.
pub async fn start(&self) -> Result<()> {
pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> {
let running = self.running.clone();
if running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
Expand Down Expand Up @@ -180,6 +181,32 @@ impl HeartbeatTask {
});
let epoch = self.region_alive_keeper.epoch();

let keeper = self.region_alive_keeper.clone();

common_runtime::spawn_bg(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
break;
}

match event_receiver.0.recv().await {
Some(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Some(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
None => {
info!("region server event sender closed!");
break;
}
}
}
});

let running = self.running.clone();

common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod alive_keeper;
pub mod config;
pub mod datanode;
pub mod error;
pub mod event_listener;
mod greptimedb_telemetry;
pub mod heartbeat;
pub mod metrics;
Expand Down
23 changes: 20 additions & 3 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,25 @@ use crate::error::{
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;

#[derive(Clone)]
pub struct RegionServer {
inner: Arc<RegionServerInner>,
}

impl RegionServer {
pub fn new(query_engine: QueryEngineRef, runtime: Arc<Runtime>) -> Self {
pub fn new(
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(query_engine, runtime)),
inner: Arc::new(RegionServerInner::new(
query_engine,
runtime,
event_listener,
)),
}
}

Expand Down Expand Up @@ -185,15 +194,21 @@ struct RegionServerInner {
region_map: DashMap<RegionId, RegionEngineRef>,
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
}

impl RegionServerInner {
pub fn new(query_engine: QueryEngineRef, runtime: Arc<Runtime>) -> Self {
pub fn new(
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
event_listener: RegionServerEventListenerRef,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
region_map: DashMap::new(),
query_engine,
runtime,
event_listener,
}
}

Expand Down Expand Up @@ -251,12 +266,14 @@ impl RegionServerInner {
RegionChange::Register(_) => {
info!("Region {region_id} is registered to engine {engine_type}");
self.region_map.insert(region_id, engine);
self.event_listener.on_region_registered(region_id);
}
RegionChange::Deregisters => {
info!("Region {region_id} is deregistered from engine {engine_type}");
self.region_map
.remove(&region_id)
.map(|(id, engine)| engine.set_writable(id, false));
self.event_listener.on_region_deregistered(region_id);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use session::context::QueryContextRef;
use table::TableRef;
use tokio::sync::mpsc::{self, Receiver};

use crate::event_listener::NoopRegionServerEventListener;
use crate::region_server::RegionServer;
use crate::Instance;

Expand Down Expand Up @@ -142,5 +143,6 @@ pub fn mock_region_server() -> RegionServer {
RegionServer::new(
Arc::new(MockQueryEngine),
Arc::new(Runtime::builder().build().unwrap()),
Box::new(NoopRegionServerEventListener),
)
}

0 comments on commit 5b08e03

Please sign in to comment.