feat(sql-backend): add missing failure event records in sql backend (#…
yezizp2012 authored Feb 27, 2024
1 parent d2ef79f commit 8cc30b1
Showing 2 changed files with 104 additions and 3 deletions.
98 changes: 95 additions & 3 deletions src/meta/src/controller/catalog.rs
@@ -29,7 +29,7 @@ use risingwave_meta_model_v2::{
actor, connection, database, fragment, function, index, object, object_dependency, schema,
sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors,
ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array,
- IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo,
+ IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, SourceId, StreamSourceInfo,
StreamingParallelism, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
@@ -428,9 +428,11 @@ impl CatalogController
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

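// Find all streaming jobs left in an unfinished state, together with their
// object type, so they can be partitioned by kind below.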
- let creating_job_ids: Vec<ObjectId> = streaming_job::Entity::find()
+ let creating_jobs: Vec<(ObjectId, ObjectType)> = streaming_job::Entity::find()
.select_only()
.column(streaming_job::Column::JobId)
.column(object::Column::ObjType)
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
.filter(
streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
streaming_job::Column::JobStatus
@@ -444,14 +446,102 @@

let changed = Self::clean_dirty_sink_downstreams(&txn).await?;

- if creating_job_ids.is_empty() {
+ if creating_jobs.is_empty() {
if changed {
txn.commit().await?;
}

return Ok(ReleaseContext::default());
}

// Record cleaned streaming jobs in event logs.
let mut creating_table_ids = vec![];
let mut creating_source_ids = vec![];
let mut creating_sink_ids = vec![];
let mut creating_job_ids = vec![];
for (job_id, job_type) in creating_jobs {
creating_job_ids.push(job_id);
match job_type {
ObjectType::Table | ObjectType::Index => creating_table_ids.push(job_id),
ObjectType::Source => creating_source_ids.push(job_id),
ObjectType::Sink => creating_sink_ids.push(job_id),
_ => unreachable!("unexpected streaming job type"),
}
}
let mut event_logs = vec![];
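// For each kind of dirty job, fetch its name and definition so the clear
// event is self-describing.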
if !creating_table_ids.is_empty() {
let table_info: Vec<(TableId, String, String)> = Table::find()
.select_only()
.columns([
table::Column::TableId,
table::Column::Name,
table::Column::Definition,
])
.filter(table::Column::TableId.is_in(creating_table_ids))
.into_tuple()
.all(&txn)
.await?;
for (table_id, name, definition) in table_info {
let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
id: table_id as _,
name,
definition,
error: "clear during recovery".to_string(),
};
event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
event,
));
}
}
if !creating_source_ids.is_empty() {
let source_info: Vec<(SourceId, String, String)> = Source::find()
.select_only()
.columns([
source::Column::SourceId,
source::Column::Name,
source::Column::Definition,
])
.filter(source::Column::SourceId.is_in(creating_source_ids))
.into_tuple()
.all(&txn)
.await?;
for (source_id, name, definition) in source_info {
let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
id: source_id as _,
name,
definition,
error: "clear during recovery".to_string(),
};
event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
event,
));
}
}
if !creating_sink_ids.is_empty() {
let sink_info: Vec<(SinkId, String, String)> = Sink::find()
.select_only()
.columns([
sink::Column::SinkId,
sink::Column::Name,
sink::Column::Definition,
])
.filter(sink::Column::SinkId.is_in(creating_sink_ids))
.into_tuple()
.all(&txn)
.await?;
for (sink_id, name, definition) in sink_info {
let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
id: sink_id as _,
name,
definition,
error: "clear during recovery".to_string(),
};
event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
event,
));
}
}

let associated_source_ids: Vec<SourceId> = Table::find()
.select_only()
.column(table::Column::OptionalAssociatedSourceId)
@@ -491,6 +581,8 @@ impl CatalogController {

txn.commit().await?;

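// The events are published only after the transaction commits, so nothing is
// logged for a cleanup that rolls back.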
self.env.event_log_manager_ref().add_event_logs(event_logs);

Ok(ReleaseContext {
state_table_ids,
source_ids: associated_source_ids,
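The three lookup blocks above are identical except for the entity and columns queried; only the source of the (id, name, definition) tuples differs. A possible consolidation, sketched here as a hypothetical helper that is not part of this commit, would share the event construction:

// Hypothetical helper, not in this commit: turn fetched (id, name, definition)
// tuples into DirtyStreamJobClear events.
fn dirty_clear_events(jobs: Vec<(i32, String, String)>) -> Vec<risingwave_pb::meta::event_log::Event> {
    jobs.into_iter()
        .map(|(id, name, definition)| {
            risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_string(),
                },
            )
        })
        .collect()
}

Each of the three blocks would then collapse to its query plus event_logs.extend(dirty_clear_events(rows)).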
9 changes: 9 additions & 0 deletions src/meta/src/rpc/ddl_controller_v2.rs
@@ -95,6 +95,15 @@ impl DdlController
Ok(version) => Ok(version),
Err(err) => {
tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job");
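// In addition to the error log above, record the failure in the meta event log.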
let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
id: streaming_job.id(),
name: streaming_job.name(),
definition: streaming_job.definition(),
error: err.as_report().to_string(),
};
self.env.event_log_manager_ref().add_event_logs(vec![
risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
]);
let aborted = mgr
.catalog_controller
.try_abort_creating_streaming_job(job_id as _, false)
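The failure event here is built inline inside the error arm. As a reusable unit, the same pattern could look like the sketch below; the helper name is hypothetical, and MetaSrvEnv is assumed to be the type of self.env used above.

// Hypothetical wrapper, not part of this commit.
fn report_create_stream_job_fail(
    env: &MetaSrvEnv, // assumed: exposes event_log_manager_ref() as used above
    id: u32,          // assumed width, matching streaming_job.id()
    name: String,
    definition: String,
    error: String,
) {
    use risingwave_pb::meta::event_log::{Event, EventCreateStreamJobFail};
    let event = EventCreateStreamJobFail { id, name, definition, error };
    env.event_log_manager_ref()
        .add_event_logs(vec![Event::CreateStreamJobFail(event)]);
}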
