Skip to content

Commit

Permalink
Merge pull request #200 from GreenmaskIO/feat/subset_polymorphic_fk
Browse files Browse the repository at this point in the history
feat: Advanced subset: Polymorphic relationships
  • Loading branch information
wwoytenko authored Sep 18, 2024
2 parents 09e8f3b + da3dc4b commit ae50218
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 44 deletions.
139 changes: 137 additions & 2 deletions docs/database_subset.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ subset_conds:
is recommended to [restore tables in topological order](commands/restore.md#restoration-in-topological-order) using
`--restore-in-order`.


## References with NULL values

For references that **do not have** `NOT NULL` constraints, Greenmask will automatically generate `LEFT JOIN` queries
Expand Down Expand Up @@ -114,6 +113,39 @@ dump:
7. `not_null` - is FK has not null constraint. If `true` Default it is `false`
8. `expression` - expression that is used to get the value of the column in the referencing table

## Plymorphyc references

Greenmask supports polymorphic references. You can define a virtual reference for a table with polymorphic references
using `polymorphic_exprs` attribute. The `polymorphic_exprs` attribute is a list of expressions that are used to make
a polymorphic reference. For instance we might have a table `comments` that has polymorphic reference to `posts` and
`videos`. The table comments might have `commentable_id` and `commentable_type` columns. The `commentable_type` column
contains the type of the table that is referenced by the `commentable_id` column. The example of the config:

```yaml title="Polymorphic references example"
dump:
virtual_references:
- schema: "public"
name: "comments"
references:
- schema: "public"
name: "videos"
polymorphic_exprs:
- "public.comments.commentable_type = 'video'"
columns:
- name: "commentable_id"
- schema: "public"
name: "posts"
polymorphic_exprs:
- "public.comments.commentable_type = 'post'"
columns:
- name: "commentable_id"
```

!!! warning

The plimorphic references cannot be non_null because the `commentable_id` column can be `NULL` if the
`commentable_type` is not set or different that the values defined in the `polymorphic_exprs` attribute.

## Troubleshooting

### Exclude the records that has NULL values in the referenced column
Expand Down Expand Up @@ -141,7 +173,7 @@ section.

If you see the error message `ERROR: column reference "{column name}" is ambiguous`, you have specified the column name
without the table and/or schema name. To avoid ambiguity, always specify the schema and table name when pointing out the
column to filter by. For instance if you want to filter employees by `employee_id` column, you should
column to filter by. For instance if you want to filter employees by `employee_id` column, you should
use `public.employees.employee_id` instead of `employee_id`.

```postgresql title="Valid subset condition"
Expand Down Expand Up @@ -479,3 +511,106 @@ As a result, the `customers` table will be dumped with the `orders` table and it
`order_items`, and `audit_logs`. The subset condition will be applied to the `customers` table, and the data will be
filtered based on the `customer_id` column.

## Example: Dump a subset with polymorphic references

In this example, we will create a subset of the tables with polymorphic references. This example includes the
`comments` table and its related tables `posts` and `videos`.

```postgresql title="Create tables with polymorphic references and insert data"
-- Create the Posts table
CREATE TABLE posts
(
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL
);
-- Create the Videos table
CREATE TABLE videos
(
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
url VARCHAR(255) NOT NULL
);
-- Create the Comments table with a polymorphic reference
CREATE TABLE comments
(
id SERIAL PRIMARY KEY,
commentable_id INT NOT NULL, -- Will refer to either posts.id or videos.id
commentable_type VARCHAR(50) NOT NULL, -- Will store the type of the associated record
body TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Insert data into the Posts table
INSERT INTO posts (title, content)
VALUES ('First Post', 'This is the content of the first post.'),
('Second Post', 'This is the content of the second post.');
-- Insert data into the Videos table
INSERT INTO videos (title, url)
VALUES ('First Video', 'https://example.com/video1'),
('Second Video', 'https://example.com/video2');
-- Insert data into the Comments table, associating some comments with posts and others with videos
-- For posts:
INSERT INTO comments (commentable_id, commentable_type, body)
VALUES (1, 'post', 'This is a comment on the first post.'),
(2, 'post', 'This is a comment on the second post.');
-- For videos:
INSERT INTO comments (commentable_id, commentable_type, body)
VALUES (1, 'video', 'This is a comment on the first video.'),
(2, 'video', 'This is a comment on the second video.');
```

The `comments` table has a polymorphic reference to the `posts` and `videos` tables. Depending on the value of the
`commentable_type` column, the `commentable_id` column will reference either the `posts.id` or `videos.id` column.

The following example demonstrates how to make a subset for tables with polymorphic references.

```yaml title="Subset configuration example"
dump:
virtual_references:
- schema: "public"
name: "comments"
references:
- schema: "public"
name: "posts"
polymorphic_exprs:
- "public.comments.commentable_type = 'post'"
columns:
- name: "commentable_id"
- schema: "public"
name: "videos"
polymorphic_exprs:
- "public.comments.commentable_type = 'video'"
columns:
- name: "commentable_id"
transformation:
- schema: "public"
name: "posts"
subset_conds:
- "public.posts.id in (1)"
```

This example selects only the first post from the `posts` table and its related comments from the `comments` table.
The comments are associated with `videos` are included without filtering because the subset condition is applied only to
the `posts` table and related comments.

The resulted records will be:

```plaintext
transformed=# select * from comments;
id | commentable_id | commentable_type | body | created_at
----+----------------+------------------+---------------------------------------+----------------------------
1 | 1 | post | This is a comment on the first post. | 2024-09-18 05:27:54.217405
2 | 2 | post | This is a comment on the second post. | 2024-09-18 05:27:54.217405
3 | 1 | video | This is a comment on the first video. | 2024-09-18 05:27:54.229794
(3 rows)
```

10 changes: 10 additions & 0 deletions internal/db/postgres/context/virtual_references.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ func validateReference(vrIdx int, v *domains.Reference, fkT, pkT *entries.Table)
res = append(res, w)
}

if len(v.PolymorphicExprs) > 0 && v.NotNull {
w := toolkit.NewValidationWarning().
SetSeverity(toolkit.ErrorValidationSeverity).
SetMsg("virtual reference error: polymorphic expressions cannot be used with not null reference").
AddMeta("ReferenceIdx", vrIdx).
AddMeta("ReferenceName", v.Name).
AddMeta("ReferenceSchema", v.Schema)
res = append(res, w)
}

for idx, c := range v.Columns {
var vrWarns toolkit.ValidationWarnings
for _, w := range validateColumn(idx, c, fkT) {
Expand Down
14 changes: 14 additions & 0 deletions internal/db/postgres/subset/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ func NewComponent(id int, componentGraph map[int][]*Edge, tables map[int]*entrie
return c
}

func (c *Component) hasPolymorphicExpressions() bool {
for _, edges := range c.componentGraph {
for _, edge := range edges {
if len(edge.from.polymorphicExprs) > 0 {
return true
}
if len(edge.to.polymorphicExprs) > 0 {
return true
}
}
}
return false
}

func (c *Component) getSubsetConds() []string {
var subsetConds []string
for _, table := range c.tables {
Expand Down
4 changes: 4 additions & 0 deletions internal/db/postgres/subset/condenced_edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func NewCondensedEdge(id int, from, to *ComponentLink, originalEdge *Edge) *Cond
}
}

func (e *CondensedEdge) hasPolymorphicExpressions() bool {
return len(e.originalEdge.from.polymorphicExprs) > 0 || len(e.originalEdge.to.polymorphicExprs) > 0
}

func sortCondensedEdges(graph [][]*CondensedEdge) []int {
stack := make([]int, 0)
visited := make([]bool, len(graph))
Expand Down
59 changes: 27 additions & 32 deletions internal/db/postgres/subset/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func NewGraph(
edgeIdSequence,
referenceTableIdx,
ref.IsNullable,
NewTableLink(idx, table, NewKeysByColumn(ref.ReferencedKeys)),
NewTableLink(referenceTableIdx, tables[referenceTableIdx], NewKeysByColumn(tables[referenceTableIdx].PrimaryKey)),
NewTableLink(idx, table, NewKeysByColumn(ref.ReferencedKeys), nil),
NewTableLink(referenceTableIdx, tables[referenceTableIdx], NewKeysByColumn(tables[referenceTableIdx].PrimaryKey), nil),
)
graph[idx] = append(
graph[idx],
Expand Down Expand Up @@ -128,8 +128,8 @@ func NewGraph(
edgeIdSequence,
referenceTableIdx,
!ref.NotNull,
NewTableLink(idx, table, NewKeysByReferencedColumn(ref.Columns)),
NewTableLink(referenceTableIdx, tables[referenceTableIdx], NewKeysByColumn(tables[referenceTableIdx].PrimaryKey)),
NewTableLink(idx, table, NewKeysByReferencedColumn(ref.Columns), ref.PolymorphicExprs),
NewTableLink(referenceTableIdx, tables[referenceTableIdx], NewKeysByColumn(tables[referenceTableIdx].PrimaryKey), nil),
)
graph[idx] = append(
graph[idx],
Expand Down Expand Up @@ -187,7 +187,7 @@ func (g *Graph) findSubsetVertexes() {
for v := range g.condensedGraph {
path := NewPath(v)
var from, fullFrom []*CondensedEdge
if len(g.scc[v].getSubsetConds()) > 0 {
if len(g.scc[v].getSubsetConds()) > 0 || g.scc[v].hasPolymorphicExpressions() {
path.AddVertex(v)
}
g.subsetDfs(path, v, &fullFrom, &from, rootScopeId)
Expand All @@ -203,7 +203,7 @@ func (g *Graph) subsetDfs(path *Path, v int, fullFrom, from *[]*CondensedEdge, s
*fullFrom = append(*fullFrom, to)
*from = append(*from, to)
currentScopeId := scopeId
if len(g.scc[to.to.idx].getSubsetConds()) > 0 {
if len(g.scc[to.to.idx].getSubsetConds()) > 0 || (*fullFrom)[len(*fullFrom)-1].hasPolymorphicExpressions() {
for _, e := range *from {
currentScopeId = path.AddEdge(e, currentScopeId)
}
Expand Down Expand Up @@ -555,7 +555,19 @@ func (g *Graph) generateQueryForTables(path *Path, scopeEdge *ScopeEdge) string
leftTable := originalEdge.from.table
leftTableConds = append(leftTableConds, k.GetKeyReference(leftTable))
}
var exprs []string
if len(originalEdge.from.polymorphicExprs) > 0 {
exprs = append(exprs, originalEdge.from.polymorphicExprs...)
}
query = fmt.Sprintf("((%s) IN (%s))", strings.Join(leftTableConds, ", "), query)
if len(exprs) > 0 {
query = fmt.Sprintf(
"((%s) AND (%s) IN (%s))",
strings.Join(leftTableConds, ", "),
strings.Join(exprs, "AND"),
query,
)
}

if scopeEdge.isNullable {
var nullableChecks []string
Expand Down Expand Up @@ -946,12 +958,20 @@ func generateIntegrityChecksForNullableEdges(nullabilityMap map[int]bool, edges
for idx := range e.from.keys {
leftTableKey := e.from.keys
rightTableKey := e.to.keys
polymorphicExpr := ""
if len(e.from.polymorphicExprs) > 0 {
polymorphicExpr = fmt.Sprintf(" OR NOT (%s)", strings.Join(e.from.polymorphicExprs, " AND "))
}
k := fmt.Sprintf(
`(%s IS NULL OR %s IS NOT NULL)`,
`(%s IS NULL OR %s IS NOT NULL%s)`,
leftTableKey[idx].GetKeyReference(e.from.table),
rightTableKey[idx].GetKeyReference(e.to.table),
polymorphicExpr,
)
if _, ok := overriddenTables[e.to.table.Oid]; ok {
if polymorphicExpr != "" {
panic("IMPLEMENT ME: polymorphic expression for overridden table")
}
k = fmt.Sprintf(
`(%s IS NULL OR "%s"."%s" IS NOT NULL)`,
leftTableKey[idx].GetKeyReference(e.from.table),
Expand Down Expand Up @@ -1108,31 +1128,6 @@ func shiftUntilVertexWillBeFirst(v *Edge, c []*Edge) []*Edge {
return res
}

func validateVirtualReference(r *domains.Reference, pkT, fkT *entries.Table) error {
// TODO: Create ValidationWarning for it
keys := getReferencedKeys(r)
if len(keys) == 0 {
return fmt.Errorf("no keys found in reference %s.%s", r.Schema, r.Name)
}
if len(keys) != len(pkT.PrimaryKey) {
return fmt.Errorf("number of keys in reference %s.%s does not match primary key of %s.%s", r.Schema, r.Name, pkT.Schema, pkT.Name)
}
for _, col := range r.Columns {
if col.Name == "" && col.Expression == "" {
return fmt.Errorf("empty column name and expression in reference %s.%s", r.Schema, r.Name)
}
if col.Name != "" && col.Expression != "" {
return fmt.Errorf("only name or expression should be set in reference item at the same time %s.%s", r.Schema, r.Name)
}
if col.Name != "" && !slices.ContainsFunc(fkT.Columns, func(column *toolkit.Column) bool {
return column.Name == col.Name
}) {
return fmt.Errorf("column %s not found in table %s.%s", col.Name, fkT.Schema, fkT.Name)
}
}
return nil
}

func getVirtualReferences(vr []*domains.VirtualReference, t *entries.Table) []*domains.Reference {
idx := slices.IndexFunc(vr, func(r *domains.VirtualReference) bool {
return r.Schema == t.Schema && r.Name == t.Name
Expand Down
13 changes: 13 additions & 0 deletions internal/db/postgres/subset/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func generateJoinClauseForDroppedEdge(edge *Edge, initTableName string) string {
rightPart := edge.to.keys[idx].GetKeyReference(rightTable.table)
conds = append(conds, fmt.Sprintf(`%s = %s`, leftPart, rightPart))
}
if len(edge.from.polymorphicExprs) > 0 {
conds = append(conds, edge.from.polymorphicExprs...)
}
if len(edge.to.polymorphicExprs) > 0 {
conds = append(conds, edge.to.polymorphicExprs...)
}

rightTableName := fmt.Sprintf(`"%s"."%s"`, edge.to.table.Schema, edge.to.table.Name)

Expand Down Expand Up @@ -72,6 +78,13 @@ func generateJoinClauseV2(edge *Edge, joinType string, overriddenTables map[tool
}
}

if len(edge.from.polymorphicExprs) > 0 {
conds = append(conds, edge.from.polymorphicExprs...)
}
if len(edge.to.polymorphicExprs) > 0 {
conds = append(conds, edge.to.polymorphicExprs...)
}

rightTableName := fmt.Sprintf(`"%s"."%s"`, rightTable.Table.Schema, rightTable.Table.Name)
if override, ok := overriddenTables[rightTable.Table.Oid]; ok {
rightTableName = override
Expand Down
12 changes: 8 additions & 4 deletions internal/db/postgres/subset/table_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ type TableLink struct {
idx int
table *entries.Table
keys []*Key
// polymorphicExprs - polymorphicExprs for single conditions that are not used to match FK and PK values
// this might be used for polymorphic relations
polymorphicExprs []string
}

func NewTableLink(idx int, t *entries.Table, keys []*Key) *TableLink {
func NewTableLink(idx int, t *entries.Table, keys []*Key, polymorphicExprs []string) *TableLink {
return &TableLink{
idx: idx,
table: t,
keys: keys,
idx: idx,
table: t,
keys: keys,
polymorphicExprs: polymorphicExprs,
}
}
9 changes: 5 additions & 4 deletions internal/domains/virtual_references.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ type ReferencedColumn struct {
}

type Reference struct {
Schema string `mapstructure:"schema" json:"schema" yaml:"schema"`
Name string `mapstructure:"name" json:"name" yaml:"name"`
NotNull bool `mapstructure:"not_null" json:"not_null" yaml:"not_null"`
Columns []*ReferencedColumn `mapstructure:"columns" json:"columns" yaml:"columns"`
Schema string `mapstructure:"schema" json:"schema" yaml:"schema"`
Name string `mapstructure:"name" json:"name" yaml:"name"`
NotNull bool `mapstructure:"not_null" json:"not_null" yaml:"not_null"`
Columns []*ReferencedColumn `mapstructure:"columns" json:"columns" yaml:"columns"`
PolymorphicExprs []string `mapstructure:"polymorphic_exprs" json:"polymorphic_exprs" yaml:"polymorphic_exprs"`
}

type VirtualReference struct {
Expand Down
Loading

0 comments on commit ae50218

Please sign in to comment.