Skip to content

Commit

Permalink
feat: add actor info to diagnose info & dump dianose on failure on ci
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Dec 12, 2024
1 parent 9f929d9 commit 2dd84d3
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 4 deletions.
13 changes: 13 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ script = '''
watch -n 1 "${TMUX} list-windows -t risedev | grep -v active | cut -d'(' -f1"
'''

[tasks.diagnose]
category = "Misc"
description = "Dump diagnose info"
dependencies = ["check-and-load-risedev-env-file"]
script = '''
#!/usr/bin/env bash
set -ex
file_name=${PREFIX_LOG}/diagnose-$(date +%Y-%m-%d-%H-%M-%S).txt
curl ${RISEDEV_RW_META_DASHBOARD_ADDR}/api/monitor/diagnose/ > ${file_name}
echo "Diagnose info has been dumped to ${file_name}"
'''


[tasks.del]
alias = "delete"

Expand Down
3 changes: 2 additions & 1 deletion ci/plugins/upload-failure-logs/hooks/post-command
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ set -euo pipefail

if [ $BUILDKITE_COMMAND_EXIT_STATUS -ne 0 ]; then
mv .risingwave/log risedev-logs
risedev diagnose || true
zip -q -r risedev-logs.zip risedev-logs/
buildkite-agent artifact upload "risedev-logs/*"
buildkite-agent artifact upload risedev-logs.zip
Expand All @@ -17,4 +18,4 @@ if [ $BUILDKITE_COMMAND_EXIT_STATUS -ne 0 ]; then
if [ -e "$PWD/connector-node.log" ]; then
buildkite-agent artifact upload "$PWD/connector-node.log"
fi
fi
fi
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-$
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'

echo "--- Kill cluster"
exit 88
cluster_stop

echo "--- e2e, $mode, batch"
Expand Down
22 changes: 21 additions & 1 deletion src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob};
use risingwave_meta_model::{
actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId,
ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus,
ObjectId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap,
WorkerId,
};
use risingwave_meta_model_migration::{Alias, SelectStatement};
use risingwave_pb::common::PbActorLocation;
Expand Down Expand Up @@ -882,6 +883,25 @@ impl CatalogController {
Ok(actor_locations)
}

pub async fn list_actor_info(
&self,
) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
let inner = self.inner.read().await;
let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
Actor::find()
.join(JoinType::LeftJoin, actor::Relation::Fragment.def())
.join(JoinType::LeftJoin, fragment::Relation::Object.def())
.select_only()
.columns([actor::Column::ActorId, actor::Column::FragmentId])
.column_as(object::Column::Oid, "job_id")
.column_as(object::Column::SchemaId, "schema_id")
.column_as(object::Column::ObjType, "type")
.into_tuple()
.all(&inner.db)
.await?;
Ok(actor_locations)
}

pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
let inner = self.inner.read().await;

Expand Down
56 changes: 54 additions & 2 deletions src/meta/src/manager/diagnose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

Expand Down Expand Up @@ -171,7 +171,8 @@ impl DiagnoseCommand {
&mut row,
worker_node.get_state().ok().map(|s| s.as_str_name()),
);
row.add_cell(worker_node.parallelism().into());
// FIXME:
// row.add_cell(worker_node.parallelism().into());
try_add_cell(
&mut row,
worker_node.property.as_ref().map(|p| p.is_streaming),
Expand Down Expand Up @@ -677,6 +678,7 @@ impl DiagnoseCommand {
("INDEX", indexes),
("SINK", sinks),
];
let mut obj_id_to_name = HashMap::new();
for (title, items) in catalogs {
use comfy_table::{Row, Table};
let mut table = Table::new();
Expand All @@ -689,6 +691,7 @@ impl DiagnoseCommand {
row
});
for (id, (name, schema_id, definition)) in items {
obj_id_to_name.insert(id, name.clone());
let mut row = Row::new();
let may_redact =
redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
Expand All @@ -702,6 +705,55 @@ impl DiagnoseCommand {
let _ = writeln!(s, "{title}");
let _ = writeln!(s, "{table}");
}

let actors = self
.metadata_manager
.catalog_controller
.list_actor_info()
.await?
.into_iter()
.map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
(
actor_id,
(
fragment_id,
job_id,
schema_id,
obj_type,
obj_id_to_name
.get(&(job_id as _))
.cloned()
.unwrap_or_default(),
),
)
})
.collect::<BTreeMap<_, _>>();

use comfy_table::{Row, Table};
let mut table = Table::new();
table.set_header({
let mut row = Row::new();
row.add_cell("id".into());
row.add_cell("fragment_id".into());
row.add_cell("job_id".into());
row.add_cell("schema_id".into());
row.add_cell("type".into());
row.add_cell("name".into());
row
});
for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
let mut row = Row::new();
row.add_cell(actor_id.into());
row.add_cell(fragment_id.into());
row.add_cell(job_id.into());
row.add_cell(schema_id.into());
row.add_cell(ddl_type.as_str().into());
row.add_cell(name.into());
table.add_row(row);
}
let _ = writeln!(s);
let _ = writeln!(s, "ACTOR");
let _ = writeln!(s, "{table}");
Ok(())
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/risedevtool/src/risedev_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ pub fn generate_risedev_env(services: &Vec<ServiceConfig>) -> String {
)
.unwrap();
}
ServiceConfig::MetaNode(meta_node_config) => {
writeln!(
env,
r#"RISEDEV_RW_META_DASHBOARD_ADDR="http://{}:{}""#,
meta_node_config.address, meta_node_config.dashboard_port
)
.unwrap();
}
_ => {}
}
}
Expand Down

0 comments on commit 2dd84d3

Please sign in to comment.