From 6c108ab4b74f35d345e71f7d344d30fe20f3c3d7 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 29 Apr 2021 09:06:09 +0800 Subject: [PATCH] Hotfix again (#1641) --- .gitignore | 1 + dm/config/subtask.go | 3 ++ dm/config/task.go | 6 +++ syncer/ddl.go | 2 +- syncer/sharding_group.go | 40 +++++++++++++++---- syncer/syncer.go | 13 +++--- tests/all_mode/conf/diff_config.toml | 2 - tests/compatibility/conf/diff_config.toml | 2 - tests/dmctl_basic/conf/diff_config.toml | 2 - tests/dmctl_command/conf/diff_config.toml | 2 - tests/full_mode/conf/diff_config.toml | 2 - tests/http_apis/conf/diff_config.toml | 2 - tests/incremental_mode/conf/diff_config.toml | 2 - tests/initial_unit/conf/diff_config.toml | 2 - tests/load_interrupt/conf/diff_config.toml | 2 - tests/online_ddl/conf/diff_config.toml | 2 - tests/print_status/conf/diff_config.toml | 2 - tests/relay_interrupt/conf/diff_config.toml | 2 - tests/retry_cancel/conf/diff_config.toml | 2 - tests/safe_mode/conf/diff_config.toml | 2 - .../sequence_safe_mode/conf/diff_config.toml | 2 - tests/sequence_sharding/conf/diff_config.toml | 2 - tests/sequence_sharding/conf/dm-task.yaml | 1 + .../sequence_sharding/data/db1.increment.sql | 3 +- tests/sharding/conf/diff_config.toml | 2 - tests/start_task/conf/diff_config.toml | 2 - 26 files changed, 54 insertions(+), 51 deletions(-) diff --git a/.gitignore b/.gitignore index 8549cc6115..5e875a1d11 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ relay_log/* vendor */*.DS_Store tidb-slow.log +.idea/ diff --git a/dm/config/subtask.go b/dm/config/subtask.go index ba8b0f9721..3e20b6f63c 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -181,6 +181,9 @@ type SubTaskConfig struct { ConfigFile string `toml:"-" json:"config-file"` + UpperSchema []string `toml:"upper-schema" json:"upper-schema"` + UpperTable []string `toml:"upper-table" json:"upper-table"` + // still needed by Syncer / Loader bin printVersion bool } diff --git a/dm/config/task.go b/dm/config/task.go index cfa867fdcb..47966beba5 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -263,6 +263,9 @@ type TaskConfig struct { Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"` Loaders map[string]*LoaderConfig `yaml:"loaders"` Syncers map[string]*SyncerConfig `yaml:"syncers"` + + UpperSchema []string `yaml:"upper-schema"` + UpperTable []string `yaml:"upper-table"` } // NewTaskConfig creates a TaskConfig @@ -520,6 +523,9 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf cfg.LoaderConfig = *inst.Loader cfg.SyncerConfig = *inst.Syncer + cfg.UpperSchema = c.UpperSchema + cfg.UpperTable = c.UpperTable + err := cfg.Adjust(true) if err != nil { return nil, terror.Annotatef(err, "source %s", inst.SourceID) diff --git a/syncer/ddl.go b/syncer/ddl.go index 1c78048279..38587fe02e 100644 --- a/syncer/ddl.go +++ b/syncer/ddl.go @@ -256,7 +256,7 @@ func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema strin targetSchema, targetTable := UnpackTableID(name) sourceIDs := make([]string, 0, len(tables)) for _, table := range tables { - sourceID, _ := GenTableID(table[0], table[1]) + sourceID, _ := GenTableID(table[0], table[1], s.cfg.UpperSchema, s.cfg.UpperTable) sourceIDs = append(sourceIDs, sourceID) } err := s.sgk.LeaveGroup(targetSchema, targetTable, sourceIDs) diff --git a/syncer/sharding_group.go b/syncer/sharding_group.go index 0f4f1281a7..d5109065b4 100644 --- a/syncer/sharding_group.go +++ b/syncer/sharding_group.go @@ -79,6 +79,7 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" shardmeta "github.com/pingcap/dm/syncer/sharding-meta" @@ -379,7 +380,32 @@ func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interfac } // GenTableID generates table ID -func GenTableID(schema, table string) (ID string, isSchemaOnly bool) { +func GenTableID(schema, table string, upperSchema, upperTable []string) (ID string, isSchemaOnly bool) { + for _, s := range upperSchema { + if s == schema { + oldSchema := schema + schema = strings.ToLower(schema) + if oldSchema != schema { + log.L().Warn("hotfix, changing schema to lowercase", + zap.String("old schema", oldSchema), + zap.String("schema", schema)) + } + break + } + } + for _, t := range upperTable { + if t == table { + oldTable := table + table = strings.ToLower(table) + if oldTable != table { + log.L().Warn("hotfix, changing table to lowercase", + zap.String("old table", oldTable), + zap.String("table", table)) + } + break + } + } + if len(table) == 0 { return fmt.Sprintf("`%s`", schema), true } @@ -425,8 +451,8 @@ func NewShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) * func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) { // if need to support target table-level sharding DDL // we also need to support target schema-level sharding DDL - schemaID, _ := GenTableID(targetSchema, "") - targetTableID, _ := GenTableID(targetSchema, targetTable) + schemaID, _ := GenTableID(targetSchema, "", k.cfg.UpperSchema, k.cfg.UpperTable) + targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable) k.Lock() defer k.Unlock() @@ -486,8 +512,8 @@ func (k *ShardingGroupKeeper) ResetGroups() { // LeaveGroup leaves group according to target schema, table and source IDs // LeaveGroup doesn't affect in syncing process func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error { - schemaID, _ := GenTableID(targetSchema, "") - targetTableID, _ := GenTableID(targetSchema, targetTable) + schemaID, _ := GenTableID(targetSchema, "", k.cfg.UpperSchema, k.cfg.UpperTable) + targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable) k.Lock() defer k.Unlock() if group, ok := k.groups[targetTableID]; ok { @@ -514,7 +540,7 @@ func (k *ShardingGroupKeeper) TrySync( targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) ( needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error) { - targetTableID, schemaOnly := GenTableID(targetSchema, targetTable) + targetTableID, schemaOnly := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable) if schemaOnly { // NOTE: now we don't support syncing for schema only sharding DDL return false, nil, true, false, 0, nil @@ -563,7 +589,7 @@ func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string) { // Group returns target table's group, nil if not exist func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup { - targetTableID, _ := GenTableID(targetSchema, targetTable) + targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable) k.RLock() defer k.RUnlock() return k.groups[targetTableID] diff --git a/syncer/syncer.go b/syncer/syncer.go index 7175b8e101..29cab838db 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -440,7 +440,7 @@ func (s *Syncer) initShardingGroups() error { if !ok { mSchema[targetTable] = make([]string, 0, len(tables)) } - ID, _ := GenTableID(schema, table) + ID, _ := GenTableID(schema, table, s.cfg.UpperSchema, s.cfg.UpperTable) mSchema[targetTable] = append(mSchema[targetTable], ID) } } @@ -453,7 +453,7 @@ func (s *Syncer) initShardingGroups() error { // add sharding group for targetSchema, mSchema := range mapper { for targetTable, sourceIDs := range mSchema { - tableID, _ := GenTableID(targetSchema, targetTable) + tableID, _ := GenTableID(targetSchema, targetTable, s.cfg.UpperSchema, s.cfg.UpperTable) _, _, _, _, err := s.sgk.AddGroup(targetSchema, targetTable, sourceIDs, loadMeta[tableID], false) if err != nil { return err @@ -1447,7 +1447,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } if s.cfg.IsSharding { - source, _ := GenTableID(originSchema, originTable) + source, _ := GenTableID(originSchema, originTable, s.cfg.UpperSchema, s.cfg.UpperTable) if s.sgk.InSyncing(schemaName, tableName, source, *ec.currentPos) { // if in unsync stage and not before active DDL, ignore it // if in sharding re-sync stage and not before active DDL (the next DDL to be synced), ignore it @@ -1666,7 +1666,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } continue case *ast.DropTableStmt: - sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name) + sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable) err = s.sgk.LeaveGroup(tableNames[1][0].Schema, tableNames[1][0].Name, []string{sourceID}) if err != nil { return err @@ -1772,7 +1772,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e Name: ec.currentPos.Name, Pos: ec.currentPos.Pos - ec.header.EventSize, } - source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name) + + source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable) var annotate string switch ddlInfo.stmt.(type) { @@ -1801,7 +1802,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start position", startPos), zap.Bool("is-synced", synced), zap.Int("unsynced", remain)) if needShardingHandle { - target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) + target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable) unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain)) err = ec.safeMode.IncrForTable(s.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group if err != nil { diff --git a/tests/all_mode/conf/diff_config.toml b/tests/all_mode/conf/diff_config.toml index 1bbbba7238..6dfc688aaf 100644 --- a/tests/all_mode/conf/diff_config.toml +++ b/tests/all_mode/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/compatibility/conf/diff_config.toml b/tests/compatibility/conf/diff_config.toml index dddba9882c..4d24521a7d 100644 --- a/tests/compatibility/conf/diff_config.toml +++ b/tests/compatibility/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/dmctl_basic/conf/diff_config.toml b/tests/dmctl_basic/conf/diff_config.toml index 5069e5ff36..4d31f49b10 100644 --- a/tests/dmctl_basic/conf/diff_config.toml +++ b/tests/dmctl_basic/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/dmctl_command/conf/diff_config.toml b/tests/dmctl_command/conf/diff_config.toml index 7b7763cc88..0c0f052725 100644 --- a/tests/dmctl_command/conf/diff_config.toml +++ b/tests/dmctl_command/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/full_mode/conf/diff_config.toml b/tests/full_mode/conf/diff_config.toml index 94c32a33a1..4c52250773 100644 --- a/tests/full_mode/conf/diff_config.toml +++ b/tests/full_mode/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/http_apis/conf/diff_config.toml b/tests/http_apis/conf/diff_config.toml index ce36cee9d0..c186af889a 100644 --- a/tests/http_apis/conf/diff_config.toml +++ b/tests/http_apis/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/incremental_mode/conf/diff_config.toml b/tests/incremental_mode/conf/diff_config.toml index c981244dd8..1b7eefac07 100644 --- a/tests/incremental_mode/conf/diff_config.toml +++ b/tests/incremental_mode/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/initial_unit/conf/diff_config.toml b/tests/initial_unit/conf/diff_config.toml index 68d122475f..a7cc566a48 100644 --- a/tests/initial_unit/conf/diff_config.toml +++ b/tests/initial_unit/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/load_interrupt/conf/diff_config.toml b/tests/load_interrupt/conf/diff_config.toml index fbd40e48b2..6210cfeb43 100644 --- a/tests/load_interrupt/conf/diff_config.toml +++ b/tests/load_interrupt/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/online_ddl/conf/diff_config.toml b/tests/online_ddl/conf/diff_config.toml index 5fa9641ddb..8b386d2b25 100644 --- a/tests/online_ddl/conf/diff_config.toml +++ b/tests/online_ddl/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/print_status/conf/diff_config.toml b/tests/print_status/conf/diff_config.toml index 92e9dc79de..e9a12d9da3 100644 --- a/tests/print_status/conf/diff_config.toml +++ b/tests/print_status/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true use-checkpoint = false diff --git a/tests/relay_interrupt/conf/diff_config.toml b/tests/relay_interrupt/conf/diff_config.toml index 5eda165748..d0d551a3e0 100644 --- a/tests/relay_interrupt/conf/diff_config.toml +++ b/tests/relay_interrupt/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/retry_cancel/conf/diff_config.toml b/tests/retry_cancel/conf/diff_config.toml index 0244047d6c..9c77fb8251 100644 --- a/tests/retry_cancel/conf/diff_config.toml +++ b/tests/retry_cancel/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/safe_mode/conf/diff_config.toml b/tests/safe_mode/conf/diff_config.toml index 8246b8b971..18dcc752c0 100644 --- a/tests/safe_mode/conf/diff_config.toml +++ b/tests/safe_mode/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/sequence_safe_mode/conf/diff_config.toml b/tests/sequence_safe_mode/conf/diff_config.toml index 0a88341af0..19e982723e 100644 --- a/tests/sequence_safe_mode/conf/diff_config.toml +++ b/tests/sequence_safe_mode/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/sequence_sharding/conf/diff_config.toml b/tests/sequence_sharding/conf/diff_config.toml index 436877afe8..84c78b891f 100644 --- a/tests/sequence_sharding/conf/diff_config.toml +++ b/tests/sequence_sharding/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/sequence_sharding/conf/dm-task.yaml b/tests/sequence_sharding/conf/dm-task.yaml index c809bf532a..50498c778b 100644 --- a/tests/sequence_sharding/conf/dm-task.yaml +++ b/tests/sequence_sharding/conf/dm-task.yaml @@ -6,6 +6,7 @@ meta-schema: "dm_meta" remove-meta: false enable-heartbeat: true timezone: "Asia/Shanghai" +upper-table: ["T1"] target-database: host: "127.0.0.1" diff --git a/tests/sequence_sharding/data/db1.increment.sql b/tests/sequence_sharding/data/db1.increment.sql index 801c992577..6e57df2a3b 100644 --- a/tests/sequence_sharding/data/db1.increment.sql +++ b/tests/sequence_sharding/data/db1.increment.sql @@ -3,7 +3,8 @@ insert into t1 (uid,name) values (100003,'NR'); update t1 set name = 'uxoKehvqWg' where `uid` = 100001; update t1 set name = 'bapYymrtfT' where name = 'igvApUx'; insert into t2 (uid,name) values (200004,'CXDvoltoliUINgo'),(200005,'188689130'); -alter table t1 add column c int; +alter table T1 add column c int; +insert into t1 (uid,name,c) values (123123,'test',123); alter table t1 add index c(c); update t1 set c = 100; alter table t1 add column d int; diff --git a/tests/sharding/conf/diff_config.toml b/tests/sharding/conf/diff_config.toml index 5aa5cf6584..b9b459dc21 100644 --- a/tests/sharding/conf/diff_config.toml +++ b/tests/sharding/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql" diff --git a/tests/start_task/conf/diff_config.toml b/tests/start_task/conf/diff_config.toml index a68d568763..e69b9d1e37 100644 --- a/tests/start_task/conf/diff_config.toml +++ b/tests/start_task/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql"