
chore(stream): rename StreamEnvironment to StreamContext (#13863)
KeXiangWang authored Dec 10, 2023
1 parent 19f4254 commit 7b6bbf3
Showing 10 changed files with 54 additions and 56 deletions.
4 changes: 2 additions & 2 deletions proto/meta.proto
@@ -97,7 +97,7 @@ message TableFragments {
map<uint32, ActorStatus> actor_status = 4;
map<uint32, source.ConnectorSplits> actor_splits = 5;

-stream_plan.StreamEnvironment env = 6;
+stream_plan.StreamContext ctx = 6;
}

/// Parallel unit mapping with fragment id, used for notification.
@@ -196,7 +196,7 @@ message ListTableFragmentsResponse {
}
message TableFragmentInfo {
repeated FragmentInfo fragments = 1;
-stream_plan.StreamEnvironment env = 2;
+stream_plan.StreamContext ctx = 2;
}
map<uint32, TableFragmentInfo> table_fragments = 1;
}
6 changes: 3 additions & 3 deletions proto/stream_plan.proto
@@ -840,8 +840,8 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
}

-// The environment associated with a stream plan
-message StreamEnvironment {
+// The streaming context associated with a stream plan
+message StreamContext {
// The timezone associated with the streaming plan. Only applies to MV for now.
string timezone = 1;
}
@@ -889,7 +889,7 @@ message StreamFragmentGraph {

repeated uint32 dependent_table_ids = 3;
uint32 table_ids_cnt = 4;
-StreamEnvironment env = 5;
+StreamContext ctx = 5;
// If none, default parallelism will be applied.
Parallelism parallelism = 6;
}
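
For orientation, here is a minimal usage sketch (not part of this commit) of how the renamed message is populated and read through the prost-generated Rust types. The field layout follows the proto definitions above; the get_ctx/get_timezone accessors mirror the ones used in the Rust hunks below, and the timezone value is only illustrative.

use risingwave_pb::stream_plan::{
    StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
};

// Attach a streaming context carrying a timezone to a fragment graph, then
// read it back the way the meta-side code does. An empty string is the
// "unset" sentinel for the timezone field.
fn attach_context_example() {
    let mut graph = StreamFragmentGraphProto::default();
    graph.ctx = Some(StreamContext {
        timezone: "America/New_York".into(),
    });

    let tz = graph.get_ctx().unwrap().get_timezone().clone();
    assert_eq!(tz, "America/New_York");
}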
@@ -86,7 +86,7 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Utf8("MATERIALIZED VIEW".into())),
Some(ScalarImpl::Int32(t.id.table_id as i32)),
Some(ScalarImpl::Utf8(
-fragments.get_env().unwrap().get_timezone().clone().into(),
+fragments.get_ctx().unwrap().get_timezone().clone().into(),
)),
Some(ScalarImpl::Utf8(
json!(fragments.get_fragments()).to_string().into(),
@@ -107,7 +107,7 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Utf8("TABLE".into())),
Some(ScalarImpl::Int32(t.id.table_id as i32)),
Some(ScalarImpl::Utf8(
-fragments.get_env().unwrap().get_timezone().clone().into(),
+fragments.get_ctx().unwrap().get_timezone().clone().into(),
)),
Some(ScalarImpl::Utf8(
json!(fragments.get_fragments()).to_string().into(),
@@ -128,7 +128,7 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Utf8("SINK".into())),
Some(ScalarImpl::Int32(t.id.sink_id as i32)),
Some(ScalarImpl::Utf8(
-fragments.get_env().unwrap().get_timezone().clone().into(),
+fragments.get_ctx().unwrap().get_timezone().clone().into(),
)),
Some(ScalarImpl::Utf8(
json!(fragments.get_fragments()).to_string().into(),
@@ -149,7 +149,7 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Utf8("INDEX".into())),
Some(ScalarImpl::Int32(t.index_table.id.table_id as i32)),
Some(ScalarImpl::Utf8(
-fragments.get_env().unwrap().get_timezone().clone().into(),
+fragments.get_ctx().unwrap().get_timezone().clone().into(),
)),
Some(ScalarImpl::Utf8(
json!(fragments.get_fragments()).to_string().into(),
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_mv.rs
@@ -207,9 +207,9 @@ It only indicates the physical clustering of the data, which may improve the per
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});
-// Set the timezone for the stream environment
-let env = graph.env.as_mut().unwrap();
-env.timezone = context.get_session_timezone();
+// Set the timezone for the stream context
+let ctx = graph.ctx.as_mut().unwrap();
+ctx.timezone = context.get_session_timezone();

(table, graph, can_run_in_background)
};
8 changes: 4 additions & 4 deletions src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
@@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{
StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::{
-DispatchStrategy, FragmentTypeFlag, StreamEnvironment,
+DispatchStrategy, FragmentTypeFlag, StreamContext,
StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
};

@@ -92,8 +92,8 @@ pub struct StreamFragmentGraph {
/// stores edges between fragments: (upstream, downstream) => edge.
edges: HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,

-/// Stores the environment for the streaming plan
-env: StreamEnvironment,
+/// Stores the streaming context for the streaming plan
+ctx: StreamContext,
}

impl StreamFragmentGraph {
@@ -105,7 +105,7 @@ impl StreamFragmentGraph {
.map(|(k, v)| (*k, v.to_protobuf()))
.collect(),
edges: self.edges.values().cloned().collect(),
-env: Some(self.env.clone()),
+ctx: Some(self.ctx.clone()),
// To be filled later
dependent_table_ids: vec![],
table_ids_cnt: 0,
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
@@ -199,7 +199,7 @@ impl StreamManagerService for StreamServiceImpl {
.collect_vec(),
})
.collect_vec(),
-env: Some(tf.env.to_protobuf()),
+ctx: Some(tf.ctx.to_protobuf()),
},
)
})
10 changes: 4 additions & 6 deletions src/meta/src/controller/fragment.rs
@@ -34,7 +34,7 @@ use risingwave_pb::meta::{FragmentParallelUnitMapping, PbTableFragments};
use risingwave_pb::source::PbConnectorSplits;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
-PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamEnvironment,
+PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext,
};
use sea_orm::sea_query::{Expr, Value};
use sea_orm::ActiveValue::Set;
@@ -249,7 +249,7 @@ impl CatalogController {
pub fn compose_table_fragments(
table_id: u32,
state: PbState,
-env: Option<PbStreamEnvironment>,
+ctx: Option<PbStreamContext>,
fragments: Vec<(
fragment::Model,
Vec<actor::Model>,
@@ -277,7 +277,7 @@ impl CatalogController {
fragments: pb_fragments,
actor_status: pb_actor_status,
actor_splits: pb_actor_splits,
-env,
+ctx,
};

Ok(table_fragments)
@@ -554,9 +554,7 @@ impl CatalogController {
Self::compose_table_fragments(
job_id as _,
job_info.job_status.into(),
-job_info
-    .timezone
-    .map(|tz| PbStreamEnvironment { timezone: tz }),
+job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
parallel_units_map,
)
32 changes: 16 additions & 16 deletions src/meta/src/model/stream.rs
@@ -26,7 +26,7 @@ use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
-FragmentTypeFlag, PbStreamEnvironment, StreamActor, StreamNode, StreamSource,
+FragmentTypeFlag, PbStreamContext, StreamActor, StreamNode, StreamSource,
};

use super::{ActorId, FragmentId};
@@ -58,19 +58,19 @@ pub struct TableFragments {
/// The splits of actors
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

-/// The environment associated with this stream plan and its fragments
-pub env: StreamEnvironment,
+/// The streaming context associated with this stream plan and its fragments
+pub ctx: StreamContext,
}

#[derive(Debug, Clone, Default)]
-pub struct StreamEnvironment {
+pub struct StreamContext {
/// The timezone used to interpret timestamps and dates for conversion
pub timezone: Option<String>,
}

-impl StreamEnvironment {
-    pub fn to_protobuf(&self) -> PbStreamEnvironment {
-        PbStreamEnvironment {
+impl StreamContext {
+    pub fn to_protobuf(&self) -> PbStreamContext {
+        PbStreamContext {
timezone: self.timezone.clone().unwrap_or("".into()),
}
}
@@ -82,7 +82,7 @@ impl StreamEnvironment {
}
}

-pub fn from_protobuf(prost: &PbStreamEnvironment) -> Self {
+pub fn from_protobuf(prost: &PbStreamContext) -> Self {
Self {
timezone: if prost.get_timezone().is_empty() {
None
@@ -108,19 +108,19 @@ impl MetadataModel for TableFragments {
fragments: self.fragments.clone().into_iter().collect(),
actor_status: self.actor_status.clone().into_iter().collect(),
actor_splits: build_actor_connector_splits(&self.actor_splits),
-env: Some(self.env.to_protobuf()),
+ctx: Some(self.ctx.to_protobuf()),
}
}

fn from_protobuf(prost: Self::PbType) -> Self {
-let env = StreamEnvironment::from_protobuf(prost.get_env().unwrap());
+let ctx = StreamContext::from_protobuf(prost.get_ctx().unwrap());
Self {
table_id: TableId::new(prost.table_id),
state: prost.state(),
fragments: prost.fragments.into_iter().collect(),
actor_status: prost.actor_status.into_iter().collect(),
actor_splits: build_actor_split_impls(&prost.actor_splits),
-env,
+ctx,
}
}

@@ -136,7 +136,7 @@ impl TableFragments {
table_id,
fragments,
&BTreeMap::new(),
-StreamEnvironment::default(),
+StreamContext::default(),
)
}

@@ -146,7 +146,7 @@ impl TableFragments {
table_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
actor_locations: &BTreeMap<ActorId, ParallelUnit>,
-env: StreamEnvironment,
+ctx: StreamContext,
) -> Self {
let actor_status = actor_locations
.iter()
@@ -167,7 +167,7 @@ impl TableFragments {
fragments,
actor_status,
actor_splits: HashMap::default(),
-env,
+ctx,
}
}

@@ -191,7 +191,7 @@ impl TableFragments {

/// Returns the timezone of the table
pub fn timezone(&self) -> Option<String> {
-self.env.timezone.clone()
+self.ctx.timezone.clone()
}

/// Returns whether the table fragments is in `Created` state.
@@ -520,7 +520,7 @@ impl TableFragments {
self.fragments.values_mut().for_each(|fragment| {
fragment.actors.iter_mut().for_each(|actor| {
if actor.expr_context.is_none() {
-actor.expr_context = Some(self.env.to_expr_context());
+actor.expr_context = Some(self.ctx.to_expr_context());
}
});
});
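The new StreamContext keeps the old StreamEnvironment encoding convention: an unset timezone is serialized as an empty string by to_protobuf and restored to None by from_protobuf. A minimal round-trip sketch (not part of this commit), assuming the definitions above:

#[cfg(test)]
mod stream_context_round_trip {
    use crate::model::StreamContext;

    #[test]
    fn unset_timezone_round_trips() {
        // An unset timezone is encoded as "" on the wire...
        let pb = StreamContext { timezone: None }.to_protobuf();
        assert_eq!(pb.timezone, "");
        // ...and decoded back to None on the way in.
        assert!(StreamContext::from_protobuf(&pb).timezone.is_none());
    }
}
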
28 changes: 14 additions & 14 deletions src/meta/src/rpc/ddl_controller.rs
@@ -55,7 +55,7 @@ use crate::manager::{
SchemaId, SinkId, SourceId, StreamingClusterInfo, StreamingJob, TableId, UserId, ViewId,
IGNORED_NOTIFICATION_VERSION,
};
-use crate::model::{FragmentId, StreamEnvironment, TableFragments};
+use crate::model::{FragmentId, StreamContext, TableFragments};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::stream::{
validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
@@ -490,7 +490,7 @@ impl DdlController {
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;

-let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());
+let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

tracing::debug!(id = stream_job.id(), "preparing stream job");
let fragment_graph = self
Expand All @@ -505,7 +505,7 @@ impl DdlController {
tracing::debug!(id = stream_job.id(), "building stream job");
let (ctx, table_fragments) = self
.build_stream_job(
-env.clone(),
+stream_ctx,
&stream_job,
fragment_graph,
affected_table_replace_info,
@@ -626,7 +626,7 @@ impl DdlController {
// Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
async fn inject_replace_table_job(
&self,
-env: StreamEnvironment,
+stream_ctx: StreamContext,
sink_table_fragments: &TableFragments,
ReplaceTableInfo {
mut streaming_job,
Expand All @@ -639,7 +639,7 @@ impl DdlController {
.await?;

let (mut replace_table_ctx, mut table_fragments) = self
-.build_replace_table(env.clone(), &streaming_job, fragment_graph, None)
+.build_replace_table(stream_ctx, &streaming_job, fragment_graph, None)
.await?;

let mut union_fragment_id = None;
@@ -936,15 +936,15 @@ impl DdlController {
/// - Expand each fragment into one or several actors
async fn build_stream_job(
&self,
-env: StreamEnvironment,
+stream_ctx: StreamContext,
stream_job: &StreamingJob,
fragment_graph: StreamFragmentGraph,
affected_table_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
let id = stream_job.id();
let default_parallelism = fragment_graph.default_parallelism();
let internal_tables = fragment_graph.internal_tables();
-let expr_context = env.to_expr_context();
+let expr_context = stream_ctx.to_expr_context();

// 1. Resolve the upstream fragments, extend the fragment graph to a complete graph that
// contains all information needed for building the actor graph.
@@ -999,12 +999,12 @@ impl DdlController {
id.into(),
graph,
&building_locations.actor_locations,
-env.clone(),
+stream_ctx.clone(),
);

let replace_table_job_info = match affected_table_replace_info {
Some(replace_table_info) => Some(
-self.inject_replace_table_job(env, &table_fragments, replace_table_info)
+self.inject_replace_table_job(stream_ctx, &table_fragments, replace_table_info)
.await?,
),
None => None,
@@ -1226,7 +1226,7 @@ impl DdlController {
table_col_index_mapping: Option<ColIndexMapping>,
) -> MetaResult<NotificationVersion> {
let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
-let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());
+let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

let fragment_graph = self
.prepare_replace_table(&mut stream_job, fragment_graph)
Expand All @@ -1235,7 +1235,7 @@ impl DdlController {
let result = try {
let (ctx, table_fragments) = self
.build_replace_table(
-env,
+stream_ctx,
&stream_job,
fragment_graph,
table_col_index_mapping.clone(),
@@ -1289,14 +1289,14 @@ impl DdlController {
/// fragments.
async fn build_replace_table(
&self,
-env: StreamEnvironment,
+stream_ctx: StreamContext,
stream_job: &StreamingJob,
mut fragment_graph: StreamFragmentGraph,
table_col_index_mapping: Option<ColIndexMapping>,
) -> MetaResult<(ReplaceTableContext, TableFragments)> {
let id = stream_job.id();
let default_parallelism = fragment_graph.default_parallelism();
-let expr_context = env.to_expr_context();
+let expr_context = stream_ctx.to_expr_context();

let old_table_fragments = self
.fragment_manager
Expand Down Expand Up @@ -1376,7 +1376,7 @@ impl DdlController {
dummy_id.into(),
graph,
&building_locations.actor_locations,
-env,
+stream_ctx,
);

let ctx = ReplaceTableContext {
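
End to end, the flow after the rename is unchanged apart from naming: the frontend stores the session timezone in the graph's ctx (create_mv.rs above), and the meta node decodes it and derives the per-actor expression context. A condensed, illustrative sketch using only calls visible in the hunks above:

use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto;
use crate::model::StreamContext;

// Decode the StreamContext submitted by the frontend, then derive the
// expression context that is stamped onto actors lacking one (model/stream.rs).
fn derive_expr_context(fragment_graph: &StreamFragmentGraphProto) {
    let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
    let _expr_context = stream_ctx.to_expr_context();
}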