diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 2bc81a73de5b..3f3cd705f09f 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -704,6 +704,46 @@ steps: timeout_in_minutes: 25 retry: *auto-retry + - label: "end-to-end test (postgres backend)" + key: e2e-test-postgres-backend + command: "ci/scripts/e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe-pg-backend" + if: | + !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-test-other-backends" + || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ + depends_on: + - "build" + - "build-other" + - "docslt" + plugins: + - docker-compose#v5.1.0: + run: pg-mysql-backend-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 25 + retry: *auto-retry + + - label: "end-to-end test (mysql backend)" + key: e2e-test-mysql-backend + command: "ci/scripts/e2e-test.sh -p ci-release -m ci-3streaming-2serving-3fe-mysql-backend" + if: | + !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-test-other-backends" + || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ + depends_on: + - "build" + - "build-other" + - "docslt" + plugins: + - docker-compose#v5.1.0: + run: pg-mysql-backend-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 25 + retry: *auto-retry + - label: "end-to-end test for opendal (parallel)" key: "e2e-test-opendal-parallel" command: "ci/scripts/e2e-test-parallel-for-opendal.sh -p ci-release" @@ -1112,4 +1152,4 @@ steps: # This should be the LAST part of the main-cron file. - label: "trigger failed test notification" if: build.pull_request.labels includes "ci/main-cron/test-notify" || build.branch == "main" - command: "ci/scripts/notify.py" \ No newline at end of file + command: "ci/scripts/notify.py" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 4b6079b72414..5924f0fe49bf 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -808,9 +808,7 @@ steps: - label: "end-to-end test (mysql backend)" command: "ci/scripts/e2e-test.sh -p ci-dev -m ci-3streaming-2serving-3fe-mysql-backend" - if: "false" - # TODO: enable this when all bugs of mysql backend are addressed. - # if: build.pull_request.labels includes "ci/run-e2e-test-other-backends" || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ + if: build.pull_request.labels includes "ci/run-e2e-test-other-backends" || build.env("CI_STEPS") =~ /(^|,)e2e-test-other-backends?(,|$$)/ depends_on: - "build" - "build-other" diff --git a/src/meta/model/migration/README.md b/src/meta/model/migration/README.md index d0253be5c05d..e354d556ed6d 100644 --- a/src/meta/model/migration/README.md +++ b/src/meta/model/migration/README.md @@ -54,3 +54,9 @@ ## Adding a migration - Add a new column to some catalogs. You can checkout the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference. + +### Special notes on MySQL backend + +MySQL data types typically have stricter length limits compared to those in PostgreSQL or SQLite. For example, the `VARCHAR`, `TEXT`, `BLOB`, and `BINARY` data types have a maximum length of 65,535 bytes. + +If you need to store more data, e.g., SQL definition, UDF body, protobuf-encoded internal data, etc., please **avoid** using the built-in constructors like `ColumnDef::text` or `ColumnDef::blob`, but use the extensions defined in [`./src/utils.rs`](./src/utils.rs) instead. diff --git a/src/meta/model/migration/src/m20230908_072257_init.rs b/src/meta/model/migration/src/m20230908_072257_init.rs index d30e9fe1d949..38fba9101156 100644 --- a/src/meta/model/migration/src/m20230908_072257_init.rs +++ b/src/meta/model/migration/src/m20230908_072257_init.rs @@ -538,7 +538,11 @@ impl MigrationTrait for Migration { .json_binary() .not_null(), ) - .col(ColumnDef::new(Source::Definition).text().not_null()) + .col( + ColumnDef::new(Source::Definition) + .rw_long_text(manager) + .not_null(), + ) .col(ColumnDef::new(Source::SourceInfo).rw_binary(manager)) .col( ColumnDef::new(Source::WatermarkDescs) @@ -596,7 +600,11 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::VnodeColIndex).integer()) .col(ColumnDef::new(Table::RowIdIndex).integer()) .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null()) - .col(ColumnDef::new(Table::Definition).text().not_null()) + .col( + ColumnDef::new(Table::Definition) + .rw_long_text(manager) + .not_null(), + ) .col( ColumnDef::new(Table::HandlePkConflictBehavior) .string() @@ -686,7 +694,11 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null()) .col(ColumnDef::new(Sink::SinkType).string().not_null()) .col(ColumnDef::new(Sink::Properties).json_binary().not_null()) - .col(ColumnDef::new(Sink::Definition).text().not_null()) + .col( + ColumnDef::new(Sink::Definition) + .rw_long_text(manager) + .not_null(), + ) .col(ColumnDef::new(Sink::ConnectionId).integer()) .col(ColumnDef::new(Sink::DbName).string().not_null()) .col(ColumnDef::new(Sink::SinkFromName).string().not_null()) @@ -724,7 +736,11 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(View::ViewId).integer().primary_key()) .col(ColumnDef::new(View::Name).string().not_null()) .col(ColumnDef::new(View::Properties).json_binary().not_null()) - .col(ColumnDef::new(View::Definition).text().not_null()) + .col( + ColumnDef::new(View::Definition) + .rw_long_text(manager) + .not_null(), + ) .col(ColumnDef::new(View::Columns).rw_binary(manager).not_null()) .foreign_key( &mut ForeignKey::create() @@ -798,8 +814,9 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Function::Language).string().not_null()) .col(ColumnDef::new(Function::Link).string()) .col(ColumnDef::new(Function::Identifier).string()) - .col(ColumnDef::new(Function::Body).string()) - .col(ColumnDef::new(Function::CompressedBinary).string()) + .col(ColumnDef::new(Function::Body).rw_long_text(manager)) + // XXX: should this be binary type instead? + .col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager)) .col(ColumnDef::new(Function::Kind).string().not_null()) .col( ColumnDef::new(Function::AlwaysRetryOnNetworkError) diff --git a/src/meta/model/migration/src/m20240304_074901_subscription.rs b/src/meta/model/migration/src/m20240304_074901_subscription.rs index a7c791e828dc..f759d303b25a 100644 --- a/src/meta/model/migration/src/m20240304_074901_subscription.rs +++ b/src/meta/model/migration/src/m20240304_074901_subscription.rs @@ -40,7 +40,11 @@ impl MigrationTrait for Migration { .json_binary() .not_null(), ) - .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::Definition) + .rw_long_text(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::SubscriptionFromName) .string() diff --git a/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs b/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs index 1e0eec2c0bd2..e82f7aa59d1a 100644 --- a/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs +++ b/src/meta/model/migration/src/m20240506_112555_subscription_partial_ckpt.rs @@ -21,7 +21,11 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Subscription::Name).string().not_null()) - .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::Definition) + .rw_long_text(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::RetentionSeconds) .string() @@ -67,7 +71,11 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Subscription::Name).string().not_null()) - .col(ColumnDef::new(Subscription::Definition).string().not_null()) + .col( + ColumnDef::new(Subscription::Definition) + .rw_long_text(manager) + .not_null(), + ) .col( ColumnDef::new(Subscription::Columns) .rw_binary(manager) diff --git a/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs index bd2b12bd0724..be5b1fef90fd 100644 --- a/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs +++ b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs @@ -6,22 +6,21 @@ pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Hack for unifying the test for different backends. + let json_is_empty_array = |col| { + Expr::col(col) + .cast_as(Alias::new("char(2)")) + .eq(Expr::value("[]")) + }; + // Fill vnode count with 1 for singleton tables. manager .exec_stmt( UpdateStatement::new() .table(Table::Table) .values([(Table::VnodeCount, Expr::value(1))]) - .and_where( - Expr::col(Table::DistributionKey) - .cast_as(Alias::new("varchar")) - .eq(Expr::value("[]")), - ) - .and_where( - Expr::col(Table::DistKeyInPk) - .cast_as(Alias::new("varchar")) - .eq(Expr::value("[]")), - ) + .and_where(json_is_empty_array(Table::DistributionKey)) + .and_where(json_is_empty_array(Table::DistKeyInPk)) .and_where(Expr::col(Table::VnodeColIndex).is_null()) .to_owned(), ) diff --git a/src/meta/model/migration/src/utils.rs b/src/meta/model/migration/src/utils.rs index 4f704374befd..7d3eb1fbb3ae 100644 --- a/src/meta/model/migration/src/utils.rs +++ b/src/meta/model/migration/src/utils.rs @@ -14,4 +14,16 @@ impl ColumnDef { DatabaseBackend::Postgres | DatabaseBackend::Sqlite => self.blob(), } } + + /// Set column type as `longtext` for MySQL, and `text` for Postgres and Sqlite. + /// + /// Should be preferred over [`text`](ColumnDef::text) or [`string`](ColumnDef::string) for large text fields, + /// typically user-specified contents like UDF body or SQL definition. Otherwise, MySQL will return an error + /// when the length exceeds 65535 bytes. + pub fn rw_long_text(&mut self, manager: &SchemaManager) -> &mut Self { + match manager.get_database_backend() { + DatabaseBackend::MySql => self.custom(Alias::new("longtext")), + DatabaseBackend::Postgres | DatabaseBackend::Sqlite => self.text(), + } + } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 10e6338bc213..6a1cddc0ef74 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1140,7 +1140,10 @@ impl GlobalBarrierWorker { .instrument(span) .await; } else { - panic!("failed to execute barrier: {}", err.as_report()); + panic!( + "a streaming error occurred while recovery is disabled, aborting: {:?}", + err.as_report() + ); } } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 6c8eb32e6a6c..4187ce818b34 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -385,12 +385,7 @@ impl HummockManager { .exec(db) .await?; hummock_gc_history::Entity::insert_many(models) - .on_conflict( - OnConflict::column(hummock_gc_history::Column::ObjectId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(db) .await?; Ok(()) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 4e28fa512abb..fab79fe01f9a 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -34,7 +34,6 @@ use risingwave_meta_model::{ hummock_time_travel_version, }; use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta}; -use sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; use sea_orm::{ ColumnTrait, Condition, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect, @@ -383,12 +382,7 @@ impl HummockManager { sstable_info: Set(SstableInfoV2Backend::from(&sst_info.to_protobuf())), }; hummock_sstable_info::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_sstable_info::Column::SstId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(txn) .await?; count += 1; @@ -437,12 +431,7 @@ impl HummockManager { .into()), }; hummock_time_travel_version::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_time_travel_version::Column::VersionId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(txn) .await?; } @@ -468,12 +457,7 @@ impl HummockManager { .into()), }; hummock_time_travel_delta::Entity::insert(m) - .on_conflict( - OnConflict::column(hummock_time_travel_delta::Column::VersionId) - .do_nothing() - .to_owned(), - ) - .do_nothing() + .on_conflict_do_nothing() .exec(txn) .await?; }