Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dselans/concurrency support #58

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ install:
- go install ./...

script:
- go test -v ./...
- go test -v -enable-mysql ./...
- bash test-integration/postgres.sh
- bash test-integration/mysql.sh
- bash test-integration/mysql-flag.sh
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Using [modl](https://github.com/jmoiron/modl)? Check out [modl-migrate](https://
* Atomic migrations
* Up/down migrations to allow rollback
* Supports multiple database types in one project
* Support for concurrent migrations (by utilizing a DB-based mutex)

## Installation

Expand Down
182 changes: 182 additions & 0 deletions migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ var tableName = "gorp_migrations"
var schemaName = ""
var numberPrefixRegex = regexp.MustCompile(`^(\d+).*$`)

// Lock related bits
var (
DefaultLockWaitTime = time.Duration(1 * time.Minute)

lockTableName = "gorp_lock"
lockName = "sql-migrate"
lockWatchInterval = time.Duration(1 * time.Second)
lockMaxStaleAge = time.Duration(1 * time.Minute)
)

// TxError is returned when any error is encountered during a database
// transaction. It contains the relevant *Migration and notes it's Id in the
// Error function output.
Expand Down Expand Up @@ -121,6 +131,11 @@ type MigrationRecord struct {
AppliedAt time.Time `db:"applied_at"`
}

type LockRecord struct {
Lock string `db:"lock"`
AcquiredAt time.Time `db:"acquired_at"`
}

var MigrationDialects = map[string]gorp.Dialect{
"sqlite3": gorp.SqliteDialect{},
"postgres": gorp.PostgresDialect{},
Expand Down Expand Up @@ -262,13 +277,177 @@ type SqlExecutor interface {
Delete(list ...interface{}) (int64, error)
}

// Wrapper for ExecMaxWithLock(); same behavior except max migrations is set to 0 (no limit)
func ExecWithLock(db *sql.DB, dialect string, m MigrationSource, dir MigrationDirection, waitTime time.Duration) (int, error) {
return ExecMaxWithLock(db, dialect, m, dir, 0, waitTime)
}

// Perform a migration while utilizing a simple db-based mutex.
//
// This functionality is useful if you are running more than 1 instance of your
// app that performs in-app migrations. This will make sure the migrations do
// not collide with eachother.
//
// When using this functionality, a single `sql-migrate` instance will be designated
// as the 'master migrator'; other instances will stay in 'waitState' and will
// wait until the lock is either:
//
// * Released (lock record is removed from the `gorp_lock` table)
// * At which point, the 'waitState' migrators will exit cleanly (and not
// perform any migrations)
//
// OR
//
// * The 'waitTime' is exceeded, in which case, `sql-migrate` instances in `waitState`
// will return an error saying that they've exceeded the wait time.
//
// Finally, if for some reason your app crashes/gets killed before the lock was
// able to get cleaned up - the stale lock will be cleaned up on next start up.
//
// Note: If you are running into the latter case, considering bumping up the `waitTime`.
func ExecMaxWithLock(db *sql.DB, dialect string, m MigrationSource, dir MigrationDirection, max int, waitTime time.Duration) (int, error) {
if dialect == "sqlite3" {
return 0, errors.New("ExecWithLock does not support sqlite3 dialect")
}

dbMap, err := getMigrationDbMap(db, dialect)
if err != nil {
return 0, fmt.Errorf("Unable to instantiate dbmap: %v", err)
}

mlock, err := newMigrationLock(dbMap, waitTime)
// Skip ExecMax if we encountered an error during newMigrationLock()
if err != nil {
return 0, err
}

// We are the master migrator so we must clean up our lock
if !mlock.waitState {
defer mlock.end()
}

return ExecMax(db, dialect, m, dir, max)
}

// Execute a set of migrations
//
// Returns the number of applied migrations.
func Exec(db *sql.DB, dialect string, m MigrationSource, dir MigrationDirection) (int, error) {
return ExecMax(db, dialect, m, dir, 0)
}

type migrationLock struct {
id int
dbMap *gorp.DbMap
waitState bool
waitTime time.Duration
}

// * check for (and delete) outdated lock
// * insert lock in db
// * if insert fails, means an existing lock is in place;
// * set 'wait' to 'true'
// * periodically check for lock existance ("master migrator" should remove lock when done)
// * stop waiting if lock doesn't disappear
// * if insert succeeds, means we are the "master migrator"
// * return from beginLock() -> let ExecMax do its thing;
// * once ExecMax finishes, clean up our lock
func newMigrationLock(dbMap *gorp.DbMap, waitTime time.Duration) (*migrationLock, error) {
mlock := &migrationLock{
id: time.Now().Nanosecond(),
dbMap: dbMap,
waitState: false,
waitTime: waitTime,
}

// Remove potentially stale lock
if err := mlock.removeStaleLock(); err != nil {
return nil, err
}

insertErr := mlock.dbMap.Insert(&LockRecord{
Lock: lockName,
AcquiredAt: time.Now(),
})

if insertErr != nil {
// Insert failed, we are in 'wait' state; begin watching existing lock
mlock.waitState = true

if err := mlock.beginWatch(); err != nil {
// We have exceeded 'waitTime', bail out
return nil, err
}

// Lock was released; nothing to do
return mlock, nil
}

// lock insertion succeeded, good to go
return mlock, nil
}

// Remove a (potentially) stale lock
//
// Delete lock record if the lock is older than "now() - lockMaxStaleAge".
func (m *migrationLock) removeStaleLock() error {
maxDate := time.Now().Add(-lockMaxStaleAge)

_, err := m.dbMap.Exec("DELETE FROM gorp_lock WHERE acquired_at <= ?", maxDate)
if err != nil {
return fmt.Errorf("Unable to remove stale lock: %v", err)
}

return nil
}

// Periodically check for the existence of a 'lock' record
//
// If the lock record disappears before the 'waitTime' is up, return no error.
// If 'waitTime' is exceeded, return a 'wait time exceeded' error.
func (m *migrationLock) beginWatch() error {
ticker := time.NewTicker(lockWatchInterval)
defer ticker.Stop()

beginTime := time.Now()

for {
<-ticker.C

// Time waiting for lock clearance has elapsed
if time.Since(beginTime) > m.waitTime {
return fmt.Errorf("Exceeded lock clearance wait time (%v)", time.Since(beginTime))
}

var lockRecord LockRecord

err := m.dbMap.SelectOne(&lockRecord, fmt.Sprintf("SELECT * FROM %v", lockTableName))
if err != nil {
if err == sql.ErrNoRows {
break
}

return err
}
}

return nil
}

// Remove lock record (if we are the 'master migrator')
func (m *migrationLock) end() {
// Nothing to do if we were in 'waitState'
if m.waitState {
return
}

// perform lock clean up
_, err := m.dbMap.Delete(&LockRecord{Lock: lockName})
if err != nil {
fmt.Printf("Ran into an error during lock cleanup: %v\n", err)
}
}

// Execute a set of migrations
//
// Will apply at most `max` migrations. Pass 0 for no limit (or use Exec).
Expand Down Expand Up @@ -498,6 +677,9 @@ Check https://github.com/go-sql-driver/mysql#parsetime for more info.`)
dbMap.AddTableWithNameAndSchema(MigrationRecord{}, schemaName, tableName).SetKeys(false, "Id")
//dbMap.TraceOn("", log.New(os.Stdout, "migrate: ", log.Lmicroseconds))

// Create lock table
dbMap.AddTableWithNameAndSchema(LockRecord{}, schemaName, lockTableName).SetKeys(false, "Lock").ColMap("Lock").SetUnique(true)

err := dbMap.CreateTablesIfNotExists()
if err != nil {
return nil, err
Expand Down
Loading