From 4ecdcef7f495f35fe47f7533520401a84704724b Mon Sep 17 00:00:00 2001 From: Archer Li Date: Tue, 22 Aug 2017 11:08:34 +0800 Subject: [PATCH] support any length table postfix tbl_name_xxxxxxxx set default table_name_postfix refactor --- config/config.go | 15 +++++++++++ proxy/router/router.go | 48 +++++++++++++++++++++++++---------- proxy/server/conn_preshard.go | 6 ++++- 3 files changed, 55 insertions(+), 14 deletions(-) diff --git a/config/config.go b/config/config.go index 08b0c9d6..4f5cd823 100644 --- a/config/config.go +++ b/config/config.go @@ -75,6 +75,7 @@ type ShardConfig struct { Type string `yaml:"type"` TableRowLimit int `yaml:"table_row_limit"` DateRange []string `yaml:"date_range"` + TableNamePostfixLength int `yaml:"table_name_postfix_length"` } func ParseConfigData(data []byte) (*Config, error) { @@ -82,6 +83,9 @@ func ParseConfigData(data []byte) (*Config, error) { if err := yaml.Unmarshal([]byte(data), &cfg); err != nil { return nil, err } + + setDefaultTblNamePostfixLength(&cfg) + return &cfg, nil } @@ -109,3 +113,14 @@ func WriteConfigFile(cfg *Config) error { return nil } + +// set the default tbl_name postfix to 4, tbl_name_xxxx +func setDefaultTblNamePostfixLength(cfg *Config) error { + for i := 0; i < len(cfg.Schema.ShardRule); i++ { + if cfg.Schema.ShardRule[i].TableNamePostfixLength == 0 { + cfg.Schema.ShardRule[i].TableNamePostfixLength = 4 + } + } + + return nil +} diff --git a/proxy/router/router.go b/proxy/router/router.go index 8de9a011..19e55e79 100644 --- a/proxy/router/router.go +++ b/proxy/router/router.go @@ -22,6 +22,7 @@ import ( "github.com/flike/kingshard/core/errors" "github.com/flike/kingshard/core/golog" "github.com/flike/kingshard/sqlparser" + "strconv" ) var ( @@ -46,6 +47,7 @@ type Rule struct { SubTableIndexs []int //SubTableIndexs store all the index of sharding sub-table TableToNode map[int]int //key is table index, and value is node index Shard Shard + TableNamePostfixLength int } type Router struct { @@ -141,6 +143,7 @@ func NewRouter(schemaConfig *config.SchemaConfig) (*Router, error) { rt.Rules[rule.DB] = m rt.Rules[rule.DB][rule.Table] = rule } + rt.Rules[rule.DB][rule.Table].TableNamePostfixLength = shard.TableNamePostfixLength } return rt, nil } @@ -525,6 +528,10 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex node.Distinct, ) + // tbl name format: tbl_name_xxxx + tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) + var prefix string //rewrite select expr for _, expr := range node.SelectExprs { @@ -532,7 +539,7 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex case *sqlparser.StarExpr: //for shardTable.*,need replace table into shardTable_xxxx. if string(v.TableName) == plan.Rule.Table { - fmt.Fprintf(buf, "%s%s_%04d.*", + fmt.Fprintf(buf, "%s%s_%0" + tblPostfixLengthStr + "d.*", prefix, plan.Rule.Table, tableIndex, @@ -545,7 +552,7 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex //into shardTable_xxxx.column as a if colName, ok := v.Expr.(*sqlparser.ColName); ok { if string(colName.Qualifier) == plan.Rule.Table { - fmt.Fprintf(buf, "%s%s_%04d.%s", + fmt.Fprintf(buf, "%s%s_%0" + tblPostfixLengthStr + "d.%s", prefix, plan.Rule.Table, tableIndex, @@ -577,13 +584,13 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex switch v := (node.From[0]).(type) { case *sqlparser.AliasedTableExpr: if len(v.As) != 0 { - fmt.Fprintf(buf, "%s_%04d as %s", + fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d as %s", sqlparser.String(v.Expr), tableIndex, string(v.As), ) } else { - fmt.Fprintf(buf, "%s_%04d", + fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d", sqlparser.String(v.Expr), tableIndex, ) @@ -591,19 +598,19 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex case *sqlparser.JoinTableExpr: if ate, ok := (v.LeftExpr).(*sqlparser.AliasedTableExpr); ok { if len(ate.As) != 0 { - fmt.Fprintf(buf, "%s_%04d as %s", + fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d as %s", sqlparser.String(ate.Expr), tableIndex, string(ate.As), ) } else { - fmt.Fprintf(buf, "%s_%04d", + fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d", sqlparser.String(ate.Expr), tableIndex, ) } } else { - fmt.Fprintf(buf, "%s_%04d", + fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d", sqlparser.String(v.LeftExpr), tableIndex, ) @@ -613,7 +620,7 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex buf.Fprintf(" on %v", v.On) } default: - fmt.Fprintf(buf, "%s_%04d", + fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d", sqlparser.String(node.From[0]), tableIndex, ) @@ -693,6 +700,9 @@ func (r *Router) generateInsertSql(plan *Plan, stmt sqlparser.Statement) error { nodeName := r.Nodes[0] sqls[nodeName] = []string{buf.String()} } else { + // tbl name format: tbl_name_xxxx + tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) tableCount := len(plan.RouteTableIndexs) for i := 0; i < tableCount; i++ { buf := sqlparser.NewTrackedBuffer(nil) @@ -701,7 +711,7 @@ func (r *Router) generateInsertSql(plan *Plan, stmt sqlparser.Statement) error { nodeName := r.Nodes[nodeIndex] buf.Fprintf("insert %v%s into %v", node.Comments, node.Ignore, node.Table) - fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i]) + fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i]) buf.Fprintf("%v %v%v", node.Columns, plan.Rows[tableIndex], @@ -733,6 +743,9 @@ func (r *Router) generateUpdateSql(plan *Plan, stmt sqlparser.Statement) error { nodeName := r.Nodes[0] sqls[nodeName] = []string{buf.String()} } else { + // tbl name format: tbl_name_xxxx + tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) tableCount := len(plan.RouteTableIndexs) for i := 0; i < tableCount; i++ { buf := sqlparser.NewTrackedBuffer(nil) @@ -740,7 +753,7 @@ func (r *Router) generateUpdateSql(plan *Plan, stmt sqlparser.Statement) error { node.Comments, node.Table, ) - fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i]) + fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i]) buf.Fprintf(" set %v%v%v%v", node.Exprs, node.Where, @@ -776,6 +789,9 @@ func (r *Router) generateDeleteSql(plan *Plan, stmt sqlparser.Statement) error { nodeName := r.Nodes[0] sqls[nodeName] = []string{buf.String()} } else { + // tbl name format: tbl_name_xxxx + tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) tableCount := len(plan.RouteTableIndexs) for i := 0; i < tableCount; i++ { buf := sqlparser.NewTrackedBuffer(nil) @@ -783,7 +799,7 @@ func (r *Router) generateDeleteSql(plan *Plan, stmt sqlparser.Statement) error { node.Comments, node.Table, ) - fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i]) + fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i]) buf.Fprintf("%v%v%v", node.Where, node.OrderBy, @@ -818,6 +834,9 @@ func (r *Router) generateReplaceSql(plan *Plan, stmt sqlparser.Statement) error nodeName := r.Nodes[0] sqls[nodeName] = []string{buf.String()} } else { + // tbl name format: tbl_name_xxxx + tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) tableCount := len(plan.RouteTableIndexs) for i := 0; i < tableCount; i++ { tableIndex := plan.RouteTableIndexs[i] @@ -829,7 +848,7 @@ func (r *Router) generateReplaceSql(plan *Plan, stmt sqlparser.Statement) error node.Comments, node.Table, ) - fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i]) + fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i]) buf.Fprintf("%v %v", node.Columns, plan.Rows[tableIndex], @@ -861,6 +880,9 @@ func (r *Router) generateTruncateSql(plan *Plan, stmt sqlparser.Statement) error nodeName := r.Nodes[0] sqls[nodeName] = []string{buf.String()} } else { + // tbl name format: tbl_name_xxxx + tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) tableCount := len(plan.RouteTableIndexs) for i := 0; i < tableCount; i++ { buf := sqlparser.NewTrackedBuffer(nil) @@ -869,7 +891,7 @@ func (r *Router) generateTruncateSql(plan *Plan, stmt sqlparser.Statement) error node.TableOpt, node.Table, ) - fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i]) + fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i]) tableIndex := plan.RouteTableIndexs[i] nodeIndex := plan.Rule.TableToNode[tableIndex] nodeName := r.Nodes[nodeIndex] diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go index 5877e073..646ab9ba 100644 --- a/proxy/server/conn_preshard.go +++ b/proxy/server/conn_preshard.go @@ -25,6 +25,7 @@ import ( "github.com/flike/kingshard/mysql" "github.com/flike/kingshard/proxy/router" "github.com/flike/kingshard/sqlparser" + "strconv" ) type ExecuteDB struct { @@ -462,13 +463,16 @@ func (c *ClientConn) handleShowColumns(sql string, tokens []string, } showRouter := c.schema.rule showRule := showRouter.GetRule(ruleDB, tableName) + // tbl name format: tbl_name_xxxx + tblPostfixLength := showRule.TableNamePostfixLength + tblPostfixLengthStr := strconv.Itoa(tblPostfixLength) //this SHOW is sharding SQL if showRule.Type != router.DefaultRuleType { if 0 < len(showRule.SubTableIndexs) { tableIndex := showRule.SubTableIndexs[0] nodeIndex := showRule.TableToNode[tableIndex] nodeName := showRule.Nodes[nodeIndex] - tokens[i+2] = fmt.Sprintf("%s_%04d", tableName, tableIndex) + tokens[i+2] = fmt.Sprintf("%s_%0" + tblPostfixLengthStr + "d", tableName, tableIndex) executeDB.sql = strings.Join(tokens, " ") executeDB.ExecNode = c.schema.nodes[nodeName] return nil