Skip to content

Commit

Permalink
Merge branch 'main' into yiming/no-read-version-in-reset
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 30, 2024
2 parents faf02f5 + e05015a commit 1a7dc2b
Show file tree
Hide file tree
Showing 51 changed files with 2,789 additions and 645 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions e2e_test/sink/sink_into_table/rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ALTER SINK s RENAME TO s1;
query TT
SELECT name, definition from rw_sinks;
----
s1 CREATE SINK s1 INTO target AS SELECT * FROM source WITH (type = 'append-only')
s1 CREATE SINK s1 INTO target AS SELECT * FROM source WITH (type = 'append-only')

statement ok
ALTER TABLE source RENAME TO src;
Expand All @@ -36,7 +36,7 @@ tar CREATE TABLE tar (v INT)
query TT
SELECT name, definition from rw_sinks;
----
s1 CREATE SINK s1 INTO target AS SELECT * FROM src AS source WITH (type = 'append-only')
s1 CREATE SINK s1 INTO tar AS SELECT * FROM src AS source WITH (type = 'append-only')

statement ok
drop sink s1;
Expand Down
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ message AlterSetSchemaResponse {
message AlterParallelismRequest {
uint32 table_id = 1;
meta.TableParallelism parallelism = 2;
bool deferred = 3;
}

message AlterParallelismResponse {}
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/config_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tonic::{Code, Request, Response, Status};

pub struct ConfigServiceImpl {
batch_mgr: Arc<BatchManager>,
stream_mgr: Arc<LocalStreamManager>,
stream_mgr: LocalStreamManager,
}

#[async_trait::async_trait]
Expand All @@ -46,7 +46,7 @@ impl ConfigService for ConfigServiceImpl {
}

impl ConfigServiceImpl {
pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: Arc<LocalStreamManager>) -> Self {
pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: LocalStreamManager) -> Self {
Self {
batch_mgr,
stream_mgr,
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const BATCH_EXCHANGE_BUFFER_SIZE: usize = 1024;
#[derive(Clone)]
pub struct ExchangeServiceImpl {
batch_mgr: Arc<BatchManager>,
stream_mgr: Arc<LocalStreamManager>,
stream_mgr: LocalStreamManager,
metrics: Arc<ExchangeServiceMetrics>,
}

Expand Down Expand Up @@ -128,7 +128,7 @@ impl ExchangeService for ExchangeServiceImpl {
impl ExchangeServiceImpl {
pub fn new(
mgr: Arc<BatchManager>,
stream_mgr: Arc<LocalStreamManager>,
stream_mgr: LocalStreamManager,
metrics: Arc<ExchangeServiceMetrics>,
) -> Self {
ExchangeServiceImpl {
Expand Down
5 changes: 2 additions & 3 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::ffi::CString;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
Expand All @@ -37,14 +36,14 @@ use tonic::{Code, Request, Response, Status};

#[derive(Clone)]
pub struct MonitorServiceImpl {
stream_mgr: Arc<LocalStreamManager>,
stream_mgr: LocalStreamManager,
grpc_await_tree_reg: Option<AwaitTreeRegistryRef>,
server_config: ServerConfig,
}

impl MonitorServiceImpl {
pub fn new(
stream_mgr: Arc<LocalStreamManager>,
stream_mgr: LocalStreamManager,
grpc_await_tree_reg: Option<AwaitTreeRegistryRef>,
server_config: ServerConfig,
) -> Self {
Expand Down
42 changes: 7 additions & 35 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
Expand All @@ -26,16 +24,16 @@ use risingwave_stream::error::StreamError;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};
use tonic::{Request, Response, Status};

#[derive(Clone)]
pub struct StreamServiceImpl {
mgr: Arc<LocalStreamManager>,
mgr: LocalStreamManager,
env: StreamEnvironment,
}

impl StreamServiceImpl {
pub fn new(mgr: Arc<LocalStreamManager>, env: StreamEnvironment) -> Self {
pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
StreamServiceImpl { mgr, env }
}
}
Expand All @@ -48,7 +46,7 @@ impl StreamService for StreamServiceImpl {
request: Request<UpdateActorsRequest>,
) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
let req = request.into_inner();
let res = self.mgr.update_actors(&req.actors);
let res = self.mgr.update_actors(req.actors).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to update stream actor");
Expand All @@ -66,10 +64,7 @@ impl StreamService for StreamServiceImpl {
let req = request.into_inner();

let actor_id = req.actor_id;
let res = self
.mgr
.build_actors(actor_id.as_slice(), self.env.clone())
.await;
let res = self.mgr.build_actors(actor_id).await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to build actors");
Expand Down Expand Up @@ -108,7 +103,7 @@ impl StreamService for StreamServiceImpl {
) -> std::result::Result<Response<DropActorsResponse>, Status> {
let req = request.into_inner();
let actors = req.actor_ids;
self.mgr.drop_actors(&actors)?;
self.mgr.drop_actors(actors).await?;
Ok(Response::new(DropActorsResponse {
request_id: req.request_id,
status: None,
Expand All @@ -121,8 +116,7 @@ impl StreamService for StreamServiceImpl {
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.stop_all_actors().await?;
self.env.dml_manager_ref().clear();
self.mgr.reset().await;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
Expand All @@ -138,28 +132,6 @@ impl StreamService for StreamServiceImpl {
let barrier =
Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?;

// The barrier might be outdated and been injected after recovery in some certain extreme
// scenarios. So some newly creating actors in the barrier are possibly not rebuilt during
// recovery. Check it here and return an error here if some actors are not found to
// avoid collection hang. We need some refine in meta side to remove this workaround since
// it will cause another round of unnecessary recovery.
let actor_ids = self.mgr.all_actor_ids();
let missing_actor_ids = req
.actor_ids_to_collect
.iter()
.filter(|id| !actor_ids.contains(id))
.collect_vec();
if !missing_actor_ids.is_empty() {
tracing::warn!(
"to collect actors not found, they should be cleaned when recovering: {:?}",
missing_actor_ids
);
return Err(Status::new(
Code::InvalidArgument,
"to collect actors not found",
));
}

self.mgr
.send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.await?;
Expand Down
21 changes: 10 additions & 11 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,6 @@ pub async fn compute_node_serve(
// Run a background heap profiler
heap_profiler.start();

let stream_mgr = Arc::new(LocalStreamManager::new(
advertise_addr.clone(),
state_store.clone(),
streaming_metrics.clone(),
config.streaming.clone(),
await_tree_config.clone(),
memory_mgr.get_watermark_epoch(),
));

let grpc_await_tree_reg = await_tree_config
.map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
let dml_mgr = Arc::new(DmlManager::new(
worker_id,
config.streaming.developer.dml_channel_initial_permits,
Expand Down Expand Up @@ -363,6 +352,16 @@ pub async fn compute_node_serve(
meta_client.clone(),
);

let stream_mgr = LocalStreamManager::new(
stream_env.clone(),
streaming_metrics.clone(),
await_tree_config.clone(),
memory_mgr.get_watermark_epoch(),
);

let grpc_await_tree_reg = await_tree_config
.map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));

// Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if
// this is not the case, we can use the following command to get it printed into the logs
// periodically.
Expand Down
11 changes: 8 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ pub trait CatalogWriter: Send + Sync {

async fn alter_source_with_sr(&self, source: PbSource) -> Result<()>;

async fn alter_parallelism(&self, table_id: u32, parallelism: PbTableParallelism)
-> Result<()>;
async fn alter_parallelism(
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()>;

async fn alter_set_schema(
&self,
Expand Down Expand Up @@ -506,9 +510,10 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table_id: u32,
parallelism: PbTableParallelism,
deferred: bool,
) -> Result<()> {
self.meta_client
.alter_parallelism(table_id, parallelism)
.alter_parallelism(table_id, parallelism, deferred)
.await
.map_err(|e| anyhow!(e))?;

Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub async fn handle_alter_parallelism(
obj_name: ObjectName,
parallelism: SetVariableValue,
stmt_type: StatementType,
deferred: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -93,10 +94,16 @@ pub async fn handle_alter_parallelism(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_parallelism(table_id, target_parallelism)
.alter_parallelism(table_id, target_parallelism, deferred)
.await?;

Ok(RwPgResponse::empty_result(stmt_type))
let mut builder = RwPgResponse::builder(stmt_type);

if deferred {
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_string());
}

Ok(builder.into())
}

fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
Expand Down
28 changes: 24 additions & 4 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,18 @@ pub async fn handle(
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetParallelism { parallelism },
operation:
AlterTableOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_TABLE,
deferred,
)
.await
}
Expand All @@ -557,13 +562,18 @@ pub async fn handle(
} => alter_rename::handle_rename_index(handler_args, name, index_name).await,
Statement::AlterIndex {
name,
operation: AlterIndexOperation::SetParallelism { parallelism },
operation:
AlterIndexOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_INDEX,
deferred,
)
.await
}
Expand All @@ -587,13 +597,18 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
operation: AlterViewOperation::SetParallelism { parallelism },
operation:
AlterViewOperation::SetParallelism {
parallelism,
deferred,
},
} if materialized => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_MATERIALIZED_VIEW,
deferred,
)
.await
}
Expand Down Expand Up @@ -677,13 +692,18 @@ pub async fn handle(
}
Statement::AlterSink {
name,
operation: AlterSinkOperation::SetParallelism { parallelism },
operation:
AlterSinkOperation::SetParallelism {
parallelism,
deferred,
},
} => {
alter_parallelism::handle_alter_parallelism(
handler_args,
name,
parallelism,
StatementType::ALTER_SINK,
deferred,
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
_table_id: u32,
_parallelism: PbTableParallelism,
_deferred: bool,
) -> Result<()> {
todo!()
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,9 +841,10 @@ impl DdlService for DdlServiceImpl {

let table_id = req.get_table_id();
let parallelism = req.get_parallelism()?.clone();
let deferred = req.get_deferred();

self.ddl_controller
.alter_parallelism(table_id, parallelism)
.alter_parallelism(table_id, parallelism, deferred)
.await?;

Ok(Response::new(AlterParallelismResponse {}))
Expand Down
Loading

0 comments on commit 1a7dc2b

Please sign in to comment.