Skip to content

Commit

Permalink
More merge skew fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 committed Dec 12, 2024
1 parent e3798e7 commit cf8f451
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
1 change: 1 addition & 0 deletions misc/python/materialize/checks/all_checks/source_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TableFromPgSource(TableFromSourceBase):
suffix = "tbl_from_pg_source"

def initialize(self) -> Testdrive:

return Testdrive(
self.generic_setup()
+ dedent(
Expand Down
1 change: 0 additions & 1 deletion src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,4 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&DEFAULT_SINK_PARTITION_STRATEGY)
.add(&ENABLE_CONTINUAL_TASK_BUILTINS)
.add(&ENABLE_EXPRESSION_CACHE)
.add(&ENABLE_SOURCE_TABLE_MIGRATION)
}
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use mz_catalog::builtin::{
use mz_catalog::durable::objects::{
ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleKey, SchemaKey,
};
use mz_catalog::durable::{CatalogError, SystemObjectMapping};
use mz_catalog::durable::{CatalogError, SystemObjectMapping};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
Expand Down
15 changes: 7 additions & 8 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use std::collections::BTreeMap;

use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::{
shard_id, Transaction, Item, BUILTIN_MIGRATION_SEED, BUILTIN_MIGRATION_SHARD_KEY,
shard_id, Transaction, BUILTIN_MIGRATION_SEED, BUILTIN_MIGRATION_SHARD_KEY,
EXPRESSION_CACHE_SEED, EXPRESSION_CACHE_SHARD_KEY,
};
use mz_catalog::memory::objects::{StateUpdate,BootstrapStateUpdateKind};
use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
Expand All @@ -35,16 +35,16 @@ fn rewrite_ast_items<F>(tx: &mut Transaction<'_>, mut f: F) -> Result<(), anyhow
where
F: for<'a> FnMut(
&'a mut Transaction<'_>,
GlobalId,
CatalogItemId,
&'a mut Statement<Raw>,
) -> Result<(), anyhow::Error>,
{
let mut updated_items = BTreeMap::new();

for mut item in tx.get_items() {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
// TODO(alter_table): Switch this to CatalogItemId.
f(tx, item.global_id, &mut stmt).await?;
f(tx, item.id, &mut stmt)?;

item.create_sql = stmt.to_ast_string_stable();

updated_items.insert(item.id, item);
Expand Down Expand Up @@ -114,8 +114,7 @@ pub(crate) async fn migrate(
ast_rewrite_sources_to_tables(tx, now)?;
}

rewrite_ast_items(tx, |tx, item, stmt, all_items_and_statements| {
let now = now.clone();
rewrite_ast_items(tx, |_tx, _id, _stmt| {
// Add per-item AST migrations below.
//
// Each migration should be a function that takes `stmt` (the AST
Expand All @@ -125,7 +124,7 @@ pub(crate) async fn migrate(
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.

Ok(())
Ok(())
})?;

// Load items into catalog. We make sure to consolidate the old updates with the new updates to
Expand Down

0 comments on commit cf8f451

Please sign in to comment.