Skip to content

Commit

Permalink
feat(meta): add event log service (#13392)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 23, 2023
1 parent 53c5250 commit 38a78d8
Show file tree
Hide file tree
Showing 27 changed files with 681 additions and 8 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ set -e
cargo nextest run \
--config "target.'cfg(all())'.rustflags = ['--cfg=madsim']" \
--features fail/failpoints \
-p risingwave_simulation \
"$@"
"""
Expand All @@ -980,6 +981,7 @@ set -e
cargo nextest archive \
--config "target.'cfg(all())'.rustflags = ['--cfg=madsim']" \
--features fail/failpoints \
-p risingwave_simulation \
--archive-file simulation-it-test.tar.zst \
"$@"
Expand Down
39 changes: 39 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,42 @@ message GetServingVnodeMappingsResponse {
service ServingService {
rpc GetServingVnodeMappings(GetServingVnodeMappingsRequest) returns (GetServingVnodeMappingsResponse);
}

message EventLog {
message EventMetaNodeStart {
string advertise_addr = 1;
string listen_addr = 2;
string opts = 3;
}
message EventCreateStreamJobFail {
uint32 id = 1;
string name = 2;
string definition = 3;
string error = 4;
}
message EventDirtyStreamJobClear {
uint32 id = 1;
string name = 2;
string definition = 3;
string error = 4;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
optional uint64 timestamp = 2;
oneof event {
EventCreateStreamJobFail create_stream_job_fail = 3;
EventDirtyStreamJobClear dirty_stream_job_clear = 4;
EventMetaNodeStart meta_node_start = 5;
}
}

message ListEventLogRequest {}

message ListEventLogResponse {
repeated EventLog event_logs = 1;
}

service EventLogService {
rpc ListEventLog(ListEventLogRequest) returns (ListEventLogResponse);
}
14 changes: 14 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ pub struct MetaConfig {

#[serde(default)]
pub compaction_config: CompactionConfig,

#[serde(default = "default::meta::event_log_enabled")]
pub event_log_enabled: bool,
/// Keeps the latest N events per channel.
#[serde(default = "default::meta::event_log_channel_max_size")]
pub event_log_channel_max_size: u32,
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -975,6 +981,14 @@ pub mod default {
pub fn compaction_task_max_heartbeat_interval_secs() -> u64 {
60 // 1min
}

pub fn event_log_enabled() -> bool {
true
}

pub fn event_log_channel_max_size() -> u32 {
10
}
}

pub mod server {
Expand Down
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ partition_vnode_count = 64
table_write_throughput_threshold = 16777216
min_table_split_write_throughput = 4194304
compaction_task_max_heartbeat_interval_secs = 60
event_log_enabled = true
event_log_channel_max_size = 10

[meta.compaction_config]
max_bytes_for_level_base = 536870912
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ prepare_sys_catalog! {
{ BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await},
{ BuiltinCatalog::Table(&RW_EVENT_LOGS), read_event_logs await},
{ BuiltinCatalog::Table(&RW_HUMMOCK_COMPACT_TASK_ASSIGNMEN), read_hummock_compaction_status await },
{ BuiltinCatalog::Table(&RW_DESCRIPTION), read_rw_description },
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod rw_connections;
mod rw_databases;
mod rw_ddl_progress;
mod rw_description;
mod rw_event_logs;
mod rw_fragments;
mod rw_functions;
mod rw_hummock_branched_objects;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub use rw_connections::*;
pub use rw_databases::*;
pub use rw_ddl_progress::*;
pub use rw_description::*;
pub use rw_event_logs::*;
pub use rw_fragments::*;
pub use rw_functions::*;
pub use rw_hummock_branched_objects::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl, Timestamptz};
use risingwave_pb::meta::event_log::Event;
use serde_json::json;

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_EVENT_LOGS: BuiltinTable = BuiltinTable {
name: "rw_event_logs",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Varchar, "unique_id"),
(DataType::Timestamptz, "timestamp"),
(DataType::Varchar, "event_type"),
(DataType::Jsonb, "info"),
],
pk: &[0],
};

impl SysCatalogReaderImpl {
pub async fn read_event_logs(&self) -> Result<Vec<OwnedRow>> {
let configs = self
.meta_client
.list_event_log()
.await?
.into_iter()
.sorted_by(|a, b| a.timestamp.cmp(&b.timestamp))
.map(|e| {
let ts = Timestamptz::from_millis(e.timestamp.unwrap() as i64).unwrap();
let event_type = event_type(e.event.as_ref().unwrap());
OwnedRow::new(vec![
Some(ScalarImpl::Utf8(e.unique_id.to_owned().unwrap().into())),
Some(ScalarImpl::Timestamptz(ts)),
Some(ScalarImpl::Utf8(event_type.into())),
Some(ScalarImpl::Jsonb(json!(e).into())),
])
})
.collect_vec();
Ok(configs)
}
}

fn event_type(e: &Event) -> String {
match e {
Event::CreateStreamJobFail(_) => "CREATE_STREAM_JOB_FAIL",
Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR",
Event::MetaNodeStart(_) => "META_NODE_START",
}
.into()
}
6 changes: 6 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::EventLog;
use risingwave_rpc_client::error::Result;
use risingwave_rpc_client::{HummockMetaClient, MetaClient};

Expand Down Expand Up @@ -99,6 +100,7 @@ pub trait FrontendMetaClient: Send + Sync {

async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;

async fn list_event_log(&self) -> Result<Vec<EventLog>>;
async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>>;
async fn list_all_nodes(&self) -> Result<Vec<WorkerNode>>;
}
Expand Down Expand Up @@ -246,6 +248,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.list_hummock_meta_config().await
}

async fn list_event_log(&self) -> Result<Vec<EventLog>> {
self.0.list_event_log().await
}

async fn list_compact_task_assignment(&self) -> Result<Vec<CompactTaskAssignment>> {
self.0.rise_ctl_list_compact_task_assignment().await
}
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::meta::{EventLog, SystemParams};
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
Expand Down Expand Up @@ -924,6 +924,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
unimplemented!()
}

async fn list_event_log(&self) -> RpcResult<Vec<EventLog>> {
unimplemented!()
}

async fn list_compact_task_assignment(&self) -> RpcResult<Vec<CompactTaskAssignment>> {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
2 changes: 2 additions & 0 deletions src/meta/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ sea-orm = { version = "0.12.0", features = [
"runtime-tokio-native-tls",
"macros",
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
2 changes: 2 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.compaction_task_max_heartbeat_interval_secs,
compaction_config: Some(config.meta.compaction_config),
event_log_enabled: config.meta.event_log_enabled,
event_log_channel_max_size: config.meta.event_log_channel_max_size,
advertise_addr: opts.advertise_addr,
},
config.system.into_init_system_params(),
Expand Down
17 changes: 17 additions & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ pub struct MetaStoreSqlBackend {
}

use risingwave_meta::MetaStoreBackend;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;

pub async fn rpc_serve(
address_info: AddressInfo,
Expand Down Expand Up @@ -619,6 +621,7 @@ pub async fn start_service_as_election_leader(
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), fragment_manager.clone());
let cloud_srv = CloudServiceImpl::new(catalog_manager.clone(), aws_cli);
let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());

if let Some(prometheus_addr) = address_info.prometheus_addr {
MetricsManager::boot_metrics_service(prometheus_addr.to_string())
Expand Down Expand Up @@ -706,6 +709,10 @@ pub async fn start_service_as_election_leader(
tracing::info!("Telemetry didn't start due to meta backend or config");
}

if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
sub_tasks.push(pair);
}

let shutdown_all = async move {
let mut handles = Vec::with_capacity(sub_tasks.len());

Expand Down Expand Up @@ -743,6 +750,15 @@ pub async fn start_service_as_election_leader(
tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
tracing::info!("Starting meta services");

let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
advertise_addr: address_info.advertise_addr,
listen_addr: address_info.listen_addr.to_string(),
opts: serde_json::to_string(&env.opts).unwrap(),
};
env.event_log_manager_ref().add_event_logs(vec![
risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
]);

tonic::transport::Server::builder()
.layer(MetricsMiddlewareLayer::new(meta_metrics))
.layer(TracingExtractLayer::new())
Expand All @@ -764,6 +780,7 @@ pub async fn start_service_as_election_leader(
.add_service(ServingServiceServer::new(serving_srv))
.add_service(CloudServiceServer::new(cloud_srv))
.add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
.add_service(EventLogServiceServer::new(event_log_srv))
.monitored_serve_with_shutdown(
address_info.listen_addr,
"grpc-meta-leader-service",
Expand Down
39 changes: 39 additions & 0 deletions src/meta/service/src/event_log_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_meta::manager::event_log::EventLogMangerRef;
use risingwave_pb::meta::event_log_service_server::EventLogService;
use risingwave_pb::meta::{ListEventLogRequest, ListEventLogResponse};
use tonic::{Request, Response, Status};

pub struct EventLogServiceImpl {
event_log_manager: EventLogMangerRef,
}

impl EventLogServiceImpl {
pub fn new(event_log_manager: EventLogMangerRef) -> Self {
Self { event_log_manager }
}
}

#[async_trait::async_trait]
impl EventLogService for EventLogServiceImpl {
async fn list_event_log(
&self,
_request: Request<ListEventLogRequest>,
) -> Result<Response<ListEventLogResponse>, Status> {
let event_logs = self.event_log_manager.list_event_logs();
Ok(Response::new(ListEventLogResponse { event_logs }))
}
}
1 change: 1 addition & 0 deletions src/meta/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod backup_service;
pub mod cloud_service;
pub mod cluster_service;
pub mod ddl_service;
pub mod event_log_service;
pub mod health_service;
pub mod heartbeat_service;
pub mod hummock_service;
Expand Down
Loading

0 comments on commit 38a78d8

Please sign in to comment.