Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archer.li/feature/issue 365 #366

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ type ShardConfig struct {
Type string `yaml:"type"`
TableRowLimit int `yaml:"table_row_limit"`
DateRange []string `yaml:"date_range"`
TableNamePostfixLength int `yaml:"table_name_postfix_length"`
}

func ParseConfigData(data []byte) (*Config, error) {
var cfg Config
if err := yaml.Unmarshal([]byte(data), &cfg); err != nil {
return nil, err
}

setDefaultTblNamePostfixLength(&cfg)

return &cfg, nil
}

Expand Down Expand Up @@ -109,3 +113,14 @@ func WriteConfigFile(cfg *Config) error {

return nil
}

// set the default tbl_name postfix to 4, tbl_name_xxxx
func setDefaultTblNamePostfixLength(cfg *Config) error {
for i := 0; i < len(cfg.Schema.ShardRule); i++ {
if cfg.Schema.ShardRule[i].TableNamePostfixLength == 0 {
cfg.Schema.ShardRule[i].TableNamePostfixLength = 4
}
}

return nil
}
48 changes: 35 additions & 13 deletions proxy/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/flike/kingshard/core/errors"
"github.com/flike/kingshard/core/golog"
"github.com/flike/kingshard/sqlparser"
"strconv"
)

var (
Expand All @@ -46,6 +47,7 @@ type Rule struct {
SubTableIndexs []int //SubTableIndexs store all the index of sharding sub-table
TableToNode map[int]int //key is table index, and value is node index
Shard Shard
TableNamePostfixLength int
}

type Router struct {
Expand Down Expand Up @@ -141,6 +143,7 @@ func NewRouter(schemaConfig *config.SchemaConfig) (*Router, error) {
rt.Rules[rule.DB] = m
rt.Rules[rule.DB][rule.Table] = rule
}
rt.Rules[rule.DB][rule.Table].TableNamePostfixLength = shard.TableNamePostfixLength
}
return rt, nil
}
Expand Down Expand Up @@ -525,14 +528,18 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex
node.Distinct,
)

// tbl name format: tbl_name_xxxx
tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)

var prefix string
//rewrite select expr
for _, expr := range node.SelectExprs {
switch v := expr.(type) {
case *sqlparser.StarExpr:
//for shardTable.*,need replace table into shardTable_xxxx.
if string(v.TableName) == plan.Rule.Table {
fmt.Fprintf(buf, "%s%s_%04d.*",
fmt.Fprintf(buf, "%s%s_%0" + tblPostfixLengthStr + "d.*",
prefix,
plan.Rule.Table,
tableIndex,
Expand All @@ -545,7 +552,7 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex
//into shardTable_xxxx.column as a
if colName, ok := v.Expr.(*sqlparser.ColName); ok {
if string(colName.Qualifier) == plan.Rule.Table {
fmt.Fprintf(buf, "%s%s_%04d.%s",
fmt.Fprintf(buf, "%s%s_%0" + tblPostfixLengthStr + "d.%s",
prefix,
plan.Rule.Table,
tableIndex,
Expand Down Expand Up @@ -577,33 +584,33 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex
switch v := (node.From[0]).(type) {
case *sqlparser.AliasedTableExpr:
if len(v.As) != 0 {
fmt.Fprintf(buf, "%s_%04d as %s",
fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d as %s",
sqlparser.String(v.Expr),
tableIndex,
string(v.As),
)
} else {
fmt.Fprintf(buf, "%s_%04d",
fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d",
sqlparser.String(v.Expr),
tableIndex,
)
}
case *sqlparser.JoinTableExpr:
if ate, ok := (v.LeftExpr).(*sqlparser.AliasedTableExpr); ok {
if len(ate.As) != 0 {
fmt.Fprintf(buf, "%s_%04d as %s",
fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d as %s",
sqlparser.String(ate.Expr),
tableIndex,
string(ate.As),
)
} else {
fmt.Fprintf(buf, "%s_%04d",
fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d",
sqlparser.String(ate.Expr),
tableIndex,
)
}
} else {
fmt.Fprintf(buf, "%s_%04d",
fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d",
sqlparser.String(v.LeftExpr),
tableIndex,
)
Expand All @@ -613,7 +620,7 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex
buf.Fprintf(" on %v", v.On)
}
default:
fmt.Fprintf(buf, "%s_%04d",
fmt.Fprintf(buf, "%s_%0" + tblPostfixLengthStr + "d",
sqlparser.String(node.From[0]),
tableIndex,
)
Expand Down Expand Up @@ -693,6 +700,9 @@ func (r *Router) generateInsertSql(plan *Plan, stmt sqlparser.Statement) error {
nodeName := r.Nodes[0]
sqls[nodeName] = []string{buf.String()}
} else {
// tbl name format: tbl_name_xxxx
tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)
tableCount := len(plan.RouteTableIndexs)
for i := 0; i < tableCount; i++ {
buf := sqlparser.NewTrackedBuffer(nil)
Expand All @@ -701,7 +711,7 @@ func (r *Router) generateInsertSql(plan *Plan, stmt sqlparser.Statement) error {
nodeName := r.Nodes[nodeIndex]

buf.Fprintf("insert %v%s into %v", node.Comments, node.Ignore, node.Table)
fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i])
fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i])
buf.Fprintf("%v %v%v",
node.Columns,
plan.Rows[tableIndex],
Expand Down Expand Up @@ -733,14 +743,17 @@ func (r *Router) generateUpdateSql(plan *Plan, stmt sqlparser.Statement) error {
nodeName := r.Nodes[0]
sqls[nodeName] = []string{buf.String()}
} else {
// tbl name format: tbl_name_xxxx
tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)
tableCount := len(plan.RouteTableIndexs)
for i := 0; i < tableCount; i++ {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Fprintf("update %v%v",
node.Comments,
node.Table,
)
fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i])
fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i])
buf.Fprintf(" set %v%v%v%v",
node.Exprs,
node.Where,
Expand Down Expand Up @@ -776,14 +789,17 @@ func (r *Router) generateDeleteSql(plan *Plan, stmt sqlparser.Statement) error {
nodeName := r.Nodes[0]
sqls[nodeName] = []string{buf.String()}
} else {
// tbl name format: tbl_name_xxxx
tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)
tableCount := len(plan.RouteTableIndexs)
for i := 0; i < tableCount; i++ {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Fprintf("delete %vfrom %v",
node.Comments,
node.Table,
)
fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i])
fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i])
buf.Fprintf("%v%v%v",
node.Where,
node.OrderBy,
Expand Down Expand Up @@ -818,6 +834,9 @@ func (r *Router) generateReplaceSql(plan *Plan, stmt sqlparser.Statement) error
nodeName := r.Nodes[0]
sqls[nodeName] = []string{buf.String()}
} else {
// tbl name format: tbl_name_xxxx
tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)
tableCount := len(plan.RouteTableIndexs)
for i := 0; i < tableCount; i++ {
tableIndex := plan.RouteTableIndexs[i]
Expand All @@ -829,7 +848,7 @@ func (r *Router) generateReplaceSql(plan *Plan, stmt sqlparser.Statement) error
node.Comments,
node.Table,
)
fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i])
fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i])
buf.Fprintf("%v %v",
node.Columns,
plan.Rows[tableIndex],
Expand Down Expand Up @@ -861,6 +880,9 @@ func (r *Router) generateTruncateSql(plan *Plan, stmt sqlparser.Statement) error
nodeName := r.Nodes[0]
sqls[nodeName] = []string{buf.String()}
} else {
// tbl name format: tbl_name_xxxx
tblPostfixLength := r.GetRule(plan.Rule.DB, plan.Rule.Table).TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)
tableCount := len(plan.RouteTableIndexs)
for i := 0; i < tableCount; i++ {
buf := sqlparser.NewTrackedBuffer(nil)
Expand All @@ -869,7 +891,7 @@ func (r *Router) generateTruncateSql(plan *Plan, stmt sqlparser.Statement) error
node.TableOpt,
node.Table,
)
fmt.Fprintf(buf, "_%04d", plan.RouteTableIndexs[i])
fmt.Fprintf(buf, "_%0" + tblPostfixLengthStr + "d", plan.RouteTableIndexs[i])
tableIndex := plan.RouteTableIndexs[i]
nodeIndex := plan.Rule.TableToNode[tableIndex]
nodeName := r.Nodes[nodeIndex]
Expand Down
6 changes: 5 additions & 1 deletion proxy/server/conn_preshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/flike/kingshard/mysql"
"github.com/flike/kingshard/proxy/router"
"github.com/flike/kingshard/sqlparser"
"strconv"
)

type ExecuteDB struct {
Expand Down Expand Up @@ -462,13 +463,16 @@ func (c *ClientConn) handleShowColumns(sql string, tokens []string,
}
showRouter := c.schema.rule
showRule := showRouter.GetRule(ruleDB, tableName)
// tbl name format: tbl_name_xxxx
tblPostfixLength := showRule.TableNamePostfixLength
tblPostfixLengthStr := strconv.Itoa(tblPostfixLength)
//this SHOW is sharding SQL
if showRule.Type != router.DefaultRuleType {
if 0 < len(showRule.SubTableIndexs) {
tableIndex := showRule.SubTableIndexs[0]
nodeIndex := showRule.TableToNode[tableIndex]
nodeName := showRule.Nodes[nodeIndex]
tokens[i+2] = fmt.Sprintf("%s_%04d", tableName, tableIndex)
tokens[i+2] = fmt.Sprintf("%s_%0" + tblPostfixLengthStr + "d", tableName, tableIndex)
executeDB.sql = strings.Join(tokens, " ")
executeDB.ExecNode = c.schema.nodes[nodeName]
return nil
Expand Down