Skip to content

Commit

Permalink
Merge branch 'main' into li0k/storage_merge_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 19, 2024
2 parents 1c044be + c7b6fff commit cdeb417
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 29 deletions.
2 changes: 1 addition & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ services:
retries: 5

message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed-etcd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ services:
retries: 5
restart: always
message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ services:
retries: 5
restart: always
message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-etcd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ services:
restart: always

message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-hdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ services:
retries: 5
restart: always
message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-sqlite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ services:
restart: always

message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ services:
restart: always

message_queue:
image: "docker.vectorized.io/vectorized/redpanda:latest"
image: "redpandadata/redpanda:latest"
command:
- redpanda
- start
Expand Down
7 changes: 7 additions & 0 deletions e2e_test/batch/catalog/rw_depend.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ create materialized view mv1 as select t1.a from t1 join s1 on t1.a = s1.a;
statement ok
create materialized view mv2 as select * from mv1;

statement ok
create view v as select * from mv1;

statement ok
create sink sink1 from mv2 with (connector='blackhole');

Expand All @@ -37,13 +40,17 @@ mv2 mv1
sink1 mv2
sink2 t1
t2 sink2
v mv1

statement ok
drop sink sink1;

statement ok
drop table t2 cascade;

statement ok
drop view v;

statement ok
drop materialized view mv2;

Expand Down
5 changes: 4 additions & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ profile:
# - use: kafka
# persist-data: true

# To enable Confluent schema registry, uncomment the following line
# - use: schema-registry

default-v6:
steps:
- use: meta-node
Expand Down Expand Up @@ -1141,7 +1144,7 @@ compose:
risingwave: "ghcr.io/risingwavelabs/risingwave:latest"
prometheus: "prom/prometheus:latest"
minio: "quay.io/minio/minio:latest"
redpanda: "docker.vectorized.io/vectorized/redpanda:latest"
redpanda: "redpandadata/redpanda:latest"
grafana: "grafana/grafana-oss:latest"
etcd: "quay.io/coreos/etcd:latest"
tempo: "grafana/tempo:latest"
Expand Down
22 changes: 11 additions & 11 deletions src/connector/src/sink/encoder/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ mod tests {
}
]
}"#,
"encode q error: avro name ref unsupported yet",
"encode 'q' error: avro name ref unsupported yet",
);

test_err(
Expand All @@ -663,7 +663,7 @@ mod tests {
i64::MAX,
))),
r#"{"type": "fixed", "name": "Duration", "size": 12, "logicalType": "duration"}"#,
"encode error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
"encode '' error: -1 mons -1 days +2562047788:00:54.775807 overflows avro duration",
);

let avro_schema = AvroSchema::parse_str(
Expand Down Expand Up @@ -738,7 +738,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode req error: field not present but required"
"Encode error: encode 'req' error: field not present but required"
);

let schema = Schema::new(vec![
Expand All @@ -751,7 +751,7 @@ mod tests {
};
assert_eq!(
err.to_string(),
"Encode error: encode extra error: field not in avro"
"Encode error: encode 'extra' error: field not in avro"
);

let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap();
Expand All @@ -761,14 +761,14 @@ mod tests {
};
assert_eq!(
err.to_string(),
r#"Encode error: encode error: expect avro record but got ["null","long"]"#
r#"Encode error: encode '' error: expect avro record but got ["null","long"]"#
);

test_err(
&DataType::Struct(StructType::new(vec![("f0", DataType::Boolean)])),
(),
r#"{"type": "record", "name": "T", "fields": [{"name": "f0", "type": "int"}]}"#,
"encode f0 error: cannot encode boolean column as \"int\" field",
"encode 'f0' error: cannot encode boolean column as \"int\" field",
);
}

Expand All @@ -790,7 +790,7 @@ mod tests {
&DataType::List(DataType::Int32.into()),
Some(ScalarImpl::List(ListValue::from_iter([Some(4), None]))).to_datum_ref(),
avro_schema,
"encode error: found null but required",
"encode '' error: found null but required",
);

test_ok(
Expand Down Expand Up @@ -829,7 +829,7 @@ mod tests {
&DataType::List(DataType::Boolean.into()),
(),
r#"{"type": "array", "items": "int"}"#,
"encode error: cannot encode boolean column as \"int\" field",
"encode '' error: cannot encode boolean column as \"int\" field",
);
}

Expand Down Expand Up @@ -863,14 +863,14 @@ mod tests {
t,
datum.to_datum_ref(),
both,
r#"encode error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
r#"encode '' error: cannot encode timestamp with time zone column as [{"type":"long","logicalType":"timestamp-millis"},{"type":"long","logicalType":"timestamp-micros"}] field"#,
);

test_err(
t,
datum.to_datum_ref(),
empty,
"encode error: cannot encode timestamp with time zone column as [] field",
"encode '' error: cannot encode timestamp with time zone column as [] field",
);

test_ok(
Expand All @@ -879,7 +879,7 @@ mod tests {
one,
Value::Union(0, Value::TimestampMillis(1).into()),
);
test_err(t, None, one, "encode error: found null but required");
test_err(t, None, one, "encode '' error: found null but required");

test_ok(
t,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl std::fmt::Display for FieldEncodeError {

write!(
f,
"encode {} error: {}",
"encode '{}' error: {}",
self.rev_path.iter().rev().join("."),
self.message
)
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode repeated_int_field error: cannot encode integer[] column as int32 field"
"encode 'repeated_int_field' error: cannot encode integer[] column as int32 field"
);

let schema = Schema::new(vec![Field::with_name(
Expand All @@ -554,7 +554,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode repeated_int_field error: array containing null not allowed as repeated field"
"encode 'repeated_int_field' error: array containing null not allowed as repeated field"
);
}

Expand All @@ -573,7 +573,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode not_exists error: field not in proto"
"encode 'not_exists' error: field not in proto"
);

let err = validate_fields(
Expand All @@ -583,7 +583,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
"encode map_field error: field not in proto"
"encode 'map_field' error: field not in proto"
);
}
}
10 changes: 7 additions & 3 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use risingwave_common::array::StreamChunk;

use crate::sink::{Result, SinkError};
Expand Down Expand Up @@ -279,8 +279,12 @@ impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<

impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
async fn build(b: FormatterParams<'_>) -> Result<Self> {
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?;
let val_encoder = VE::build(b.builder, None).await?;
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices))
.await
.with_context(|| "Failed to build key encoder")?;
let val_encoder = VE::build(b.builder, None)
.await
.with_context(|| "Failed to build value encoder")?;
Ok(UpsertFormatter::new(key_encoder, val_encoder))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ pub enum SinkError {
#[backtrace]
anyhow::Error,
),
#[error("Internal error: {0}")]
#[error(transparent)]
Internal(
#[from]
#[backtrace]
Expand Down
22 changes: 22 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,28 @@ impl CatalogController {
})
.collect_vec();

let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
.select_only()
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.join(
JoinType::InnerJoin,
object_dependency::Relation::Object1.def(),
)
.join(JoinType::InnerJoin, object::Relation::View.def())
.into_tuple()
.all(&inner.db)
.await?;

obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
PbObjectDependencies {
object_id: table_id as _,
referenced_object_id: view_id as _,
}
}));

let sink_dependencies: Vec<(SinkId, TableId)> = Sink::find()
.select_only()
.columns([sink::Column::SinkId, sink::Column::TargetTable])
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4217,6 +4217,14 @@ impl CatalogManager {
}
}
}
for view in core.views.values() {
for referenced in &view.dependent_relations {
dependencies.push(PbObjectDependencies {
object_id: view.id,
referenced_object_id: *referenced,
});
}
}
for sink in core.sinks.values() {
for referenced in &sink.dependent_relations {
dependencies.push(PbObjectDependencies {
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,9 @@ impl ScaleController {

for (worker_id, n) in decreased_actor_count {
if let Some(actor_ids) = worker_to_actors.get(worker_id) {
assert!(actor_ids.len() >= n);
if actor_ids.len() < n {
bail!("plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",fragment_id, worker_id, actor_ids.len(), n);
}

let removed_actors: Vec<_> = actor_ids
.iter()
Expand Down

0 comments on commit cdeb417

Please sign in to comment.