Skip to content

Commit

Permalink
Merge branch 'main' into fix/internal-table-privilege
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Nov 4, 2024
2 parents 5146b9c + 6b10d92 commit cba9f30
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 49 deletions.
42 changes: 41 additions & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,46 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "end-to-end test (postgres backend)"
key: e2e-test-postgres-backend
command: "ci/scripts/e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe-pg-backend"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-other-backends"
|| build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/
depends_on:
- "build"
- "build-other"
- "docslt"
plugins:
- docker-compose#v5.1.0:
run: pg-mysql-backend-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "end-to-end test (mysql backend)"
key: e2e-test-mysql-backend
command: "ci/scripts/e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe-mysql-backend"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-other-backends"
|| build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/
depends_on:
- "build"
- "build-other"
- "docslt"
plugins:
- docker-compose#v5.1.0:
run: pg-mysql-backend-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "end-to-end test for opendal (parallel)"
key: "e2e-test-opendal-parallel"
command: "ci/scripts/e2e-test-parallel-for-opendal.sh -p ci-release"
Expand Down Expand Up @@ -1112,4 +1152,4 @@ steps:
# This should be the LAST part of the main-cron file.
- label: "trigger failed test notification"
if: build.pull_request.labels includes "ci/main-cron/test-notify" || build.branch == "main"
command: "ci/scripts/notify.py"
command: "ci/scripts/notify.py"
4 changes: 1 addition & 3 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -808,9 +808,7 @@ steps:

- label: "end-to-end test (mysql backend)"
command: "ci/scripts/e2e-test.sh -p ci-dev -m ci-3streaming-2serving-3fe-mysql-backend"
if: "false"
# TODO: enable this when all bugs of mysql backend are addressed.
# if: build.pull_request.labels includes "ci/run-e2e-test-other-backends" || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/
if: build.pull_request.labels includes "ci/run-e2e-test-other-backends" || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/
depends_on:
- "build"
- "build-other"
Expand Down
6 changes: 6 additions & 0 deletions src/meta/model/migration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,9 @@
## Adding a migration

- Add a new column to some catalogs. You can checkout the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference.

### Special notes on MySQL backend

MySQL data types typically have stricter length limits compared to those in PostgreSQL or SQLite. For example, the `VARCHAR`, `TEXT`, `BLOB`, and `BINARY` data types have a maximum length of 65,535 bytes.

If you need to store more data, e.g., SQL definition, UDF body, protobuf-encoded internal data, etc., please **avoid** using the built-in constructors like `ColumnDef::text` or `ColumnDef::blob`, but use the extensions defined in [`./src/utils.rs`](./src/utils.rs) instead.
29 changes: 23 additions & 6 deletions src/meta/model/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,11 @@ impl MigrationTrait for Migration {
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::Definition).text().not_null())
.col(
ColumnDef::new(Source::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(ColumnDef::new(Source::SourceInfo).rw_binary(manager))
.col(
ColumnDef::new(Source::WatermarkDescs)
Expand Down Expand Up @@ -596,7 +600,11 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::VnodeColIndex).integer())
.col(ColumnDef::new(Table::RowIdIndex).integer())
.col(ColumnDef::new(Table::ValueIndices).json_binary().not_null())
.col(ColumnDef::new(Table::Definition).text().not_null())
.col(
ColumnDef::new(Table::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(
ColumnDef::new(Table::HandlePkConflictBehavior)
.string()
Expand Down Expand Up @@ -686,7 +694,11 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null())
.col(ColumnDef::new(Sink::SinkType).string().not_null())
.col(ColumnDef::new(Sink::Properties).json_binary().not_null())
.col(ColumnDef::new(Sink::Definition).text().not_null())
.col(
ColumnDef::new(Sink::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(ColumnDef::new(Sink::ConnectionId).integer())
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
Expand Down Expand Up @@ -724,7 +736,11 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(View::ViewId).integer().primary_key())
.col(ColumnDef::new(View::Name).string().not_null())
.col(ColumnDef::new(View::Properties).json_binary().not_null())
.col(ColumnDef::new(View::Definition).text().not_null())
.col(
ColumnDef::new(View::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(ColumnDef::new(View::Columns).rw_binary(manager).not_null())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -798,8 +814,9 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Function::Language).string().not_null())
.col(ColumnDef::new(Function::Link).string())
.col(ColumnDef::new(Function::Identifier).string())
.col(ColumnDef::new(Function::Body).string())
.col(ColumnDef::new(Function::CompressedBinary).string())
.col(ColumnDef::new(Function::Body).rw_long_text(manager))
// XXX: should this be binary type instead?
.col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager))
.col(ColumnDef::new(Function::Kind).string().not_null())
.col(
ColumnDef::new(Function::AlwaysRetryOnNetworkError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ impl MigrationTrait for Migration {
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Subscription::Definition).string().not_null())
.col(
ColumnDef::new(Subscription::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(
ColumnDef::new(Subscription::SubscriptionFromName)
.string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Subscription::Name).string().not_null())
.col(ColumnDef::new(Subscription::Definition).string().not_null())
.col(
ColumnDef::new(Subscription::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(
ColumnDef::new(Subscription::RetentionSeconds)
.string()
Expand Down Expand Up @@ -67,7 +71,11 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Subscription::Name).string().not_null())
.col(ColumnDef::new(Subscription::Definition).string().not_null())
.col(
ColumnDef::new(Subscription::Definition)
.rw_long_text(manager)
.not_null(),
)
.col(
ColumnDef::new(Subscription::Columns)
.rw_binary(manager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Hack for unifying the test for different backends.
let json_is_empty_array = |col| {
Expr::col(col)
.cast_as(Alias::new("char(2)"))
.eq(Expr::value("[]"))
};

// Fill vnode count with 1 for singleton tables.
manager
.exec_stmt(
UpdateStatement::new()
.table(Table::Table)
.values([(Table::VnodeCount, Expr::value(1))])
.and_where(
Expr::col(Table::DistributionKey)
.cast_as(Alias::new("varchar"))
.eq(Expr::value("[]")),
)
.and_where(
Expr::col(Table::DistKeyInPk)
.cast_as(Alias::new("varchar"))
.eq(Expr::value("[]")),
)
.and_where(json_is_empty_array(Table::DistributionKey))
.and_where(json_is_empty_array(Table::DistKeyInPk))
.and_where(Expr::col(Table::VnodeColIndex).is_null())
.to_owned(),
)
Expand Down
12 changes: 12 additions & 0 deletions src/meta/model/migration/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@ impl ColumnDef {
DatabaseBackend::Postgres | DatabaseBackend::Sqlite => self.blob(),
}
}

/// Set column type as `longtext` for MySQL, and `text` for Postgres and Sqlite.
///
/// Should be preferred over [`text`](ColumnDef::text) or [`string`](ColumnDef::string) for large text fields,
/// typically user-specified contents like UDF body or SQL definition. Otherwise, MySQL will return an error
/// when the length exceeds 65535 bytes.
pub fn rw_long_text(&mut self, manager: &SchemaManager) -> &mut Self {
match manager.get_database_backend() {
DatabaseBackend::MySql => self.custom(Alias::new("longtext")),
DatabaseBackend::Postgres | DatabaseBackend::Sqlite => self.text(),
}
}
}
5 changes: 4 additions & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,10 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
.instrument(span)
.await;
} else {
panic!("failed to execute barrier: {}", err.as_report());
panic!(
"a streaming error occurred while recovery is disabled, aborting: {:?}",
err.as_report()
);
}
}

Expand Down
7 changes: 1 addition & 6 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,7 @@ impl HummockManager {
.exec(db)
.await?;
hummock_gc_history::Entity::insert_many(models)
.on_conflict(
OnConflict::column(hummock_gc_history::Column::ObjectId)
.do_nothing()
.to_owned(),
)
.do_nothing()
.on_conflict_do_nothing()
.exec(db)
.await?;
Ok(())
Expand Down
22 changes: 3 additions & 19 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use risingwave_meta_model::{
hummock_time_travel_version,
};
use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
use sea_orm::sea_query::OnConflict;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
Expand Down Expand Up @@ -383,12 +382,7 @@ impl HummockManager {
sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())),
};
hummock_sstable_info::Entity::insert(m)
.on_conflict(
OnConflict::column(hummock_sstable_info::Column::SstId)
.do_nothing()
.to_owned(),
)
.do_nothing()
.on_conflict_do_nothing()
.exec(txn)
.await?;
count += 1;
Expand Down Expand Up @@ -437,12 +431,7 @@ impl HummockManager {
.into()),
};
hummock_time_travel_version::Entity::insert(m)
.on_conflict(
OnConflict::column(hummock_time_travel_version::Column::VersionId)
.do_nothing()
.to_owned(),
)
.do_nothing()
.on_conflict_do_nothing()
.exec(txn)
.await?;
}
Expand All @@ -468,12 +457,7 @@ impl HummockManager {
.into()),
};
hummock_time_travel_delta::Entity::insert(m)
.on_conflict(
OnConflict::column(hummock_time_travel_delta::Column::VersionId)
.do_nothing()
.to_owned(),
)
.do_nothing()
.on_conflict_do_nothing()
.exec(txn)
.await?;
}
Expand Down

0 comments on commit cba9f30

Please sign in to comment.