From 465401a051314673a5fd1513fed4807922f02807 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 20 Aug 2024 11:28:04 -0500 Subject: [PATCH 1/2] Lock DB Upgrades --- harmony/harmonydb/harmonydb.go | 126 ++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 59 deletions(-) diff --git a/harmony/harmonydb/harmonydb.go b/harmony/harmonydb/harmonydb.go index db96eef8b..de398f413 100644 --- a/harmony/harmonydb/harmonydb.go +++ b/harmony/harmonydb/harmonydb.go @@ -237,80 +237,88 @@ var fs embed.FS func (db *DB) upgrade() error { // Does the version table exist? if not, make it. // NOTE: This cannot change except via the next sql file. - _, err := db.Exec(context.Background(), `CREATE TABLE IF NOT EXISTS base ( + _, err := db.BeginTransaction(context.Background(), func(tx *Tx) (commit bool, err error) { + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS base ( id SERIAL PRIMARY KEY, entry CHAR(12), applied TIMESTAMP DEFAULT current_timestamp - )`) - if err != nil { - logger.Error("Upgrade failed.") - return xerrors.Errorf("Cannot create base table %w", err) - } - - // __Run scripts in order.__ - - landed := map[string]bool{} - { - var landedEntries []struct{ Entry string } - err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") + )`) if err != nil { - logger.Error("Cannot read entries: " + err.Error()) - return xerrors.Errorf("cannot read entries: %w", err) - } - for _, l := range landedEntries { - landed[l.Entry[:8]] = true - } - } - dir, err := fs.ReadDir("sql") - if err != nil { - logger.Error("Cannot read fs entries: " + err.Error()) - return err - } - sort.Slice(dir, func(i, j int) bool { return dir[i].Name() < dir[j].Name() }) - - if len(dir) == 0 { - logger.Error("No sql files found.") - } - for _, e := range dir { - name := e.Name() - if !strings.HasSuffix(name, ".sql") { - logger.Debug("Must have only SQL files here, found: " + name) - continue - } - if landed[name[:8]] { - logger.Debug("DB Schema " + name + " already applied.") - continue + logger.Error("Upgrade failed.") + return false, xerrors.Errorf("Cannot create base table %w", err) } - file, err := fs.ReadFile("sql/" + name) + _, err = tx.Exec(`LOCK TABLE base`) if err != nil { - logger.Error("weird embed file read err") - return err + logger.Error("Upgrade failed: could not lock.") + return false, xerrors.Errorf("Cannot create base table %w", err) } - logger.Infow("Upgrading", "file", name, "size", len(file)) + // __Run scripts in order.__ - megaSql := "" - for _, s := range parseSQLStatements(string(file)) { // Implement the changes. - if len(strings.TrimSpace(s)) == 0 { - continue + landed := map[string]bool{} + { + var landedEntries []struct{ Entry string } + err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") + if err != nil { + logger.Error("Cannot read entries: " + err.Error()) + return false, xerrors.Errorf("cannot read entries: %w", err) + } + for _, l := range landedEntries { + landed[l.Entry[:8]] = true } - megaSql += s + ";" } - _, err = db.pgx.Exec(context.Background(), megaSql) + dir, err := fs.ReadDir("sql") if err != nil { - msg := fmt.Sprintf("Could not upgrade! %s", err.Error()) - logger.Error(msg) - return xerrors.New(msg) // makes devs lives easier by placing message at the end. + logger.Error("Cannot read fs entries: " + err.Error()) + return false, err } + sort.Slice(dir, func(i, j int) bool { return dir[i].Name() < dir[j].Name() }) - // Mark Completed. - _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name[:8]) - if err != nil { - logger.Error("Cannot update base: " + err.Error()) - return xerrors.Errorf("cannot insert into base: %w", err) + if len(dir) == 0 { + logger.Error("No sql files found.") } - } - return nil + for _, e := range dir { + name := e.Name() + if !strings.HasSuffix(name, ".sql") { + logger.Debug("Must have only SQL files here, found: " + name) + continue + } + if landed[name[:8]] { + logger.Debug("DB Schema " + name + " already applied.") + continue + } + file, err := fs.ReadFile("sql/" + name) + if err != nil { + logger.Error("weird embed file read err") + return false, err + } + + logger.Infow("Upgrading", "file", name, "size", len(file)) + + megaSql := "" + for _, s := range parseSQLStatements(string(file)) { // Implement the changes. + if len(strings.TrimSpace(s)) == 0 { + continue + } + megaSql += s + ";" + } + _, err = db.pgx.Exec(context.Background(), megaSql) + if err != nil { + msg := fmt.Sprintf("Could not upgrade! %s", err.Error()) + logger.Error(msg) + return false, xerrors.New(msg) // makes devs lives easier by placing message at the end. + } + + // Mark Completed. + _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name[:8]) + if err != nil { + logger.Error("Cannot update base: " + err.Error()) + return false, xerrors.Errorf("cannot insert into base: %w", err) + } + } + return true, nil + }, OptionRetry()) + return err } func parseSQLStatements(sqlContent string) []string { From 670fbe8dc3c8f1da61a1b36dcdb7a97b57b9bf63 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 20 Aug 2024 11:54:51 -0500 Subject: [PATCH 2/2] avoid unlockables --- harmony/harmonydb/harmonydb.go | 144 ++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 65 deletions(-) diff --git a/harmony/harmonydb/harmonydb.go b/harmony/harmonydb/harmonydb.go index de398f413..8dff2ed41 100644 --- a/harmony/harmonydb/harmonydb.go +++ b/harmony/harmonydb/harmonydb.go @@ -234,90 +234,104 @@ func ensureSchemaExists(connString, schema string) error { //go:embed sql var fs embed.FS -func (db *DB) upgrade() error { +func (db *DB) upgrade() (err error) { // Does the version table exist? if not, make it. // NOTE: This cannot change except via the next sql file. - _, err := db.BeginTransaction(context.Background(), func(tx *Tx) (commit bool, err error) { - _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS base ( + _, err = db.Exec(context.Background(), `CREATE TABLE IF NOT EXISTS base ( id SERIAL PRIMARY KEY, entry CHAR(12), applied TIMESTAMP DEFAULT current_timestamp )`) + if err != nil { + logger.Error("Upgrade failed.") + return xerrors.Errorf("Cannot create base table %w", err) + } + done := make(chan struct{}) + txErr := make(chan error) + go func() { + _, err = db.BeginTransaction(context.Background(), func(tx *Tx) (commit bool, err error) { + _, err = tx.Exec(`LOCK TABLE base`) + if err != nil { + logger.Error("Upgrade failed: could not lock.") + return false, xerrors.Errorf("Cannot create base table %w", err) + } + <-done + return true, nil + }) + txErr <- err + }() + defer func() { + close(done) + err2 := <-txErr + if err == nil { + err = err2 + } + }() + + // __Run scripts in order.__ + + landed := map[string]bool{} + { + var landedEntries []struct{ Entry string } + err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") if err != nil { - logger.Error("Upgrade failed.") - return false, xerrors.Errorf("Cannot create base table %w", err) + logger.Error("Cannot read entries: " + err.Error()) + return xerrors.Errorf("cannot read entries: %w", err) + } + for _, l := range landedEntries { + landed[l.Entry[:8]] = true } - _, err = tx.Exec(`LOCK TABLE base`) + } + dir, err := fs.ReadDir("sql") + if err != nil { + logger.Error("Cannot read fs entries: " + err.Error()) + return err + } + sort.Slice(dir, func(i, j int) bool { return dir[i].Name() < dir[j].Name() }) + + if len(dir) == 0 { + logger.Error("No sql files found.") + } + for _, e := range dir { + name := e.Name() + if !strings.HasSuffix(name, ".sql") { + logger.Debug("Must have only SQL files here, found: " + name) + continue + } + if landed[name[:8]] { + logger.Debug("DB Schema " + name + " already applied.") + continue + } + file, err := fs.ReadFile("sql/" + name) if err != nil { - logger.Error("Upgrade failed: could not lock.") - return false, xerrors.Errorf("Cannot create base table %w", err) + logger.Error("weird embed file read err") + return err } - // __Run scripts in order.__ + logger.Infow("Upgrading", "file", name, "size", len(file)) - landed := map[string]bool{} - { - var landedEntries []struct{ Entry string } - err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") - if err != nil { - logger.Error("Cannot read entries: " + err.Error()) - return false, xerrors.Errorf("cannot read entries: %w", err) - } - for _, l := range landedEntries { - landed[l.Entry[:8]] = true + megaSql := "" + for _, s := range parseSQLStatements(string(file)) { // Implement the changes. + if len(strings.TrimSpace(s)) == 0 { + continue } + megaSql += s + ";" } - dir, err := fs.ReadDir("sql") + _, err = db.pgx.Exec(context.Background(), megaSql) if err != nil { - logger.Error("Cannot read fs entries: " + err.Error()) - return false, err + msg := fmt.Sprintf("Could not upgrade! %s", err.Error()) + logger.Error(msg) + return xerrors.New(msg) // makes devs lives easier by placing message at the end. } - sort.Slice(dir, func(i, j int) bool { return dir[i].Name() < dir[j].Name() }) - if len(dir) == 0 { - logger.Error("No sql files found.") + // Mark Completed. + _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name[:8]) + if err != nil { + logger.Error("Cannot update base: " + err.Error()) + return xerrors.Errorf("cannot insert into base: %w", err) } - for _, e := range dir { - name := e.Name() - if !strings.HasSuffix(name, ".sql") { - logger.Debug("Must have only SQL files here, found: " + name) - continue - } - if landed[name[:8]] { - logger.Debug("DB Schema " + name + " already applied.") - continue - } - file, err := fs.ReadFile("sql/" + name) - if err != nil { - logger.Error("weird embed file read err") - return false, err - } - - logger.Infow("Upgrading", "file", name, "size", len(file)) - - megaSql := "" - for _, s := range parseSQLStatements(string(file)) { // Implement the changes. - if len(strings.TrimSpace(s)) == 0 { - continue - } - megaSql += s + ";" - } - _, err = db.pgx.Exec(context.Background(), megaSql) - if err != nil { - msg := fmt.Sprintf("Could not upgrade! %s", err.Error()) - logger.Error(msg) - return false, xerrors.New(msg) // makes devs lives easier by placing message at the end. - } + } - // Mark Completed. - _, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name[:8]) - if err != nil { - logger.Error("Cannot update base: " + err.Error()) - return false, xerrors.Errorf("cannot insert into base: %w", err) - } - } - return true, nil - }, OptionRetry()) return err }