Skip to content

Commit

Permalink
fix meta (excl. storage)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 19, 2024
1 parent e66c5e2 commit 52c9b79
Show file tree
Hide file tree
Showing 22 changed files with 101 additions and 74 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ sea-orm = { version = "0.12.0", features = [
"macros",
] }
sync-point = { path = "../../utils/sync-point" }
+thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
11 changes: 6 additions & 5 deletions src/meta/service/src/cloud_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use risingwave_connector::source::kafka::private_link::insert_privatelink_broker
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
};
-use risingwave_meta::manager::MetadataManager;
+use risingwave_meta::manager::{MetadataManager, ConnectionId};
use risingwave_pb::catalog::connection::Info::PrivateLinkService;
use risingwave_pb::cloud_service::cloud_service_server::CloudService;
use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
use risingwave_pb::cloud_service::{
RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType,
};
+use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::rpc::cloud_provider::AwsEc2Client;
Expand Down Expand Up @@ -77,8 +78,8 @@ impl CloudService for CloudServiceImpl {
// if connection_id provided, check whether endpoint service is available and resolve
// broker rewrite map currently only support aws privatelink connection
if let Some(connection_id_str) = source_cfg.get("connection.id") {
-let connection_id = connection_id_str.parse().map_err(|e| {
-Status::invalid_argument(format!("connection.id is not an integer: {}", e))
+let connection_id = connection_id_str.parse::<ConnectionId>().map_err(|e| {
+Status::invalid_argument(format!("connection.id is not an integer: {}", e.as_report()))
})?;

let connection = match &self.metadata_manager {
Expand All @@ -97,7 +98,7 @@ impl CloudService for CloudServiceImpl {
if let Err(e) = connection {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkConnectionNotFound,
-e.to_string(),
+e.to_report_string(),
));
}
if let Some(PrivateLinkService(service)) = connection.unwrap().info {
Expand All @@ -115,7 +116,7 @@ impl CloudService for CloudServiceImpl {
Err(e) => {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkUnavailable,
-e.to_string(),
+e.to_report_string(),
));
}
Ok(false) => {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/cluster_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_pb::meta::{
ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest,
UpdateWorkerNodeSchedulabilityResponse,
};
+use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::MetaError;
Expand Down Expand Up @@ -64,7 +65,7 @@ impl ClusterService for ClusterServiceImpl {
return Ok(Response::new(AddWorkerNodeResponse {
status: Some(risingwave_pb::common::Status {
code: risingwave_pb::common::status::Code::UnknownWorker as i32,
-message: format!("{}", e),
+message: e.to_report_string(),
}),
node_id: None,
}));
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/heartbeat_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use itertools::Itertools;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
+use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

#[derive(Clone)]
Expand Down Expand Up @@ -58,7 +59,7 @@ impl HeartbeatService for HeartbeatServiceImpl {
return Ok(Response::new(HeartbeatResponse {
status: Some(risingwave_pb::common::Status {
code: risingwave_pb::common::status::Code::UnknownWorker as i32,
-message: format!("{}", e),
+message: e.to_report_string(),
}),
}));
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
use risingwave_pb::hummock::*;
+use thiserror_ext::AsReport;
use tonic::{Request, Response, Status, Streaming};

use crate::hummock::compaction::selector::ManualCompactionOption;
Expand Down Expand Up @@ -308,7 +309,7 @@ impl HummockManagerService for HummockServiceImpl {
tracing::info!("Full GC results {} SSTs to delete", number);
}
Err(e) => {
-tracing::warn!("Full GC SST failed: {:#?}", e);
+tracing::warn!(error = %e.as_report(), "Full GC SST failed");
}
}
});
Expand Down
11 changes: 6 additions & 5 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
+use thiserror_ext::AsReport;
use tokio::task::JoinHandle;

use crate::backup_restore::meta_snapshot_builder;
Expand Down Expand Up @@ -134,10 +135,10 @@ impl BackupManager {
if let Err(e) = self.set_store(new_config.clone()).await {
// Retry is driven by periodic system params notification.
tracing::warn!(
-"failed to apply new backup config: url={}, dir={}, {:#?}",
-new_config.0,
-new_config.1,
-e
+url = &new_config.0,
+dir = &new_config.1,
+error = %e.as_report(),
+"failed to apply new backup config",
);
}
}
Expand Down Expand Up @@ -269,7 +270,7 @@ impl BackupManager {
}
BackupJobResult::Failed(e) => {
self.metrics.job_latency_failure.observe(job_latency);
-let message = format!("failed backup job {}: {}", job_id, e);
+let message = format!("failed backup job {}: {}", job_id, e.as_report());
tracing::warn!(message);
self.latest_job_info
.store(Arc::new((job_id, BackupJobStatus::Failed, message)));
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_hummock_sdk::version_checkpoint_path;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::hummock::PbHummockVersionCheckpoint;
+use thiserror_ext::AsReport;

use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1};
use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
Expand Down Expand Up @@ -193,7 +194,7 @@ pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
tracing::info!("command succeeded");
}
Err(e) => {
-tracing::warn!("command failed: {}", e);
+tracing::warn!(error = %e.as_report(), "command failed");
}
}
result
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/backup_restore/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;

+use anyhow::Context;
use etcd_client::ConnectOptions;
use risingwave_backup::error::BackupResult;
use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
Expand Down Expand Up @@ -74,7 +75,8 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<MetaStoreBackendI
}
let client = EtcdClient::connect(endpoints, Some(options), credentials.is_some())
.await
-.map_err(|e| anyhow::anyhow!("failed to connect etcd {}", e))?;
+.context("failed to connect etcd")
+?;
Ok(MetaStoreBackendImpl::Etcd(EtcdMetaStore::new(client)))
}
MetaStoreBackend::Mem => Ok(MetaStoreBackendImpl::Mem(MemStore::new())),
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
TransactionTrait,
};
+use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -268,7 +269,7 @@ impl ClusterController {
{
Ok(keys) => keys,
Err(err) => {
-tracing::warn!("Failed to load expire worker info from db: {}", err);
+tracing::warn!(error = %err.as_report(), "Failed to load expire worker info from db");
continue;
}
};
Expand All @@ -278,7 +279,7 @@ impl ClusterController {
.exec(&inner.db)
.await
{
-tracing::warn!("Failed to delete expire workers from db: {}", err);
+tracing::warn!(error = %err.as_report(), "Failed to delete expire workers from db");
continue;
}

Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
+use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -400,11 +401,11 @@ impl ClusterManager {
}
Err(err) => {
tracing::warn!(
-"Failed to delete expired worker {} {:#?}, current timestamp {}. {:?}",
+error = %err.as_report(),
+"Failed to delete expired worker {} {:#?}, current timestamp {}",
worker_id,
key,
now,
-err,
);
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/manager/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use risingwave_common::catalog::{NON_RESERVED_SYS_CATALOG_ID, NON_RESERVED_USER_ID};
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
+use thiserror_ext::AsReport;
use tokio::sync::RwLock;

use crate::manager::cluster::META_NODE_ID;
Expand Down Expand Up @@ -57,7 +58,7 @@ impl StoredIdGenerator {
let current_id = match res {
Ok(value) => memcomparable::from_slice(&value).unwrap(),
Err(MetaStoreError::ItemNotFound(_)) => start.unwrap_or(0),
-Err(e) => panic!("{:?}", e),
+Err(e) => panic!("{}", e.as_report()),
};

let next_allocate_id = current_id + ID_PREALLOCATE_INTERVAL;
Expand All @@ -69,7 +70,7 @@ impl StoredIdGenerator {
)
.await
{
-panic!("{:?}", err)
+panic!("{}", err.as_report());
}

StoredIdGenerator {
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{
MetaSnapshot, Relation, RelationGroup, SubscribeResponse, SubscribeType,
};
+use thiserror_ext::AsReport;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::Mutex;
use tonic::Status;
Expand Down Expand Up @@ -265,7 +266,7 @@ impl NotificationManager {
let mut core_guard = self.core.lock().await;
core_guard.local_senders.retain(|sender| {
if let Err(err) = sender.send(notification.clone()) {
-tracing::warn!("Failed to notify local subscriber. {}", err);
+tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
return false;
}
true
Expand Down
35 changes: 17 additions & 18 deletions src/meta/src/manager/sink_coordination/coordinator_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_pb::connector_service::coordinate_response::{
use risingwave_pb::connector_service::{
coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, SinkMetadata,
};
+use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tonic::Status;
use tracing::{error, warn};
Expand Down Expand Up @@ -62,8 +63,9 @@ impl CoordinatorWorker {
Ok(sink) => sink,
Err(e) => {
error!(
-"unable to build sink with param {:?}: {:?}",
-first_writer_request.param, e
+error = %e.as_report(),
+"unable to build sink with param {:?}",
+first_writer_request.param
);
send_await_with_err_check!(
first_writer_request.response_tx,
Expand All @@ -77,8 +79,9 @@ impl CoordinatorWorker {
Ok(coordinator) => coordinator,
Err(e) => {
error!(
-"unable to build coordinator with param {:?}: {:?}",
-first_writer_request.param, e
+error = %e.as_report(),
+"unable to build coordinator with param {:?}",
+first_writer_request.param
);
send_await_with_err_check!(
first_writer_request.response_tx,
Expand Down Expand Up @@ -149,10 +152,9 @@ impl CoordinatorWorker {
Either::Right((Some(Ok(None)), _)) => Err(anyhow!(
"one sink writer stream reaches the end before initialize"
)),
-Either::Right((Some(Err(e)), _)) => Err(anyhow!(
-"unable to poll from one sink writer stream: {:?}",
-e
-)),
+Either::Right((Some(Err(e)), _)) => {
+Err(anyhow!(e).context("unable to poll from one sink writer stream"))
+}
Either::Right((None, _)) => unreachable!("request_streams must not be empty"),
}
}
Expand Down Expand Up @@ -265,10 +267,8 @@ impl CoordinatorWorker {
));
}
Err(e) => {
-return Err(anyhow!(
-"failed to poll from one of the writer request streams: {:?}",
-e
-));
+return Err(anyhow!(e)
+.context("failed to poll from one of the writer request streams"));
}
},
Either::Right((None, _)) => {
Expand All @@ -285,17 +285,16 @@ impl CoordinatorWorker {
async fn start_coordination(&mut self, mut coordinator: impl SinkCommitCoordinator) {
let result: Result<(), ()> = try {
coordinator.init().await.map_err(|e| {
-error!("failed to initialize coordinator: {:?}", e);
+error!(error = %e.as_report(), "failed to initialize coordinator");
})?;
loop {
let (epoch, metadata_list) = self.collect_all_metadata().await.map_err(|e| {
-error!("failed to collect all metadata: {:?}", e);
+error!(error = %e.as_report(), "failed to collect all metadata");
})?;
// TODO: measure commit time
-coordinator
-.commit(epoch, metadata_list)
-.await
-.map_err(|e| error!("failed to commit metadata of epoch {}: {:?}", epoch, e))?;
+coordinator.commit(epoch, metadata_list).await.map_err(
+|e| error!(epoch, error = %e.as_report(), "failed to commit metadata of epoch"),
+)?;

self.send_to_all_sink_writers(|| {
Ok(CoordinateResponse {
Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/manager/sink_coordination/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_connector::sink::SinkParam;
use risingwave_pb::connector_service::coordinate_request::Msg;
use risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse};
use rw_futures_util::pending_on_none;
+use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{channel, Receiver, Sender};
Expand Down Expand Up @@ -292,14 +293,15 @@ impl ManagerWorker {
match join_result {
Ok(()) => {
info!(
-"sink coordinator of {} has gracefully finished",
-sink_id.sink_id
+id = sink_id.sink_id,
+"sink coordinator has gracefully finished",
);
}
Err(err) => {
error!(
-"sink coordinator of {} finished with error {:?}",
-sink_id.sink_id, err
+id = sink_id.sink_id,
+error = %err.as_report(),
+"sink coordinator finished with error",
);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/model/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+use thiserror_ext::AsReport;
+
use crate::storage::{MetaStore, MetaStoreError, DEFAULT_COLUMN_FAMILY};

/// `NotificationVersion` records the last sent notification version, this will be stored
Expand All @@ -31,7 +33,7 @@ impl NotificationVersion {
{
Ok(byte_vec) => memcomparable::from_slice(&byte_vec).unwrap(),
Err(MetaStoreError::ItemNotFound(_)) => 0,
-Err(e) => panic!("{:?}", e),
+Err(e) => panic!("{}", e.as_report()),
};
Self(version)
}
Expand Down
Loading

0 comments on commit 52c9b79

Please sign in to comment.