Skip to content

Commit

Permalink
Merge branch 'peng/fe-remove-pu-slot' into peng/remove-pu-sche-slot
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanicky Chen committed May 27, 2024
2 parents fbdade2 + 6eda8c6 commit 0a7ee88
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 55 deletions.
28 changes: 2 additions & 26 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,44 +238,20 @@ services:
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
- message_queue
ports:
- "8082:8082"
environment:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9093,PLAINTEXT_INTERNAL://localhost:29093
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
SCHEMA_REGISTRY_DEBUG: 'true'

kafka:
container_name: kafka
image: confluentinc/cp-kafka:latest
ports:
- "29093:29093"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_INTERNAL://localhost:29093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

pulsar-server:
container_name: pulsar-server
image: apachepulsar/pulsar:latest
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dnf install -y lld
ld.lld --version

echo "--- Install dependencies"
dnf install -y perl-core wget python3 python3-devel cyrus-sasl-devel rsync
dnf install -y perl-core wget python3 python3-devel cyrus-sasl-devel rsync openssl-devel

echo "--- Install java and maven"
dnf install -y java-11-openjdk java-11-openjdk-devel
Expand Down
1 change: 1 addition & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ steps:
run: source-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
upload-container-logs: always
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 18
retry: *auto-retry
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ statement ok
CREATE TABLE kafka_json_schema_plain with (
connector = 'kafka',
kafka.topic = 'kafka_json_schema',
kafka.brokers = 'kafka:9093',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (schema.registry = 'http://schemaregistry:8082');

Expand All @@ -64,7 +64,7 @@ INCLUDE KEY AS rw_key
with (
connector = 'kafka',
kafka.topic = 'kafka_upsert_json_schema',
kafka.brokers = 'kafka:9093',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) FORMAT UPSERT ENCODE JSON (schema.registry = 'http://schemaregistry:8082');

Expand Down
2 changes: 1 addition & 1 deletion scripts/source/prepare_ci_kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ for filename in $kafka_data_files; do
elif [[ "$topic" = *avro_json ]]; then
python3 source/schema_registry_producer.py "message_queue:29092" "http://message_queue:8081" "$filename" "topic" "avro"
elif [[ "$topic" = *json_schema ]]; then
python3 source/schema_registry_producer.py "kafka:9093" "http://schemaregistry:8082" "$filename" "topic" "json"
python3 source/schema_registry_producer.py "message_queue:29092" "http://schemaregistry:8082" "$filename" "topic" "json"
else
cat "$filename" | kcat -P -K ^ -b message_queue:29092 -t "$topic"
fi
Expand Down
6 changes: 5 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ pub struct SessionConfig {
#[parameter(default = true, rename = "rw_streaming_enable_bushy_join")]
streaming_enable_bushy_join: bool,

/// Enable arrangement backfill for streaming queries. Defaults to false.
/// Enable arrangement backfill for streaming queries. Defaults to true.
/// When set to true, the parallelism of the upstream fragment will be
/// decoupled from the parallelism of the downstream scan fragment.
/// Or more generally, the parallelism of the upstream table / index / mv
/// will be decoupled from the parallelism of the downstream table / index / mv / sink.
#[parameter(default = true)]
streaming_use_arrangement_backfill: bool,

Expand Down
24 changes: 22 additions & 2 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ where
visit_inner(stream_node, &mut f)
}

/// A utility for to accessing the [`StreamNode`]. The returned bool is used to determine whether the access needs to continue.
pub fn visit_stream_node_cont<F>(stream_node: &mut StreamNode, mut f: F)
/// A utility for to accessing the [`StreamNode`] mutably. The returned bool is used to determine whether the access needs to continue.
pub fn visit_stream_node_cont_mut<F>(stream_node: &mut StreamNode, mut f: F)
where
F: FnMut(&mut StreamNode) -> bool,
{
Expand All @@ -56,6 +56,26 @@ where
visit_inner(stream_node, &mut f)
}

/// A utility for to accessing the [`StreamNode`] immutably. The returned bool is used to determine whether the access needs to continue.
pub fn visit_stream_node_cont<F>(stream_node: &StreamNode, mut f: F)
where
F: FnMut(&StreamNode) -> bool,
{
fn visit_inner<F>(stream_node: &StreamNode, f: &mut F)
where
F: FnMut(&StreamNode) -> bool,
{
if !f(stream_node) {
return;
}
for input in &stream_node.input {
visit_inner(input, f);
}
}

visit_inner(stream_node, &mut f)
}

/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a
/// [`StreamFragment`] recursively.
pub fn visit_fragment<F>(fragment: &mut StreamFragment, f: F)
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/debug/fix_table_fragments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use etcd_client::ConnectOptions;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_meta::model::{MetadataModel, TableFragments};
use risingwave_meta::storage::{EtcdMetaStore, WrappedEtcdClient};
use risingwave_pb::stream_plan::stream_node::NodeBody;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub async fn fix_table_fragments(
.upstream_fragment_ids
.retain(|id| !dirty_fragment_ids.contains(id));
for actor in &mut fragment.actors {
visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| {
visit_stream_node_cont_mut(actor.nodes.as_mut().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
Expand Down
10 changes: 4 additions & 6 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_meta_model_v2::object::ObjectType;
Expand Down Expand Up @@ -902,7 +902,7 @@ impl CatalogController {

let mut pb_stream_node = stream_node.to_protobuf();

visit_stream_node_cont(&mut pb_stream_node, |node| {
visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
Expand Down Expand Up @@ -2758,11 +2758,9 @@ impl CatalogController {
let inner = self.inner.read().await;

// created table ids.
let mut table_ids: Vec<TableId> = Table::find()
let mut table_ids: Vec<TableId> = StreamingJob::find()
.select_only()
.column(table::Column::TableId)
.join(JoinType::LeftJoin, table::Relation::Object1.def())
.join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
.column(streaming_job::Column::JobId)
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.into_tuple()
.all(&inner.db)
Expand Down
18 changes: 10 additions & 8 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_common::util::stream_graph_visitor::{
visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerSlotMapping};
Expand Down Expand Up @@ -654,12 +656,12 @@ impl FragmentManager {

let mut dirty_downstream_table_ids = HashMap::new();

for (table_id, table_fragment) in table_fragments.tree_mut() {
for (table_id, table_fragment) in table_fragments.tree_ref() {
if to_delete_table_ids.contains(table_id) {
continue;
}

for fragment in table_fragment.fragments.values_mut() {
for fragment in table_fragment.fragments.values() {
if fragment
.get_upstream_fragment_ids()
.iter()
Expand All @@ -668,11 +670,11 @@ impl FragmentManager {
continue;
}

for actor in &mut fragment.actors {
visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| {
for actor in &fragment.actors {
visit_stream_node_cont(actor.nodes.as_ref().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
for input in &mut node.input {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
for input in &node.input {
if let Some(NodeBody::Merge(merge_node)) = &input.node_body
&& !all_fragment_ids.contains(&merge_node.upstream_fragment_id)
{
dirty_downstream_table_ids
Expand Down Expand Up @@ -702,7 +704,7 @@ impl FragmentManager {
.retain(|upstream_fragment_id| all_fragment_ids.contains(upstream_fragment_id));

for actor in &mut fragment.actors {
visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| {
visit_stream_node_cont_mut(actor.nodes.as_mut().unwrap(), |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
Expand Down
4 changes: 0 additions & 4 deletions src/meta/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,6 @@ impl<'a, K: Ord + Debug, V: Clone> BTreeMapTransaction<'a, K, V> {
self.tree_ref
}

pub fn tree_mut(&mut self) -> &mut BTreeMap<K, V> {
self.tree_ref
}

/// Get the value of the provided key by merging the staging value and the original value
pub fn get(&self, key: &K) -> Option<&V> {
self.staging
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::{
visit_fragment, visit_stream_node, visit_stream_node_cont,
visit_fragment, visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::dispatch_source_prop;
Expand Down Expand Up @@ -1162,7 +1162,7 @@ impl DdlController {
if let Some(node) = &mut actor.nodes {
let fields = node.fields.clone();

visit_stream_node_cont(node, |node| {
visit_stream_node_cont_mut(node, |node| {
if let Some(NodeBody::Union(_)) = &mut node.node_body {
for input in &mut node.input {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
Expand Down

0 comments on commit 0a7ee88

Please sign in to comment.