Skip to content

Commit

Permalink
feat: Subset stage 5: Virtual foreign keys
Browse files Browse the repository at this point in the history
* Introduced virtual_references that can be used to define logical foreign keys using column names or SQL expression
* Refactored subset system and removed artifacts
* Added subset query debugging in log-level=debug
* Implemented virtual keys validation

Closes #156
  • Loading branch information
wwoytenko committed Aug 29, 2024
1 parent 8bf72f4 commit 858f92f
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 87 deletions.
6 changes: 4 additions & 2 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ func (d *Dump) startMainTx(ctx context.Context, conn *pgx.Conn) (pgx.Tx, error)
}

func (d *Dump) buildContextAndValidate(ctx context.Context, tx pgx.Tx) (err error) {
d.context, err = runtimeContext.NewRuntimeContext(ctx, tx, d.config.Dump.Transformation, d.registry,
d.pgDumpOptions, d.version)
d.context, err = runtimeContext.NewRuntimeContext(
ctx, tx, d.config.Dump.Transformation, d.registry, d.pgDumpOptions,
d.config.Dump.VirtualReferences, d.version,
)
if err != nil {
return fmt.Errorf("unable to build runtime context: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/db/postgres/cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ func (v *Validate) Run(ctx context.Context) (int, error) {
}
v.config.Dump.Transformation = tablesToValidate

v.context, err = runtimeContext.NewRuntimeContext(ctx, tx, v.config.Dump.Transformation, v.registry,
v.pgDumpOptions, v.version)
v.context, err = runtimeContext.NewRuntimeContext(
ctx, tx, v.config.Dump.Transformation, v.registry,
v.pgDumpOptions, v.config.Dump.VirtualReferences, v.version,
)
if err != nil {
return nonZeroExitCode, fmt.Errorf("unable to build runtime context: %w", err)
}
Expand Down
29 changes: 25 additions & 4 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog/log"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/db/postgres/pgdump"
Expand Down Expand Up @@ -61,8 +62,9 @@ type RuntimeContext struct {
//
// warnings are fatal procedure must be terminated immediately due to lack of objects required on the next step
func NewRuntimeContext(
ctx context.Context, tx pgx.Tx, cfg []*domains.Table, r *transformersUtils.TransformerRegistry, opt *pgdump.Options,
version int,
ctx context.Context, tx pgx.Tx, cfg []*domains.Table,
r *transformersUtils.TransformerRegistry, opt *pgdump.Options,
vr []*domains.VirtualReference, version int,
) (*RuntimeContext, error) {
var salt []byte
saltHex := os.Getenv("GREENMASK_GLOBAL_SALT")
Expand Down Expand Up @@ -98,8 +100,14 @@ func NewRuntimeContext(
if err != nil {
return nil, fmt.Errorf("cannot get database schema: %w", err)
}
vrWarns := validateVirtualReferences(vr, tablesEntries)
warnings = append(warnings, vrWarns...)
if len(vrWarns) > 0 {
// if there are any warnings, we should use them in the graph build
vr = nil
}

graph, err := subset.NewGraph(ctx, tx, tablesEntries)
graph, err := subset.NewGraph(ctx, tx, tablesEntries, vr)
if err != nil {
return nil, fmt.Errorf("error creating graph: %w", err)
}
Expand All @@ -109,7 +117,7 @@ func NewRuntimeContext(
if err = subset.SetSubsetQueries(graph); err != nil {
return nil, fmt.Errorf("cannot set subset queries: %w", err)
}

debugQueries(tablesEntries)
} else {
// if there are no subset tables, we can sort them by size and transformation costs
// TODO: Implement tables ordering for subsetted tables as well
Expand Down Expand Up @@ -182,3 +190,16 @@ func hasSubset(tables []*entries.Table) bool {
return len(table.SubsetConds) > 0
})
}

func debugQueries(tables []*entries.Table) {
for _, t := range tables {
if t.Query == "" {
continue
}
log.Debug().
Str("Schema", t.Schema).
Str("Table", t.Name).
Msg("Debug query")
log.Logger.Println(t.Query)
}
}
180 changes: 180 additions & 0 deletions internal/db/postgres/context/virtual_references.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package context

import (
"slices"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/domains"
"github.com/greenmaskio/greenmask/pkg/toolkit"
)

func getReferencedKeys(r *domains.Reference) (res []string) {
for _, ref := range r.Columns {
if ref.Name != "" {
res = append(res, ref.Name)
} else if ref.Expression != "" {
res = append(res, ref.Expression)
}
}
return
}

func validateVirtualReferences(vrs []*domains.VirtualReference, tables []*entries.Table) (res toolkit.ValidationWarnings) {
for idx, vr := range vrs {
res = append(res, validateVirtualReference(idx, vr, tables)...)
}
return
}

func validateVirtualReference(tableIdx int, vr *domains.VirtualReference, tables []*entries.Table) (res toolkit.ValidationWarnings) {
if vr.Schema == "" {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("schema is required").
AddMeta("TableIdx", tableIdx)
res = append(res, w)
}
if vr.Name == "" {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("table name is required").
AddMeta("TableIdx", tableIdx)
res = append(res, w)
}
if len(vr.References) == 0 {
w := toolkit.NewValidationWarning().
SetMsg("virtual reference error: references are required: received empty").
SetSeverity(toolkit.ErrorValidationSeverity).
AddMeta("TableIdx", tableIdx).
AddMeta("TableName", vr.Name).
AddMeta("TableSchema", vr.Name)
res = append(res, w)
}

referencedTableIdx := slices.IndexFunc(tables, func(t *entries.Table) bool {
return t.Name == vr.Name && t.Schema == vr.Schema
})

if referencedTableIdx == -1 {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: table not found").
AddMeta("TableIdx", tableIdx).
AddMeta("TableName", vr.Name).
AddMeta("TableSchema", vr.Schema)
res = append(res, w)
return
}

fkT := tables[referencedTableIdx]

for idx, v := range vr.References {
var vrWarns toolkit.ValidationWarnings

primaryKeyTableIdx := slices.IndexFunc(tables, func(t *entries.Table) bool {
return t.Name == v.Name && t.Schema == v.Schema
})
if primaryKeyTableIdx == -1 {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: table not found").
AddMeta("ReferenceIdx", idx).
AddMeta("ReferenceName", v.Name).
AddMeta("ReferenceSchema", v.Schema)
vrWarns = append(vrWarns, w)
continue
}
pkT := tables[primaryKeyTableIdx]

for _, w := range validateReference(idx, v, fkT, pkT) {
w.AddMeta("TableIdx", tableIdx).
SetSeverity(toolkit.ErrorValidationSeverity).
AddMeta("TableName", vr.Name).
AddMeta("TableSchema", vr.Schema)
vrWarns = append(vrWarns, w)
}
res = append(res, vrWarns...)
}
return res
}

func validateReference(vrIdx int, v *domains.Reference, fkT, pkT *entries.Table) (res toolkit.ValidationWarnings) {
if v.Schema == "" {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: schema is required").
AddMeta("ReferenceIdx", vrIdx)
res = append(res, w)
}
if v.Name == "" {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: table name is required").
AddMeta("ReferenceIdx", vrIdx)
res = append(res, w)
}
if len(v.Columns) == 0 {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("columns are required: received empty").
AddMeta("ReferenceIdx", vrIdx).
AddMeta("ReferenceName", v.Name).
AddMeta("ReferenceSchema", v.Schema)
res = append(res, w)
}
refCols := getReferencedKeys(v)
if len(refCols) != len(pkT.PrimaryKey) {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: number of columns in reference does not match primary key").
AddMeta("ReferenceIdx", vrIdx).
AddMeta("ReferencedTableColumns", refCols).
AddMeta("PrimaryTableColumns", pkT.PrimaryKey).
AddMeta("ReferenceName", v.Name).
AddMeta("ReferenceSchema", v.Schema)
res = append(res, w)
}

for idx, c := range v.Columns {
var vrWarns toolkit.ValidationWarnings
for _, w := range validateColumn(idx, c, fkT) {
w.AddMeta("ReferenceIdx", vrIdx).
SetSeverity(toolkit.ErrorValidationSeverity).
AddMeta("ReferenceName", v.Name).
AddMeta("ReferenceSchema", v.Schema)
vrWarns = append(vrWarns, w)
}
res = append(res, vrWarns...)
}

return res
}

func validateColumn(colIdx int, c *domains.ReferencedColumn, fkT *entries.Table) (res toolkit.ValidationWarnings) {
if c.Name == "" && c.Expression == "" {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: name or expression is required").
AddMeta("ColumnIdx", colIdx)
res = append(res, w)
}
if c.Name != "" && c.Expression != "" {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: name and expression are mutually exclusive").
AddMeta("ColumnIdx", colIdx)
res = append(res, w)
}
if c.Name != "" && !slices.ContainsFunc(fkT.Columns, func(column *toolkit.Column) bool {
return column.Name == c.Name
}) {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: column not found").
AddMeta("ColumnIdx", colIdx).
AddMeta("ColumnName", c.Name)
res = append(res, w)
}

return res
}
6 changes: 0 additions & 6 deletions internal/db/postgres/subset/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Component struct {
// Cycles
cycles [][]*Edge
cyclesIdents map[string]struct{}
keys []string
// groupedCycles - cycles grouped by the vertexes
groupedCycles map[string][]int
// groupedCyclesGraph - contains the mapping of the vertexes in the component to the edges in the original graph
Expand All @@ -36,11 +35,6 @@ func NewComponent(id int, componentGraph map[int][]*Edge, tables map[int]*entrie
cyclesIdents: make(map[string]struct{}),
}
c.findCycles()
if c.hasCycle() {
c.keys = c.getComponentKeys()
} else {
c.keys = c.getOneTable().PrimaryKey
}
c.groupCycles()
c.buildCyclesGraph()

Expand Down
2 changes: 1 addition & 1 deletion internal/db/postgres/subset/component_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ type ComponentLink struct {
component *Component
}

func NewComponentLink(idx int, c *Component, keys, overriddenKeys []string) *ComponentLink {
func NewComponentLink(idx int, c *Component) *ComponentLink {
return &ComponentLink{
idx: idx,
component: c,
Expand Down
Loading

0 comments on commit 858f92f

Please sign in to comment.