forked from adlio/schema
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mysql.go
148 lines (127 loc) · 4.23 KB
/
mysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package schema
import (
"context"
"fmt"
"hash/crc32"
"strings"
"time"
)
// mysqlLockSalt is the constant that advisoryLockID multiplies with the CRC32
// checksum of the migrations table name when deriving the advisory lock name.
const mysqlLockSalt uint32 = 271192482
// MySQL is the dialect which should be used for MySQL/MariaDB databases.
// It is a ready-to-use value of the stateless mysqlDialect type.
var MySQL = mysqlDialect{}
// mysqlDialect groups the MySQL-specific SQL used by this package: advisory
// locking, creation and querying of the migrations tracking table, and
// identifier quoting. It carries no fields, so the zero value is usable.
type mysqlDialect struct{}
// Lock implements the Locker interface to obtain a global lock before the
// migrations are run.
//
// The lock is a MySQL named lock (GET_LOCK) whose name is derived from
// tableName by advisoryLockID, requested with a 10 second timeout.
//
// GET_LOCK does not raise an SQL error when it fails: it returns 1 when the
// lock was obtained, 0 when the attempt timed out and NULL when an error
// occurred. The result row is therefore read and checked, and an error is
// returned unless the lock was actually acquired.
func (m mysqlDialect) Lock(ctx context.Context, tx Queryer, tableName string) error {
	lockID := m.advisoryLockID(tableName)
	// lockID is produced by advisoryLockID (a formatted uint32), so embedding
	// it in the statement text cannot inject SQL.
	query := fmt.Sprintf(`SELECT GET_LOCK('%s', 10)`, lockID)

	rows, err := tx.QueryContext(ctx, query)
	if err != nil {
		return err
	}
	defer rows.Close()

	if !rows.Next() {
		if err := rows.Err(); err != nil {
			return err
		}
		return fmt.Errorf("acquiring MySQL lock %s for %q: no result returned", lockID, tableName)
	}

	// A pointer destination lets a NULL result scan as nil instead of failing.
	var acquired *int64
	if err := rows.Scan(&acquired); err != nil {
		return err
	}
	if acquired == nil {
		return fmt.Errorf("acquiring MySQL lock %s for %q: GET_LOCK returned NULL", lockID, tableName)
	}
	if *acquired != 1 {
		return fmt.Errorf("acquiring MySQL lock %s for %q: timed out", lockID, tableName)
	}
	return nil
}
// Unlock implements the Locker interface to release the global lock after the
// migrations are run. It issues RELEASE_LOCK for the lock name derived from
// tableName and reports only the error (if any) from executing the statement.
func (m mysqlDialect) Unlock(ctx context.Context, tx Queryer, tableName string) error {
	stmt := fmt.Sprintf(`SELECT RELEASE_LOCK('%s')`, m.advisoryLockID(tableName))
	if _, err := tx.ExecContext(ctx, stmt); err != nil {
		return err
	}
	return nil
}
// CreateMigrationsTable implements the Dialect interface. It makes sure the
// table used to track applied migrations exists, creating it only when it is
// not already present (CREATE TABLE IF NOT EXISTS).
//
// tableName is substituted into the statement verbatim, so it must already be
// quoted as required.
func (m mysqlDialect) CreateMigrationsTable(ctx context.Context, tx Queryer, tableName string) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS %s (
id VARCHAR(255) NOT NULL,
checksum VARCHAR(32) NOT NULL DEFAULT '',
execution_time_in_millis INTEGER NOT NULL DEFAULT 0,
applied_at TIMESTAMP NOT NULL
)`

	if _, err := tx.ExecContext(ctx, fmt.Sprintf(ddl, tableName)); err != nil {
		return err
	}
	return nil
}
// InsertAppliedMigration implements the Dialect interface. It records a
// migration in the tracking table *after* that migration has run
// successfully, storing its ID, MD5 checksum, execution time in milliseconds
// and applied-at timestamp.
//
// tableName is substituted into the statement verbatim; the row values are
// passed as bound parameters.
func (m mysqlDialect) InsertAppliedMigration(ctx context.Context, tx Queryer, tableName string, am *AppliedMigration) error {
	const insertTemplate = `
INSERT INTO %s
( id, checksum, execution_time_in_millis, applied_at )
VALUES
( ?, ?, ?, ? )
`

	stmt := fmt.Sprintf(insertTemplate, tableName)
	values := []interface{}{am.ID, am.MD5(), am.ExecutionTimeInMillis, am.AppliedAt}

	_, err := tx.ExecContext(ctx, stmt, values...)
	return err
}
// GetAppliedMigrations retrieves all data from the migrations tracking table,
// ordered by migration ID ascending.
//
// The returned slice is never nil. On error it holds whatever rows were read
// before the failure. Errors come from three places: running the query,
// scanning a row (wrapped with a hint that the table structure may have been
// changed), and iterating the result set.
func (m mysqlDialect) GetAppliedMigrations(ctx context.Context, tx Queryer, tableName string) (migrations []*AppliedMigration, err error) {
	migrations = make([]*AppliedMigration, 0)

	query := fmt.Sprintf(`
SELECT id, checksum, execution_time_in_millis, applied_at
FROM %s
ORDER BY id ASC`, tableName)
	rows, err := tx.QueryContext(ctx, query)
	if err != nil {
		return migrations, err
	}
	defer rows.Close()

	for rows.Next() {
		migration := AppliedMigration{}
		// applied_at is scanned through mysqlTime so that textual timestamps
		// are accepted as well as time.Time values.
		var appliedAt mysqlTime

		err = rows.Scan(&migration.ID, &migration.Checksum, &migration.ExecutionTimeInMillis, &appliedAt)
		if err != nil {
			err = fmt.Errorf("Failed to GetAppliedMigrations. Did somebody change the structure of the %s table?: %w", tableName, err)
			return migrations, err
		}

		migration.AppliedAt = appliedAt.Value
		migrations = append(migrations, &migration)
	}

	// rows.Next returns false both at the end of the result set and when
	// iteration fails; only rows.Err distinguishes the two. Without this check
	// a mid-stream failure would yield a truncated list and a nil error.
	if err = rows.Err(); err != nil {
		return migrations, fmt.Errorf("reading applied migrations from %s: %w", tableName, err)
	}
	return migrations, nil
}
// QuotedTableName returns the name of the migration tracking table quoted for
// MySQL. When schemaName is non-empty the result is the quoted schema and the
// quoted table joined by a dot; otherwise it is just the quoted table name.
func (m mysqlDialect) QuotedTableName(schemaName, tableName string) string {
	table := m.quotedIdent(tableName)
	if len(schemaName) > 0 {
		return m.quotedIdent(schemaName) + "." + table
	}
	return table
}
// quotedIdent wraps ident in backticks, the MySQL identifier quote character,
// doubling any backtick that appears inside it. An empty identifier is
// returned as the empty string rather than as a pair of backticks.
func (m mysqlDialect) quotedIdent(ident string) string {
	const quote = "`"

	if len(ident) == 0 {
		return ""
	}
	escaped := strings.ReplaceAll(ident, quote, quote+quote)
	return quote + escaped + quote
}
// advisoryLockID generates a table-specific lock name to use. The name is the
// decimal form of the CRC32 (IEEE) checksum of tableName multiplied by
// mysqlLockSalt; the multiplication is done in uint32 and wraps on overflow.
func (m mysqlDialect) advisoryLockID(tableName string) string {
	salted := mysqlLockSalt * crc32.ChecksumIEEE([]byte(tableName))
	return fmt.Sprintf("%d", salted)
}
// mysqlTime is a scan destination for timestamp columns. GetAppliedMigrations
// passes it to rows.Scan for the applied_at column so that the value can be
// read whether it arrives as a time.Time or in textual form.
// NOTE(review): presumably the textual form occurs when the driver is not
// configured to parse times itself; confirm against the driver in use.
type mysqlTime struct {
	// Value holds the most recently scanned time. Non-zero values are
	// expressed in time.Local.
	Value time.Time
}

// Scan stores src in t.Value.
//
//   - nil yields the zero time.Time.
//   - time.Time is converted to the local time zone.
//   - string and []byte are parsed by ScanString.
//
// Any other type is rejected with an error instead of being formatted and
// silently discarded.
func (t *mysqlTime) Scan(src interface{}) (err error) {
	switch v := src.(type) {
	case nil:
		// Return here: a NULL must not fall through to the string parser.
		t.Value = time.Time{}
		return nil
	case time.Time:
		t.Value = v.In(time.Local)
		return nil
	case []byte:
		return t.ScanString(string(v))
	case string:
		return t.ScanString(v)
	default:
		return fmt.Errorf("unsupported source type %T for MySQL time", src)
	}
}

// ScanString parses a textual MySQL date/time, interprets it as UTC and stores
// it in t.Value converted to the local time zone.
//
// Accepted forms are "YYYY-MM-DD", "YYYY-MM-DD HH:MM:SS" and the latter
// followed by one to six fractional-second digits. Any other input, or an
// input that fails to parse, returns an error and leaves t.Value unchanged.
func (t *mysqlTime) ScanString(src string) (err error) {
	const (
		dateLayout     = "2006-01-02"
		dateTimeLayout = "2006-01-02 15:04:05"
	)

	var layout string
	switch n := len(src); {
	case n == len(dateLayout):
		layout = dateLayout
	case n == len(dateTimeLayout), n >= 21 && n <= 26:
		// When parsing, the time package accepts a fractional-seconds field
		// after the seconds even though the layout does not mention one.
		layout = dateTimeLayout
	default:
		return fmt.Errorf("unsupported MySQL time value %q", src)
	}

	parsed, err := time.ParseInLocation(layout, src, time.UTC)
	if err != nil {
		return err
	}
	t.Value = parsed.In(time.Local)
	return nil
}