Skip to content

Commit

Permalink
Table size and transformation scoring
Browse files Browse the repository at this point in the history
* Added table scoring based on table size and the number of configured transformers. This should help distribute the dumper tasks evenly across the workers (#50). Each transformer adds 0.03 × the table size to the table's score
* Fixed pgx rows object leakage during the schema introspection stage
  • Loading branch information
wwoytenko committed Apr 4, 2024
1 parent ce23c93 commit 25138c5
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 8 deletions.
44 changes: 44 additions & 0 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package context
import (
"context"
"fmt"
"slices"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand All @@ -28,6 +29,8 @@ import (
"github.com/greenmaskio/greenmask/pkg/toolkit"
)

const defaultTransformerCostMultiplier = 0.03

// RuntimeContext - describes current runtime behaviour according to the config and schema objects
type RuntimeContext struct {
// Tables - map of build tables with toolkit that was wrapped into dump.Entry
Expand Down Expand Up @@ -73,6 +76,8 @@ func NewRuntimeContext(
return nil, fmt.Errorf("cannot build dump object list: %w", err)
}

scoreTablesEntriesAndSort(dataSectionObjects, cfg)

schema, err := getDatabaseSchema(ctx, tx, opt)
if err != nil {
return nil, fmt.Errorf("cannot get database schema: %w", err)
Expand All @@ -91,3 +96,42 @@ func NewRuntimeContext(
// IsFatal - returns the result of calling IsFatal on rc.Warnings
func (rc *RuntimeContext) IsFatal() bool {
	fatal := rc.Warnings.IsFatal()
	return fatal
}

// scoreTablesEntriesAndSort - assigns a score to every table entry and sorts dataSectionObjects
// in place by that score, from the highest to the lowest.
//
// The score of a table is its Size plus Size * defaultTransformerCostMultiplier for each
// transformer listed for that table in cfg. A config item belongs to a table when both the
// schema and the name are equal; if several items match, the first one is used. Tables without
// a matching config item are scored by their Size alone.
//
// Entries that are not *entries.Table are not scored and are compared as if their score were 0.
// The sort is stable, so entries with equal scores keep the relative order they had on input.
func scoreTablesEntriesAndSort(dataSectionObjects []entries.Entry, cfg []*domains.Table) {
	for _, entry := range dataSectionObjects {
		t, ok := entry.(*entries.Table)
		if !ok {
			// Only tables carry a size and transformers
			continue
		}

		var transformersCount float64
		idx := slices.IndexFunc(cfg, func(table *domains.Table) bool {
			return table.Name == t.Name && table.Schema == t.Schema
		})
		if idx != -1 {
			transformersCount = float64(len(cfg[idx].Transformers))
		}

		// scores = size + (size * defaultTransformerCostMultiplier * transformersCount)
		t.Scores = t.Size + int64(float64(t.Size)*defaultTransformerCostMultiplier*transformersCount)
	}

	// scoreOf returns the table score, or 0 for any entry that is not a table
	scoreOf := func(e entries.Entry) int64 {
		if t, ok := e.(*entries.Table); ok {
			return t.Scores
		}
		return 0
	}

	// Descending order: the entry with the greater score goes first. A stable sort is used so
	// that equally scored entries are not shuffled relative to each other.
	slices.SortStableFunc(dataSectionObjects, func(a, b entries.Entry) int {
		scoreA, scoreB := scoreOf(a), scoreOf(b)
		switch {
		case scoreA > scoreB:
			return -1
		case scoreA < scoreB:
			return 1
		default:
			return 0
		}
	})
}
22 changes: 17 additions & 5 deletions internal/db/postgres/context/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/db/postgres/pgdump"
Expand Down Expand Up @@ -61,13 +62,13 @@ func getDumpObjects(
defer tableSearchRows.Close()
for tableSearchRows.Next() {
var oid toc.Oid
var lastVal int64
var lastVal, relSize int64
var schemaName, name, owner, rootPtName, rootPtSchema string
var relKind rune
var excludeData, isCalled bool
var ok bool

err = tableSearchRows.Scan(&oid, &schemaName, &name, &owner, &relKind,
err = tableSearchRows.Scan(&oid, &schemaName, &name, &owner, &relSize, &relKind,
&rootPtSchema, &rootPtName, &excludeData, &isCalled, &lastVal,
)
if err != nil {
Expand Down Expand Up @@ -106,6 +107,7 @@ func getDumpObjects(
Name: name,
Schema: schemaName,
Oid: toolkit.Oid(oid),
Size: relSize,
},
Owner: owner,
RelKind: relKind,
Expand All @@ -118,6 +120,10 @@ func getDumpObjects(

if table.ExcludeData {
// TODO: Ensure data exclusion works properly
log.Debug().
Str("TableSchema", table.Schema).
Str("TableName", table.Name).
Msg("object data excluded")
continue
}

Expand Down Expand Up @@ -179,14 +185,15 @@ func getDumpObjects(
if err != nil {
return nil, fmt.Errorf("error quering LargeObjectDescribeAclItemQuery: %w", err)
}
defer loDescribeDefaultAclRows.Close()
for loDescribeDefaultAclRows.Next() {
item := &entries.ACLItem{}
if err = loDescribeDefaultAclRows.Scan(&item.Grantor, &item.Grantee, &item.PrivilegeType, &item.Grantable); err != nil {
loDescribeDefaultAclRows.Close()
return nil, fmt.Errorf("error scanning LargeObjectDescribeAclItemQuery: %w", err)
}
defaultACLItems = append(defaultACLItems, item)
}
loDescribeDefaultAclRows.Close()
defaultACL.Items = defaultACLItems
lo.DefaultACL = defaultACL

Expand All @@ -196,14 +203,16 @@ func getDumpObjects(
if err != nil {
return nil, fmt.Errorf("error quering LargeObjectGetAclQuery: %w", err)
}
defer loAclRows.Close()
loAclRows.Close()
for loAclRows.Next() {
a := &entries.ACL{}
if err = loAclRows.Scan(&a.Value); err != nil {
loAclRows.Close()
return nil, fmt.Errorf("error scanning LargeObjectGetAclQuery: %w", err)
}
acls = append(acls, a)
}
loAclRows.Close()

// Getting ACL items
for _, a := range acls {
Expand All @@ -212,14 +221,15 @@ func getDumpObjects(
if err != nil {
return nil, fmt.Errorf("error quering LargeObjectDescribeAclItemQuery: %w", err)
}
defer loDescribeAclRows.Close()
for loDescribeAclRows.Next() {
item := &entries.ACLItem{}
if err = loDescribeAclRows.Scan(&item.Grantor, &item.Grantee, &item.PrivilegeType, &item.Grantable); err != nil {
loDescribeAclRows.Close()
return nil, fmt.Errorf("error scanning LargeObjectDescribeAclItemQuery: %w", err)
}
aclItems = append(aclItems, item)
}
loDescribeAclRows.Close()
a.Items = aclItems
}

Expand Down Expand Up @@ -351,6 +361,7 @@ func BuildTableSearchQuery(
n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner",
pg_catalog.pg_relation_size(c.oid) as "Size",
c.relkind as "RelKind",
(coalesce(pn.nspname, '')) as "rootPtSchema",
(coalesce(pc.relname, '')) as "rootPtName",
Expand Down Expand Up @@ -389,6 +400,7 @@ func BuildTableSearchQuery(
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
ORDER BY 5 DESC
`

return fmt.Sprintf(totalQuery, tableDataExclusionCond, tableInclusionCond, tableExclusionCond,
Expand Down
6 changes: 3 additions & 3 deletions internal/db/postgres/context/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ var (
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner",
c.relkind as "RelKind",
(coalesce(pn.nspname, '')) as "rootPtSchema",
(coalesce(pc.relname, '')) as "rootPtName",
(coalesce(pc.oid, 0))::TEXT::INT as "rootOid"
(coalesce(pn.nspname, '')) as "RootPtSchema",
(coalesce(pc.relname, '')) as "RootPtName",
(coalesce(pc.oid, 0))::TEXT::INT as "RootOid"
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_inherits i ON i.inhrelid = c.oid
Expand Down
1 change: 1 addition & 0 deletions internal/db/postgres/entries/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Table struct {
Driver *toolkit.Driver
// ValidateLimitedRecords - perform dumping and transformation only for N records and exit
ValidateLimitedRecords uint64
Scores int64
}

func (t *Table) HasCustomTransformer() bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/toolkit/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Table struct {
Kind string `json:"kind"`
Parent Oid `json:"parent"`
Children []Oid `json:"children"`
Size int64 `json:"size"`
Constraints []Constraint `json:"-"`
}

Expand Down

0 comments on commit 25138c5

Please sign in to comment.