From 29f466f37274dbb296b28a23a383c79bc07741e4 Mon Sep 17 00:00:00 2001
From: Vadim Voitenko
Date: Fri, 1 Nov 2024 12:05:15 +0200
Subject: [PATCH] feat: Revised implementation

* Reworked table object initialization order
* Added support for end-to-end FK/PK references
---
 internal/db/postgres/cmd/dump.go              |   7 ++-
 .../db/postgres/context/config_builder.go     | 262 +++++++++++++----
 internal/db/postgres/context/context.go       |  44 +--
 internal/db/postgres/context/pg_catalog.go    |   8 +-
 internal/db/postgres/entries/table.go         |   1 +
 internal/db/postgres/subset/edge.go           |  20 ++
 internal/db/postgres/subset/graph.go          |  45 +++-
 internal/db/postgres/subset/table_link.go     |  16 ++
 internal/domains/config.go                    |  23 +-
 9 files changed, 324 insertions(+), 102 deletions(-)

diff --git a/internal/db/postgres/cmd/dump.go b/internal/db/postgres/cmd/dump.go
index ac597983..3b6827f0 100644
--- a/internal/db/postgres/cmd/dump.go
+++ b/internal/db/postgres/cmd/dump.go
@@ -288,6 +288,10 @@ func (d *Dump) taskProducer(ctx context.Context, tasks chan<- dumpers.DumpTask)
 	var task dumpers.DumpTask
 	switch v := dumpObj.(type) {
 	case *entries.Table:
+		// Root partitioned tables (relkind 'p') hold no rows themselves; skip dumping them
+		if v.RelKind == 'p' {
+			continue
+		}
 		task = dumpers.NewTableDumper(v, d.validate, d.validateRowsLimit, d.pgDumpOptions.Pgzip)
 	case *entries.Sequence:
 		task = dumpers.NewSequenceDumper(v)
@@ -355,7 +359,8 @@ func (d *Dump) setDumpDependenciesGraph(tables []*entries.Table) {
 	d.dumpDependenciesGraph = make(map[int32][]int32)
 	for _, oid := range sortedOids {
 		idx := slices.IndexFunc(tables, func(entry *entries.Table) bool {
-			return entry.Oid == oid
+			// A table matches either by its own OID or by the OID of its root partitioned table
+			return entry.Oid == oid || entry.RootPtOid == oid
 		})
 		if idx == -1 {
 			panic(fmt.Sprintf("table not found: oid=%d", oid))
diff --git a/internal/db/postgres/context/config_builder.go b/internal/db/postgres/context/config_builder.go
index afc58785..f534d7bb 100644
--- a/internal/db/postgres/context/config_builder.go
+++ b/internal/db/postgres/context/config_builder.go
@@ -10,11 +10,26 @@ import (
 	"github.com/rs/zerolog/log"

 	"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
+	"github.com/greenmaskio/greenmask/internal/db/postgres/subset"
 	transformersUtils "github.com/greenmaskio/greenmask/internal/db/postgres/transformers/utils"
 	"github.com/greenmaskio/greenmask/internal/domains"
 	"github.com/greenmaskio/greenmask/pkg/toolkit"
 )

+const (
+	columnParameterName = "column"
+	engineParameterName = "engine"
+)
+
+// transformersMapping - maps a root table transformer onto the primary-key column it targets. It is used while
+// propagating transformers with apply_for_references onto the referencing tables
+type transformersMapping struct {
+	entry      *entries.Table
+	columnName string
+	attNum     int
+	cfg        *domains.TransformerConfig
+}
+
-// tableExistsQuery - map dump object to transformation config from yaml. This uses for validation and building
-// configuration for Tables
+// tableConfigMapping - maps a dump object to its transformation config from YAML. It is used for validating and
+// building the configuration for Tables
 type tableConfigMapping struct {
@@ -22,67 +37,39 @@ type tableConfigMapping struct {
 	config *domains.Table
 }

-// entriesConfig - config for tables, sequences and blobs, that are used in the runtime context
-type entriesConfig struct {
-	tablesWithTransformers []*tableConfigMapping
-	tables                 []*entries.Table
-	sequences              []*entries.Sequence
-	blobs                  *entries.Blobs
-	// cachedRealTables - filtered list of tables that are not partitioned tables
-	cachedRealTables []*entries.Table
-}
-
-func (ec *entriesConfig) Tables() []*entries.Table {
-	if ec.cachedRealTables != nil {
-		return ec.cachedRealTables
-	}
-	for _, t := range ec.tables {
-		if t.RelKind == 'p' {
-			continue
+func (tcm *tableConfigMapping) hasTransformerWithApplyForReferences() bool {
+	for _, tr := range tcm.config.Transformers {
+		if tr.ApplyForReferences {
+			return true
 		}
-		ec.cachedRealTables = append(ec.cachedRealTables, t)
 	}
-	return ec.cachedRealTables
-}
-
-func (ec *entriesConfig) Sequences() []*entries.Sequence {
-	return ec.sequences
-}
-
-func (ec *entriesConfig) Blobs() *entries.Blobs {
-	return ec.blobs
+	return false
 }

-// ValidateAndBuildTableConfig - validates Tables, toolkit and their parameters. Builds config for Tables and returns
-// ValidationWarnings that can be used for checking helpers in configuring and debugging transformation. Those
-// may contain the schema affection warnings that would be useful for considering consistency
+// validateAndBuildEntriesConfig - validates Tables, toolkit and their parameters. Builds config for Tables and
+// returns ValidationWarnings that can be used for checking helpers in configuring and debugging transformation.
+// Those may contain schema-affecting warnings that are useful for assessing consistency
 func validateAndBuildEntriesConfig(
-	ctx context.Context, tx pgx.Tx, typeMap *pgtype.Map,
+	ctx context.Context, tx pgx.Tx, tables []*entries.Table, typeMap *pgtype.Map,
 	cfg *domains.Dump, registry *transformersUtils.TransformerRegistry,
-	version int, types []*toolkit.Type,
-) (*entriesConfig, toolkit.ValidationWarnings, error) {
+	version int, types []*toolkit.Type, graph *subset.Graph,
+) (toolkit.ValidationWarnings, error) {
 	var warnings toolkit.ValidationWarnings

 	// Validate that the Tables in config exist in the database
 	tableConfigExistsWarns, err := validateConfigTables(ctx, tx, cfg.Transformation)
 	warnings = append(warnings, tableConfigExistsWarns...)
 	if err != nil {
-		return nil, nil, fmt.Errorf("cannot validate Tables: %w", err)
+		return nil, fmt.Errorf("cannot validate Tables: %w", err)
 	}
 	if tableConfigExistsWarns.IsFatal() {
-		return nil, tableConfigExistsWarns, nil
-	}
-
-	// Get list of entries (Tables, sequences, blobs) from the database
-	tables, sequences, blobs, err := getDumpObjects(ctx, version, tx, &cfg.PgDumpOptions)
-	if err != nil {
-		return nil, nil, fmt.Errorf("cannot get Tables: %w", err)
+		return tableConfigExistsWarns, nil
 	}

 	// Assign settings to the Tables using config received
 	//entriesWithTransformers := findTablesWithTransformers(cfg.Transformation, Tables)
-	entriesWithTransformers, err := getTablesEntriesConfig(ctx, tx, cfg.Transformation, tables)
+	entriesWithTransformers, err := setConfigToEntries(ctx, tx, cfg.Transformation, tables, graph)
 	if err != nil {
-		return nil, nil, fmt.Errorf("cannot get Tables entries config: %w", err)
+		return nil, fmt.Errorf("cannot get Tables entries config: %w", err)
 	}

 	// TODO:
 	// 	Check if any has relkind = p
@@ -97,14 +84,14 @@ func validateAndBuildEntriesConfig(
 		driverWarnings, err := setGlobalDriverForTable(cfgMapping.entry, types)
 		warnings = append(warnings, driverWarnings...)
 		if err != nil {
-			return nil, nil, fmt.Errorf(
+			return nil, fmt.Errorf(
 				"cannot set global driver for table %s.%s: %w",
 				cfgMapping.entry.Schema, cfgMapping.entry.Name, err,
 			)
 		}
 		enrichWarningsWithTableName(driverWarnings, cfgMapping.entry)
 		if driverWarnings.IsFatal() {
-			return nil, driverWarnings, nil
+			return driverWarnings, nil
 		}

 		// Compile when condition and set to the table entry
@@ -112,12 +99,12 @@ func validateAndBuildEntriesConfig(
 		enrichWarningsWithTableName(driverWarnings, cfgMapping.entry)
 		warnings = append(warnings, whenCondWarns...)
 		if whenCondWarns.IsFatal() {
-			return nil, whenCondWarns, nil
+			return whenCondWarns, nil
 		}

 		// Set table constraints
 		if err := setTableConstraints(ctx, tx, cfgMapping.entry, version); err != nil {
-			return nil, nil, fmt.Errorf(
+			return nil, fmt.Errorf(
 				"cannot set table constraints for table %s.%s: %w",
 				cfgMapping.entry.Schema, cfgMapping.entry.Name, err,
 			)
@@ -125,7 +112,7 @@ func validateAndBuildEntriesConfig(

 		// Set primary keys for the table
 		if err := setTablePrimaryKeys(ctx, tx, cfgMapping.entry); err != nil {
-			return nil, nil, fmt.Errorf(
+			return nil, fmt.Errorf(
 				"cannot set primary keys for table %s.%s: %w",
 				cfgMapping.entry.Schema, cfgMapping.entry.Name, err,
 			)
@@ -139,19 +126,14 @@ func validateAndBuildEntriesConfig(
 		enrichWarningsWithTableName(transformersInitWarns, cfgMapping.entry)
 		warnings = append(warnings, transformersInitWarns...)
 		if err != nil {
-			return nil, nil, fmt.Errorf(
+			return nil, fmt.Errorf(
 				"cannot initialise and set transformers for table %s.%s: %w",
 				cfgMapping.entry.Schema, cfgMapping.entry.Name, err,
 			)
 		}
 	}

-	return &entriesConfig{
-		tables:                 tables,
-		tablesWithTransformers: entriesWithTransformers,
-		sequences:              sequences,
-		blobs:                  blobs,
-	}, warnings, nil
+	return warnings, nil
 }

-// validateConfigTables - validates that the Tables in the config exist in the database. This function iterate through
+// validateConfigTables - validates that the Tables in the config exist in the database. This function iterates through
@@ -216,12 +198,19 @@ func findTablesWithTransformers(
 	return entriesWithTransformers
 }

-func getTablesEntriesConfig(
-	ctx context.Context, tx pgx.Tx, cfg []*domains.Table, tables []*entries.Table,
+func setConfigToEntries(
+	ctx context.Context, tx pgx.Tx, cfg []*domains.Table, tables []*entries.Table, g *subset.Graph,
 ) ([]*tableConfigMapping, error) {
 	var res []*tableConfigMapping
 	for _, tcm := range findTablesWithTransformers(cfg, tables) {
+		if tcm.hasTransformerWithApplyForReferences() {
+			// If the table has a transformer with apply_for_references, collect all of its referencing
+			// tables and add them to the result as well
+			refTables := getRefTables(tcm.entry, tcm.config, g)
+			res = append(res, refTables...)
+		}
 		if tcm.entry.RelKind != 'p' {
+			// If table is not partitioned, simply append it to the result
 			res = append(res, tcm)
 			continue
 		}
@@ -232,6 +221,7 @@ func getTablesEntriesConfig(
 				tcm.entry.Schema, tcm.entry.Name,
 			)
 		}
+
 		parts, err := findPartitionsOfPartitionedTable(ctx, tx, tcm.entry)
 		if err != nil {
 			return nil, fmt.Errorf(
@@ -248,6 +238,9 @@ func getTablesEntriesConfig(
 				continue
 			}
 			e := tables[idx]
+			e.RootPtName = tcm.entry.Name
+			e.RootPtSchema = tcm.entry.Schema
+			e.RootPtOid = tcm.entry.Oid
 			e.Columns = tcm.entry.Columns
 			res = append(res, &tableConfigMapping{
 				entry:  e,
@@ -258,6 +251,158 @@ func getTablesEntriesConfig(
 	return res, nil
 }

+func getRefTables(rootTable *entries.Table, rootTableCfg *domains.Table, graph *subset.Graph) []*tableConfigMapping {
+	var res []*tableConfigMapping
+	visited := make(map[string]bool)
+	rootTransformersMapping := collectRootTransformers(rootTable, rootTableCfg)
+	if len(rootTransformersMapping) == 0 {
+		return nil
+	}
+
+	// Start DFS traversal from the root table; a single traversal is enough because processReference
+	// applies every collected root transformer to each discovered reference
+	buildRefsWithEndToEndDfs(
+		rootTable, rootTableCfg, graph, rootTransformersMapping, &res, false, visited,
+	)
+
+	return res
+}
+
+// buildRefsWithEndToEndDfs performs a depth-first search over the reversed reference graph and applies the
+// root table transformations to the referencing tables; the visited map breaks reference cycles
+func buildRefsWithEndToEndDfs(
+	table *entries.Table, rootTableCfg *domains.Table, graph *subset.Graph,
+	transformers []*transformersMapping,
+	res *[]*tableConfigMapping, checkEndToEnd bool, visited map[string]bool,
+) {
+	tableKey := fmt.Sprintf("%s.%s", table.Schema, table.Name)
+	if visited[tableKey] {
+		return
+	}
+	visited[tableKey] = true
+	defer func() { delete(visited, tableKey) }() // unmark after recursion

+	rg := graph.ReversedGraph()
+	tableIdx := findTableIndex(graph, table)
+	if tableIdx == -1 {
+		log.Warn().
+			Str("SchemaName", table.Schema).
+			Str("TableName", table.Name).
+ Msg("transformer inheritance for ref: cannot find table in the graph: table will be ignored") + return + } + + for _, r := range rg[tableIdx] { + // Check for end-to-end PK-FK relationship only if it's beyond the first table + if checkEndToEnd && !isEndToEndPKFK(graph, r.From().Table()) { + continue + } + processReference(r, rootTableCfg, transformers, res) + // Recursively call DFS on child reference, setting checkEndToEnd to true after the first level + buildRefsWithEndToEndDfs( + r.To().Table(), rootTableCfg, graph, transformers, rootTr, res, true, + ) + } +} + +// collectRootTransformers gathers all transformers in the root table's configuration +func collectRootTransformers(rootTable *entries.Table, rootTableCfg *domains.Table) []*transformersMapping { + var rootTransformersMapping []*transformersMapping + for _, tr := range rootTableCfg.Transformers { + if !tr.ApplyForReferences || string(tr.Params[engineParameterName]) != "hash" { + continue + } + idx := slices.Index(rootTable.PrimaryKey, string(tr.Params[columnParameterName])) + if idx == -1 { + continue + } + rootTransformersMapping = append(rootTransformersMapping, &transformersMapping{ + entry: rootTable, + columnName: string(tr.Params[columnParameterName]), + attNum: idx, + cfg: tr, + }) + } + return rootTransformersMapping +} + +// findTableIndex locates the index of a table in the graph by name and schema +func findTableIndex(graph *subset.Graph, table *entries.Table) int { + return slices.IndexFunc(graph.GetTables(), func(t *entries.Table) bool { + return (table.Name == t.Name || fmt.Sprintf(`"%s"`, table.Name) == t.Name) && + (table.Schema == t.Schema || fmt.Sprintf(`"%s"`, table.Schema) == t.Schema) + }) +} + +// processReference applies transformers to the reference table if it matches criteria +// and recursively calls buildRefsWithEndToEndDfs on the child references +func processReference( + r *subset.Edge, rootTableCfg *domains.Table, transformers []*transformersMapping, + res *[]*tableConfigMapping, +) { + for _, rootTr := range transformers { + // Get the primary key column name of the root table + fkKeys := r.To().Keys() + refColName := fkKeys[rootTr.attNum].Name + trConf := rootTr.cfg.Clone() + trConf.Params["column"] = toolkit.ParamsValue(refColName) + + colTypeOverride := getColumnTypeOverride(rootTableCfg, rootTr.columnName) + addTransformerToReferenceTable(r, trConf, colTypeOverride, res) + } +} + +// addTransformerToReferenceTable adds the transformer configuration to the reference table in the results +func addTransformerToReferenceTable(r *subset.Edge, trConf *domains.TransformerConfig, colTypeOverride map[string]string, res *[]*tableConfigMapping) { + refTableIdx := slices.IndexFunc(*res, func(tcm *tableConfigMapping) bool { + return tcm.entry.Name == r.To().Table().Name && tcm.entry.Schema == r.To().Table().Schema + }) + if refTableIdx != -1 { + (*res)[refTableIdx].config.Transformers = append((*res)[refTableIdx].config.Transformers, trConf) + } else { + *res = append(*res, &tableConfigMapping{ + entry: r.To().Table(), + config: &domains.Table{ + Schema: r.To().Table().Schema, + Name: r.To().Table().Name, + Transformers: []*domains.TransformerConfig{trConf}, + ColumnsTypeOverride: colTypeOverride, + }, + }) + } +} + +// getColumnTypeOverride retrieves column type overrides for foreign key columns, if specified +func getColumnTypeOverride(rootTableCfg *domains.Table, columnName string) map[string]string { + colTypeOverride := make(map[string]string) + if rootTableCfg.ColumnsTypeOverride != nil && 
+		rootTableCfg.ColumnsTypeOverride[columnName] != "" {
+		colTypeOverride[columnName] = rootTableCfg.ColumnsTypeOverride[columnName]
+	}
+	return colTypeOverride
+}
+
+// isEndToEndPKFK checks if a table has PK and FK on the same columns (end-to-end identifier) using the graph
+func isEndToEndPKFK(graph *subset.Graph, table *entries.Table) bool {
+	idx := slices.IndexFunc(graph.GetTables(), func(t *entries.Table) bool {
+		return t.Name == table.Name && t.Schema == table.Schema
+	})
+	if idx == -1 {
+		return false
+	}
+	// The table is an end-to-end identifier when at least one referencing table reuses the FK columns as
+	// part of its own primary key
+	for _, ref := range graph.ReversedGraph()[idx] {
+		for _, fkCol := range ref.To().Keys() {
+			if slices.Contains(ref.To().Table().PrimaryKey, fkCol.Name) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
 func findPartitionsOfPartitionedTable(ctx context.Context, tx pgx.Tx, t *entries.Table) ([]toolkit.Oid, error) {
 	log.Debug().
 		Str("TableSchema", t.Schema).
@@ -330,8 +475,7 @@ func setTableConstraints(
 	return nil
 }

-func setTablePrimaryKeys(
-	ctx context.Context, tx pgx.Tx, t *entries.Table,
+func setTablePrimaryKeys(ctx context.Context, tx pgx.Tx, t *entries.Table,
 ) (err error) {
 	t.PrimaryKey, err = getPrimaryKeyColumns(ctx, tx, t.Oid)
 	if err != nil {
@@ -371,8 +515,7 @@ func enrichWarningsWithTransformerName(warns toolkit.ValidationWarnings, n strin
 	}
 }

-func initAndSetupTransformers(
-	ctx context.Context, t *entries.Table, cfg *domains.Table, r *transformersUtils.TransformerRegistry,
+func initAndSetupTransformers(ctx context.Context, t *entries.Table, cfg *domains.Table, r *transformersUtils.TransformerRegistry,
 ) (toolkit.ValidationWarnings, error) {
 	var warnings toolkit.ValidationWarnings
 	if len(cfg.Transformers) == 0 {
diff --git a/internal/db/postgres/context/context.go b/internal/db/postgres/context/context.go
index ccaef349..61b9bbfc 100644
--- a/internal/db/postgres/context/context.go
+++ b/internal/db/postgres/context/context.go
@@ -78,52 +78,58 @@ func NewRuntimeContext(
 		return nil, fmt.Errorf("cannot build type map: %w", err)
 	}

-	dumpObjects, buildWarns, err := validateAndBuildEntriesConfig(
-		ctx, tx, typeMap, cfg, r, version, types,
-	)
+	// Get list of entries (Tables, sequences, blobs) from the database
+	tables, sequences, blobs, err := getDumpObjects(ctx, version, tx, &cfg.PgDumpOptions)
 	if err != nil {
-		return nil, fmt.Errorf("cannot validate and build table config: %w", err)
-	}
-	warnings = append(warnings, buildWarns...)
-	if buildWarns.IsFatal() {
-		return &RuntimeContext{
-			Warnings: warnings,
-		}, nil
+		return nil, fmt.Errorf("cannot get Tables: %w", err)
 	}

-	vrWarns := validateVirtualReferences(vr, dumpObjects.Tables())
+	vrWarns := validateVirtualReferences(vr, tables)
 	warnings = append(warnings, vrWarns...)
 	if len(vrWarns) > 0 {
 		// if there are any warnings, we shouldn't use them in the graph build
 		vr = nil
 	}

-	graph, err := subset.NewGraph(ctx, tx, dumpObjects.Tables(), vr)
+	graph, err := subset.NewGraph(ctx, tx, tables, vr)
 	if err != nil {
 		return nil, fmt.Errorf("error creating graph: %w", err)
 	}
-	if hasSubset(dumpObjects.Tables()) {
+	if hasSubset(tables) {
 		// If table has subset the restoration must be in the topological order
 		// The Tables must be dumped one by one
 		if err = subset.SetSubsetQueries(graph); err != nil {
 			return nil, fmt.Errorf("cannot set subset queries: %w", err)
 		}
-		debugQueries(dumpObjects.Tables())
+		debugQueries(tables)
 	} else {
 		// if there are no subset Tables, we can sort them by size and transformation costs
 		// TODO: Implement Tables ordering for subsetted Tables as well
-		scoreTablesEntriesAndSort(dumpObjects.Tables())
+		scoreTablesEntriesAndSort(tables)
+	}
+
+	buildWarns, err := validateAndBuildEntriesConfig(
+		ctx, tx, tables, typeMap, cfg, r, version, types, graph,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("cannot validate and build table config: %w", err)
+	}
+	warnings = append(warnings, buildWarns...)
+	if buildWarns.IsFatal() {
+		return &RuntimeContext{
+			Warnings: warnings,
+		}, nil
 	}

 	var dataSectionObjects []entries.Entry
-	for _, seq := range dumpObjects.sequences {
+	for _, seq := range sequences {
 		dataSectionObjects = append(dataSectionObjects, seq)
 	}
-	for _, table := range dumpObjects.Tables() {
+	for _, table := range tables {
 		dataSectionObjects = append(dataSectionObjects, table)
 	}
-	if dumpObjects.blobs != nil {
-		dataSectionObjects = append(dataSectionObjects, dumpObjects.blobs)
+	if blobs != nil {
+		dataSectionObjects = append(dataSectionObjects, blobs)
 	}

 	//inheritTransformerOnReferences(&cfg, graph)
diff --git a/internal/db/postgres/context/pg_catalog.go b/internal/db/postgres/context/pg_catalog.go
index 7f9ed17d..7f2019f2 100644
--- a/internal/db/postgres/context/pg_catalog.go
+++ b/internal/db/postgres/context/pg_catalog.go
@@ -126,10 +126,10 @@ func getTables(
 			Oid:  toolkit.Oid(oid),
 			Size: relSize,
 		},
-		Owner:                owner,
-		RelKind:              relKind,
-		RootPtSchema:         rootPtSchema,
-		RootPtName:           rootPtName,
+		Owner:   owner,
+		RelKind: relKind,
+		// RootPtSchema and RootPtName are now assigned in setConfigToEntries from the root partitioned
+		// table's config rather than read from the catalog
 		LoadViaPartitionRoot: options.LoadViaPartitionRoot,
 	}
 	tables = append(tables, table)
AddMeta("TableIdx", tableIdx) res = append(res, w) } diff --git a/internal/db/postgres/entries/table.go b/internal/db/postgres/entries/table.go index 81ed3bd3..35f72d08 100644 --- a/internal/db/postgres/entries/table.go +++ b/internal/db/postgres/entries/table.go @@ -38,6 +38,7 @@ type Table struct { RootPtSchema string // RootPtName - name of the root partition table uses in partitioned tables when LoadViaPartitionRoot is set RootPtName string + RootPtOid toolkit.Oid TransformersContext []*utils.TransformerContext Dependencies []int32 DumpId int32 diff --git a/internal/db/postgres/subset/edge.go b/internal/db/postgres/subset/edge.go index fc7c1626..254f3c6a 100644 --- a/internal/db/postgres/subset/edge.go +++ b/internal/db/postgres/subset/edge.go @@ -17,3 +17,23 @@ func NewEdge(id, idx int, isNullable bool, a *TableLink, b *TableLink) *Edge { to: b, } } + +func (e *Edge) ID() int { + return e.id +} + +func (e *Edge) Index() int { + return e.idx +} + +func (e *Edge) IsNullable() bool { + return e.isNullable +} + +func (e *Edge) From() *TableLink { + return e.from +} + +func (e *Edge) To() *TableLink { + return e.to +} diff --git a/internal/db/postgres/subset/graph.go b/internal/db/postgres/subset/graph.go index 2c9e5295..3d444b16 100644 --- a/internal/db/postgres/subset/graph.go +++ b/internal/db/postgres/subset/graph.go @@ -45,8 +45,10 @@ type Graph struct { tables []*entries.Table // graph - the oriented graph representation of the DB tables graph [][]*Edge + // reversedGraph - the reversed oriented graph representation of the DB tables + reversedGraph [][]*Edge // graph - the oriented graph representation of the DB tables - reversedGraph [][]int + reversedSimpleGraph [][]int // scc - the strongly connected components in the graph scc []*Component // condensedGraph - the condensed graph representation of the DB tables @@ -69,7 +71,8 @@ func NewGraph( ctx context.Context, tx pgx.Tx, tables []*entries.Table, vr []*domains.VirtualReference, ) (*Graph, error) { graph := make([][]*Edge, len(tables)) - reversedGraph := make([][]int, len(tables)) + reversedGraph := make([][]*Edge, len(tables)) + reversedSimpleGraph := make([][]int, len(tables)) edges := make([]*Edge, 0) var edgeIdSequence int @@ -102,8 +105,21 @@ func NewGraph( edge, ) + reversedEdge := NewEdge( + edgeIdSequence, + idx, + ref.IsNullable, + NewTableLink(referenceTableIdx, tables[referenceTableIdx], NewKeysByColumn(tables[referenceTableIdx].PrimaryKey), nil), + NewTableLink(idx, table, NewKeysByColumn(ref.ReferencedKeys), nil), + ) + reversedGraph[referenceTableIdx] = append( reversedGraph[referenceTableIdx], + reversedEdge, + ) + + reversedSimpleGraph[referenceTableIdx] = append( + reversedSimpleGraph[referenceTableIdx], idx, ) edges = append(edges, edge) @@ -137,8 +153,8 @@ func NewGraph( edge, ) - reversedGraph[referenceTableIdx] = append( - reversedGraph[referenceTableIdx], + reversedSimpleGraph[referenceTableIdx] = append( + reversedSimpleGraph[referenceTableIdx], idx, ) edges = append(edges, edge) @@ -148,18 +164,27 @@ func NewGraph( } } g := &Graph{ - tables: tables, - graph: graph, - paths: make(map[int]*Path), - edges: edges, - visited: make([]int, len(tables)), - order: make([]int, 0), - reversedGraph: reversedGraph, + tables: tables, + graph: graph, + paths: make(map[int]*Path), + edges: edges, + visited: make([]int, len(tables)), + order: make([]int, 0), + reversedSimpleGraph: reversedSimpleGraph, + reversedGraph: reversedGraph, } g.buildCondensedGraph() return g, nil } +func (g *Graph) Tables() []*entries.Table { + 
+	return g.reversedGraph
+}
+
 func (g *Graph) GetTables() []*entries.Table {
 	return g.tables
 }
@@ -263,7 +284,7 @@ func (g *Graph) topologicalSortDfs(v int) {

 func (g *Graph) markComponentDfs(v, component int) {
 	g.visited[v] = component
-	for _, to := range g.reversedGraph[v] {
+	for _, to := range g.reversedSimpleGraph[v] {
 		if g.visited[to] == sscVertexIsNotVisited {
 			g.markComponentDfs(to, component)
 		}
diff --git a/internal/db/postgres/subset/table_link.go b/internal/db/postgres/subset/table_link.go
index 4574adf8..41cce56c 100644
--- a/internal/db/postgres/subset/table_link.go
+++ b/internal/db/postgres/subset/table_link.go
@@ -52,3 +52,19 @@ func NewTableLink(idx int, t *entries.Table, keys []*Key, polymorphicExprs []str
 		polymorphicExprs: polymorphicExprs,
 	}
 }
+
+func (tl *TableLink) Index() int {
+	return tl.idx
+}
+
+func (tl *TableLink) Table() *entries.Table {
+	return tl.table
+}
+
+func (tl *TableLink) Keys() []*Key {
+	return tl.keys
+}
+
+func (tl *TableLink) PolymorphicExprs() []string {
+	return tl.polymorphicExprs
+}
diff --git a/internal/domains/config.go b/internal/domains/config.go
index c7720ddc..101e1816 100644
--- a/internal/domains/config.go
+++ b/internal/domains/config.go
@@ -15,6 +15,7 @@
 package domains

 import (
+	"maps"
 	"sync"

 	"github.com/greenmaskio/greenmask/internal/db/postgres/pgdump"
@@ -121,15 +122,9 @@ type DataRestorationErrorExclusions struct {
 	Global *GlobalDataRestorationErrorExclusions `mapstructure:"global" yaml:"global" json:"global,omitempty"`
 }

-type TransformerSettings struct {
-	NoValidateSchema           bool     `mapstructure:"no_validate_schema" yaml:"no_validate_schema" json:"no_validate_schema,omitempty"`
-	ResolvedValidationWarnings []string `mapstructure:"resolved_validation_warnings" yaml:"resolved_validation_warnings" json:"resolved_validation_warnings,omitempty"`
-}
-
 type TransformerConfig struct {
-	Name               string               `mapstructure:"name" yaml:"name" json:"name,omitempty"`
-	ApplyForReferences bool                 `mapstructure:"apply_for_references" yaml:"apply_for_references" json:"apply_for_references,omitempty"`
-	Settings           *TransformerSettings `mapstructure:"settings,omitempty" yaml:"settings,omitempty" json:"settings,omitempty"`
+	Name               string `mapstructure:"name" yaml:"name" json:"name,omitempty"`
+	ApplyForReferences bool   `mapstructure:"apply_for_references" yaml:"apply_for_references" json:"apply_for_references,omitempty"`
 	// Params - transformation parameters. It might be any type. If structure should be stored as raw json
 	// This cannot be parsed with mapstructure due to uncontrollable lowercasing
 	// https://github.com/spf13/viper/issues/373
@@ -148,6 +143,18 @@ type TransformerConfig struct {
 	When string `mapstructure:"when" yaml:"when" json:"when,omitempty"`
 }

+// Clone returns a copy of the transformer config; the Params and DynamicParams maps are cloned so that
+// per-reference column rewrites do not mutate the original config
+func (tc *TransformerConfig) Clone() *TransformerConfig {
+	return &TransformerConfig{
+		Name:               tc.Name,
+		ApplyForReferences: tc.ApplyForReferences,
+		Params:             maps.Clone(tc.Params),
+		DynamicParams:      maps.Clone(tc.DynamicParams),
+		When:               tc.When,
+	}
+}
+
 type Table struct {
 	Schema string `mapstructure:"schema" yaml:"schema" json:"schema,omitempty"`
 	Name   string `mapstructure:"name" yaml:"name" json:"name,omitempty"`
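--

Usage sketch (assumes hypothetical object names). With this change, a transformer that sets
apply_for_references: true on a primary-key column is inherited by the foreign-key columns of
referencing tables, and further down end-to-end PK/FK chains, i.e. tables whose FK columns are also
part of their own primary key. Per collectRootTransformers, only transformers running with
engine: hash qualify, since hash-based generation produces the same output for the same input on
both sides of a reference. A minimal transformation fragment; the RandomInt transformer and its
min/max parameters are illustrative assumptions, not taken from this patch:

    transformation:
      - schema: "public"
        name: "users"
        transformers:
          - name: "RandomInt"
            apply_for_references: true
            params:
              column: "id"
              engine: "hash"
              min: 1
              max: 1000000

Against a schema like the one below, the transformer on users.id is propagated to orders.user_id;
because user_id is part of orders' primary key (an end-to-end identifier), it is propagated to
order_items.user_id as well:

    CREATE TABLE users (
        id int PRIMARY KEY
    );

    CREATE TABLE orders (
        user_id   int REFERENCES users (id),
        order_seq int,
        PRIMARY KEY (user_id, order_seq)
    );

    CREATE TABLE order_items (
        user_id   int,
        order_seq int,
        item_no   int,
        PRIMARY KEY (user_id, order_seq, item_no),
        FOREIGN KEY (user_id, order_seq) REFERENCES orders (user_id, order_seq)
    );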