From 17ed339c2acaf3b0a20bea159fff6c1279ea031c Mon Sep 17 00:00:00 2001
From: Shaun Davis
Date: Thu, 20 Jun 2024 20:20:14 -0500
Subject: [PATCH] Collation checkin

---
 bin/refresh-collation         |  21 ++++++
 internal/flypg/admin/admin.go | 118 ++++++++++++++++++++++++++++++++++
 internal/flypg/node.go        |  71 +++++++++++++++++---
 internal/utils/shell.go       |  15 ++++-
 4 files changed, 214 insertions(+), 11 deletions(-)
 create mode 100755 bin/refresh-collation

diff --git a/bin/refresh-collation b/bin/refresh-collation
new file mode 100755
index 00000000..d797f8b1
--- /dev/null
+++ b/bin/refresh-collation
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Refreshes the recorded collation version of every connectable database.
+# Expects OPERATOR_PASSWORD and FLY_APP_NAME to be set.
+
+psql "postgres://postgres:${OPERATOR_PASSWORD}@${FLY_APP_NAME}.internal:5432/postgres" <<'EOF'
+DO $$
+DECLARE
+    r RECORD;
+BEGIN
+    FOR r IN (SELECT datname FROM pg_database WHERE datallowconn = true)
+    LOOP
+        BEGIN
+            EXECUTE 'ALTER DATABASE ' || quote_ident(r.datname) || ' REFRESH COLLATION VERSION;';
+        EXCEPTION
+            WHEN OTHERS THEN
+                RAISE NOTICE 'Failed to refresh collation for database: % - %', r.datname, SQLERRM;
+        END;
+    END LOOP;
+END $$;
+EOF
diff --git a/internal/flypg/admin/admin.go b/internal/flypg/admin/admin.go
index 2f1925b1..1d3b4a86 100644
--- a/internal/flypg/admin/admin.go
+++ b/internal/flypg/admin/admin.go
@@ -2,7 +2,9 @@ package admin
 
 import (
 	"context"
+	"database/sql"
 	"fmt"
+	"log"
 	"strconv"
 	"strings"
 
@@ -428,3 +430,119 @@ func ValidatePGSettings(ctx context.Context, conn *pgx.Conn, requested map[strin
 
 	return nil
 }
+
+// fixCollationMismatch looks up every object that depends on a collation whose
+// recorded version differs from pg_collation_actual_version and passes each
+// object description to fixObject. It returns the first query, scan or
+// iteration error.
+func fixCollationMismatch(ctx context.Context, db *sql.DB) error {
+	// QueryContext ties the query to ctx so cancellation reaches the database.
+	rows, err := db.QueryContext(ctx, identifyCollationObjectsSQL)
+	if err != nil {
+		return fmt.Errorf("failed to query collation mismatches: %v", err)
+	}
+	defer rows.Close()
+
+	var collation, object string
+	for rows.Next() {
+		if err := rows.Scan(&collation, &object); err != nil {
+			return fmt.Errorf("failed to scan row: %v", err)
+		}
+
+		fixObject(db, object)
+	}
+
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("failed to iterate over rows: %v", err)
+	}
+
+	return nil
+}
+
+// fixObject prints the object description and classifies it by substring.
+// Every repair branch is still a placeholder, so only unmatched descriptions
+// produce further output. "materialized view" is tested before "view" so the
+// broader match does not shadow it.
+func fixObject(db *sql.DB, object string) {
+	fmt.Printf("Fixing object: %s\n", object)
+
+	switch {
+	case strings.Contains(object, "index"):
+		// reindex(db, object)
+	case strings.Contains(object, "column"):
+		// alterColumn(db, object)
+	case strings.Contains(object, "constraint"):
+		// dropAndRecreateConstraint(db, object)
+	case strings.Contains(object, "materialized view"):
+		// refreshMaterializedView(db, object)
+	case strings.Contains(object, "function"):
+		// recreateFunction(db, object)
+	case strings.Contains(object, "view"):
+		// recreateView(db, object)
+	case strings.Contains(object, "trigger"):
+		// recreateTrigger(db, object)
+	default:
+		log.Printf("Unknown object type: %s", object)
+	}
+}
+
+// refreshCollationSQL refreshes the recorded collation version of every
+// database that allows connections; a failure for one database is reported as
+// a NOTICE and does not stop the loop.
+const refreshCollationSQL = `
+DO $$
+DECLARE
+    r RECORD;
+BEGIN
+    FOR r IN (SELECT datname FROM pg_database WHERE datallowconn = true)
+    LOOP
+        BEGIN
+            EXECUTE 'ALTER DATABASE ' || quote_ident(r.datname) || ' REFRESH COLLATION VERSION;';
+        EXCEPTION
+            WHEN OTHERS THEN
+                RAISE NOTICE 'Failed to refresh collation for database: % - %', r.datname, SQLERRM;
+        END;
+    END LOOP;
+END $$;`
+
+// RefreshCollationVersion will refresh the collation version for all databases.
+func RefreshCollationVersion(ctx context.Context, conn *pgx.Conn) error {
+	_, err := conn.Exec(ctx, refreshCollationSQL)
+	return err
+}
+
+// identifyCollationObjectsSQL lists each object that depends on a collation
+// whose recorded version differs from pg_collation_actual_version.
+const identifyCollationObjectsSQL = `
+SELECT pg_describe_object(refclassid, refobjid, refobjsubid) AS "Collation",
+       pg_describe_object(classid, objid, objsubid) AS "Object"
+  FROM pg_depend d JOIN pg_collation c
+       ON refclassid = 'pg_collation'::regclass AND refobjid = c.oid
+  WHERE c.collversion <> pg_collation_actual_version(c.oid)
+  ORDER BY 1, 2;`
+
+// reIndexSQL reindexes every index whose key columns include a table column
+// with the en_US.utf8 collation. DISTINCT keeps an index that covers several
+// such columns from being rebuilt once per column.
+const reIndexSQL = `
+DO $$
+DECLARE
+    r RECORD;
+BEGIN
+    FOR r IN (SELECT DISTINCT n.nspname, i.relname
+              FROM pg_index x
+              JOIN pg_class c ON c.oid = x.indrelid
+              JOIN pg_namespace n ON n.oid = c.relnamespace
+              JOIN pg_class i ON i.oid = x.indexrelid
+              JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = ANY(x.indkey)
+              JOIN pg_collation col ON col.oid = a.attcollation
+              WHERE col.collname = 'en_US.utf8') LOOP
+        EXECUTE 'REINDEX INDEX ' || quote_ident(r.nspname) || '.' || quote_ident(r.relname);
+    END LOOP;
+END $$;`
+
+// ReIndex runs reIndexSQL on conn and returns any error from Exec.
+func ReIndex(ctx context.Context, conn *pgx.Conn) error {
+	_, err := conn.Exec(ctx, reIndexSQL)
+	return err
+}
diff --git a/internal/flypg/node.go b/internal/flypg/node.go
index 5a9364b2..9e70ef65 100644
--- a/internal/flypg/node.go
+++ b/internal/flypg/node.go
@@ -305,6 +305,14 @@ func (n *Node) PostInit(ctx context.Context) error {
 					return fmt.Errorf("failed to unset read-only: %s", err)
 				}
 			}
+
+			// Refresh collation for all databases; a failure is logged, not returned.
+			// NOTE(review): refreshCollation is not defined anywhere in this patch;
+			// confirm it exists elsewhere in the package before merging.
+			if err := refreshCollation(ctx, conn); err != nil {
+				log.Printf("failed to refresh collation: %s", err)
+			}
+
 		case StandbyRoleName:
 			// Register existing standby to apply any configuration changes.
 			if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
@@ -381,18 +389,18 @@ func (n *Node) PostInit(ctx context.Context) error {
 				return fmt.Errorf("failed to issue registration certificate: %s", err)
 			}
 		} else {
-			if n.RepMgr.Witness {
-				log.Println("Registering witness")
+			// Create required users
+			if err := n.setupCredentials(ctx, conn); err != nil {
+				return fmt.Errorf("failed to create required users: %s", err)
+			}
 
-				// Create required users
-				if err := n.setupCredentials(ctx, conn); err != nil {
-					return fmt.Errorf("failed to create required users: %s", err)
-				}
+			// Setup repmgr database and extension
+			if err := n.RepMgr.enable(ctx, conn); err != nil {
+				return fmt.Errorf("failed to enable repmgr: %s", err)
+			}
 
-				// Setup repmgr database and extension
-				if err := n.RepMgr.enable(ctx, conn); err != nil {
-					return fmt.Errorf("failed to enable repmgr: %s", err)
-				}
+			if n.RepMgr.Witness {
+				log.Println("Registering witness")
 
 				primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
 				if err != nil {
@@ -471,3 +479,46 @@ func setDirOwnership() error {
 
 	return nil
 }
+
+// fixCollationMismatch refreshes the collation version and reindexes each
+// database returned by admin.ListDatabases, plus template1. It stops at the
+// first database that fails.
+func (n *Node) fixCollationMismatch(ctx context.Context, conn *pgx.Conn) error {
+	dbs, err := admin.ListDatabases(ctx, conn)
+	if err != nil {
+		return fmt.Errorf("failed to list databases: %s", err)
+	}
+
+	// Add the template1 database to the list of databases to refresh.
+	dbs = append(dbs, admin.DbInfo{Name: "template1"})
+
+	for _, db := range dbs {
+		if err := n.fixDatabaseCollation(ctx, db); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// fixDatabaseCollation opens a dedicated connection to db, then refreshes the
+// collation version and reindexes over it. Keeping this in its own function
+// lets the deferred Close run after each database instead of piling up until
+// every database has been processed.
+func (n *Node) fixDatabaseCollation(ctx context.Context, db admin.DbInfo) error {
+	dbConn, err := n.NewLocalConnection(ctx, db.Name, n.SUCredentials)
+	if err != nil {
+		return fmt.Errorf("failed to establish connection to local node: %s", err)
+	}
+	defer func() { _ = dbConn.Close(ctx) }()
+
+	if err := admin.RefreshCollationVersion(ctx, dbConn); err != nil {
+		return fmt.Errorf("failed to refresh collation: %s", err)
+	}
+
+	if err := admin.ReIndex(ctx, dbConn); err != nil {
+		return fmt.Errorf("failed to reindex database: %s", err)
+	}
+
+	return nil
+}
diff --git a/internal/utils/shell.go b/internal/utils/shell.go
index cd550ff9..139c05a5 100644
--- a/internal/utils/shell.go
+++ b/internal/utils/shell.go
@@ -1,7 +1,9 @@
 package utils
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"os"
 	"os/exec"
 	"os/user"
@@ -19,7 +21,18 @@ func RunCommand(cmdStr, usr string) ([]byte, error) {
 	cmd.SysProcAttr = &syscall.SysProcAttr{}
 	cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}
 
-	return cmd.Output()
+	var stdoutBuf, stderrBuf bytes.Buffer
+	cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
+	cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)
+
+	err = cmd.Run()
+	if err != nil {
+		if ee, ok := err.(*exec.ExitError); ok {
+			ee.Stderr = stderrBuf.Bytes()
+		}
+	}
+
+	return stdoutBuf.Bytes(), err
 }
 
 func SetFileOwnership(pathToFile, owner string) error {