Skip to content

Commit

Permalink
Collation checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
davissp14 committed Jun 21, 2024
1 parent f682f39 commit 17ed339
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 11 deletions.
18 changes: 18 additions & 0 deletions bin/refresh-collation
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

psql postgres://postgres:$OPERATOR_PASSWORD@$FLY_APP_NAME.internal:5432/postgres <<'EOF'
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT datname FROM pg_database WHERE datallowconn = true)
LOOP
BEGIN
EXECUTE 'ALTER DATABASE ' || quote_ident(r.datname) || ' REFRESH COLLATION VERSION;';
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE 'Failed to refresh collation for database: % - %', r.datname, SQLERRM;
END;
END LOOP;
END $$;
EOF
109 changes: 109 additions & 0 deletions internal/flypg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package admin

import (
"context"
"database/sql"
"fmt"
"log"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -428,3 +431,109 @@ func ValidatePGSettings(ctx context.Context, conn *pgx.Conn, requested map[strin

return nil
}

func fixCollationMismatch(ctx context.Context, db *sql.DB) error {
query := `
SELECT pg_describe_object(refclassid, refobjid, refobjsubid) AS "Collation",
pg_describe_object(classid, objid, objsubid) AS "Object"
FROM pg_depend d JOIN pg_collation c
ON refclassid = 'pg_collation'::regclass AND refobjid = c.oid
WHERE c.collversion <> pg_collation_actual_version(c.oid)
ORDER BY 1, 2;`

rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("failed to query collation mismatches: %v", err)
}
defer rows.Close()

var collation, object string
for rows.Next() {
if err := rows.Scan(&collation, &object); err != nil {
return fmt.Errorf("failed to scan row: %v", err)
}

fixObject(db, object)
}

if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %v", err)
}

return nil
}

func fixObject(db *sql.DB, object string) {
fmt.Printf("Fixing object: %s\n", object)

switch {
case regexp.MustCompile(`index`).MatchString(object):
// reindex(db, object)
case regexp.MustCompile(`column`).MatchString(object):
// alterColumn(db, object)
case regexp.MustCompile(`constraint`).MatchString(object):
// dropAndRecreateConstraint(db, object)
case regexp.MustCompile(`materialized view`).MatchString(object):
// refreshMaterializedView(db, object)
case regexp.MustCompile(`function`).MatchString(object):
// recreateFunction(db, object)
case regexp.MustCompile(`view`).MatchString(object):
// recreateView(db, object)
case regexp.MustCompile(`trigger`).MatchString(object):
// recreateTrigger(db, object)
default:
log.Printf("Unknown object type: %s", object)
}
}

const refreshCollationSQL = `
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT datname FROM pg_database WHERE datallowconn = true)
LOOP
BEGIN
EXECUTE 'ALTER DATABASE ' || quote_ident(r.datname) || ' REFRESH COLLATION VERSION;';
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE 'Failed to refresh collation for database: % - %', r.datname, SQLERRM;
END;
END LOOP;
END $$;`

// RefreshCollationVersion will refresh the collation version for all databases.
func RefreshCollationVersion(ctx context.Context, conn *pgx.Conn) error {
_, err := conn.Exec(ctx, refreshCollationSQL)
return err
}

const identifyCollationObjectsSQL = `
SELECT pg_describe_object(refclassid, refobjid, refobjsubid) AS "Collation",
pg_describe_object(classid, objid, objsubid) AS "Object"
FROM pg_depend d JOIN pg_collation c
ON refclassid = 'pg_collation'::regclass AND refobjid = c.oid
WHERE c.collversion <> pg_collation_actual_version(c.oid)
ORDER BY 1, 2;`

const reIndexSQL = `
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT n.nspname, i.relname
FROM pg_index x
JOIN pg_class c ON c.oid = x.indrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_class i ON i.oid = x.indexrelid
JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = ANY(x.indkey)
JOIN pg_collation col ON col.oid = a.attcollation
WHERE col.collname = 'en_US.utf8') LOOP
EXECUTE 'REINDEX INDEX ' || quote_ident(r.nspname) || '.' || quote_ident(r.relname);
END LOOP;
END $$;`

func ReIndex(ctx context.Context, conn *pgx.Conn) error {
_, err := conn.Exec(ctx, reIndexSQL)
return err
}
54 changes: 44 additions & 10 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to unset read-only: %s", err)
}
}

// Refresh collation for all databases.
if err := refreshCollation(ctx, conn); err != nil {
log.Printf("failed to refresh collation: %s", err)
}

case StandbyRoleName:
// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
Expand Down Expand Up @@ -381,18 +387,18 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to issue registration certificate: %s", err)
}
} else {
if n.RepMgr.Witness {
log.Println("Registering witness")
// Create required users
if err := n.setupCredentials(ctx, conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
}

// Create required users
if err := n.setupCredentials(ctx, conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
}
// Setup repmgr database and extension
if err := n.RepMgr.enable(ctx, conn); err != nil {
return fmt.Errorf("failed to enable repmgr: %s", err)
}

// Setup repmgr database and extension
if err := n.RepMgr.enable(ctx, conn); err != nil {
return fmt.Errorf("failed to enable repmgr: %s", err)
}
if n.RepMgr.Witness {
log.Println("Registering witness")

primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
if err != nil {
Expand Down Expand Up @@ -471,3 +477,31 @@ func setDirOwnership() error {

return nil
}

func (n *Node) fixCollationMismatch(ctx context.Context, conn *pgx.Conn) error {
dbs, err := admin.ListDatabases(ctx, conn)
if err != nil {
return fmt.Errorf("failed to list databases: %s", err)
}

// Add the template1 database to the list of databases to refresh.
dbs = append(dbs, admin.DbInfo{Name: "template1"})

for _, db := range dbs {
dbConn, err := n.NewLocalConnection(ctx, db.Name, n.SUCredentials)
if err != nil {
return fmt.Errorf("failed to establish connection to local node: %s", err)
}
defer func() { _ = dbConn.Close(ctx) }()

if err := admin.RefreshCollationVersion(ctx, dbConn); err != nil {
return fmt.Errorf("failed to refresh collation: %s", err)
}

if err := admin.ReIndex(ctx, dbConn); err != nil {
return fmt.Errorf("failed to reindex database: %s", err)
}
}

return nil
}
15 changes: 14 additions & 1 deletion internal/utils/shell.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package utils

import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"os/user"
Expand All @@ -19,7 +21,18 @@ func RunCommand(cmdStr, usr string) ([]byte, error) {
cmd.SysProcAttr = &syscall.SysProcAttr{}
cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}

return cmd.Output()
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)

err = cmd.Run()
if err != nil {
if ee, ok := err.(*exec.ExitError); ok {
ee.Stderr = stderrBuf.Bytes()
}
}

return stdoutBuf.Bytes(), err
}

func SetFileOwnership(pathToFile, owner string) error {
Expand Down

0 comments on commit 17ed339

Please sign in to comment.