Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Addressed bugs related to partitioned tables #241

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/built_in_transformers/transformation_inheritance.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ configuration:
transformers:
- name: RandomDate
params:
min: "2000-01-01"
max: "2005-01-01"
min: "2022-01-01"
max: "2022-03-01"
column: "sale_date"
engine: "random"
```
Expand Down
8 changes: 6 additions & 2 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ func (d *Dump) createTocEntries() error {
Original: v.OriginalSize,
Compressed: v.CompressedSize,
}
tablesEntry = append(tablesEntry, entry)
if v.RelKind != 'p' {
// Do not create TOC entry for partitioned tables because they are not dumped. Only their partitions are
// dumped
tablesEntry = append(tablesEntry, entry)
}
tables = append(tables, v)
case *entries.Sequence:
sequences = append(sequences, entry)
Expand Down Expand Up @@ -529,7 +533,7 @@ func (d *Dump) MergeTocEntries(schemaEntries []*toc.Entry, dataEntries []*toc.En
res := make([]*toc.Entry, 0, len(schemaEntries)+len(dataEntries))

preDataEnd := 0
postDataStart := 0
postDataStart := len(schemaEntries) - 1

// Find predata last index and postdata first index
for idx, item := range schemaEntries {
Expand Down
70 changes: 32 additions & 38 deletions internal/db/postgres/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,52 +550,36 @@ func (r *Restore) logWarningsIfHasCycles() {
}
}

func (r *Restore) sortTocEntriesInTopoOrder() []*toc.Entry {
res := make([]*toc.Entry, 0, len(r.tocObj.Entries))

preDataEnd := 0
postDataStart := 0

func (r *Restore) sortTocEntriesInTopoOrder(entries []*toc.Entry) []*toc.Entry {
r.logWarningsIfHasCycles()

// Find predata last index and postdata first index
for idx, item := range r.tocObj.Entries {
if item.Section == toc.SectionPreData {
preDataEnd = idx
}
if item.Section == toc.SectionPostData {
postDataStart = idx
break
}
}
dataEntries := r.tocObj.Entries[preDataEnd+1 : postDataStart]
lastTableIdx := slices.IndexFunc(dataEntries, func(entry *toc.Entry) bool {
return *entry.Desc == toc.SequenceSetDesc || *entry.Desc == toc.BlobsDesc
})
tableEntries := dataEntries
if lastTableIdx != -1 {
tableEntries = dataEntries[:lastTableIdx]
}
sortedTablesEntries := make([]*toc.Entry, 0, len(tableEntries))
// Find data section entries
sortedTablesEntries := make([]*toc.Entry, 0, len(entries))
for _, dumpId := range r.metadata.DumpIdsOrder {
idx := slices.IndexFunc(tableEntries, func(entry *toc.Entry) bool {
idx := slices.IndexFunc(entries, func(entry *toc.Entry) bool {
return entry.DumpId == dumpId
})
if idx == -1 {
tableOid, ok := r.metadata.DumpIdsToTableOid[dumpId]
if !ok {
panic(fmt.Sprintf("table with dumpId %d is not found in dumpId to Oids map", dumpId))
}
skippedTableIdx := slices.IndexFunc(r.metadata.DatabaseSchema, func(t *toolkit.Table) bool {
return t.Oid == tableOid
})
if skippedTableIdx == -1 {
panic(fmt.Sprintf("table with oid %d is not found in DatabaseSchema", tableOid))
}
log.Debug().
Int32("DumpId", dumpId).
Msg("entry not found in table entries it might be excluded from dump")
Str("SchemaName", r.metadata.DatabaseSchema[skippedTableIdx].Schema).
Str("TableName", r.metadata.DatabaseSchema[skippedTableIdx].Name).
Msg("table might be excluded from dump or it is a partitioned table (not partition itself): table is not found in dump entries")
continue
}
sortedTablesEntries = append(sortedTablesEntries, tableEntries[idx])
}

res = append(res, r.tocObj.Entries[:preDataEnd+1]...)
res = append(res, sortedTablesEntries...)
if lastTableIdx != -1 {
res = append(res, dataEntries[lastTableIdx:]...)
sortedTablesEntries = append(sortedTablesEntries, entries[idx])
}
res = append(res, r.tocObj.Entries[postDataStart:]...)
return res
return sortedTablesEntries
}

func (r *Restore) waitDependenciesAreRestore(ctx context.Context, deps []int32) error {
Expand All @@ -615,9 +599,9 @@ func (r *Restore) waitDependenciesAreRestore(ctx context.Context, deps []int32)
func (r *Restore) taskPusher(ctx context.Context, tasks chan restorers.RestoreTask) func() error {
return func() error {
defer close(tasks)
tocEntries := r.tocObj.Entries
tocEntries := getDataSectionTocEntries(r.tocObj.Entries)
if r.restoreOpt.RestoreInOrder {
tocEntries = r.sortTocEntriesInTopoOrder()
tocEntries = r.sortTocEntriesInTopoOrder(tocEntries)
}
for _, entry := range tocEntries {
select {
Expand Down Expand Up @@ -842,3 +826,13 @@ func removeEscapeQuotes(v string) string {
}
return v
}

func getDataSectionTocEntries(tocEntries []*toc.Entry) []*toc.Entry {
var dataSectionEntries []*toc.Entry
for _, entry := range tocEntries {
if entry.Section == toc.SectionData {
dataSectionEntries = append(dataSectionEntries, entry)
}
}
return dataSectionEntries
}
4 changes: 2 additions & 2 deletions internal/db/postgres/context/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func isEndToEndPKFK(graph *subset.Graph, table *entries.Table) bool {
return foundInFK
}

func findPartitionsOfPartitionedTable(ctx context.Context, tx pgx.Tx, t *entries.Table) ([]toolkit.Oid, error) {
func findPartitionsOfPartitionedTable(ctx context.Context, tx pgx.Tx, t *toolkit.Table) ([]toolkit.Oid, error) {
log.Debug().
Str("TableSchema", t.Schema).
Str("TableName", t.Name).
Expand Down Expand Up @@ -615,7 +615,7 @@ func checkTransformerAlreadyExists(
func setupConfigForPartitionedTableChildren(
ctx context.Context, tx pgx.Tx, parentTcm *tableConfigMapping, tables []*entries.Table, cfg []*domains.Table,
) ([]*tableConfigMapping, error) {
parts, err := findPartitionsOfPartitionedTable(ctx, tx, parentTcm.entry)
parts, err := findPartitionsOfPartitionedTable(ctx, tx, parentTcm.entry.Table)
if err != nil {
return nil, fmt.Errorf(
"cannot find partitions of the table %s.%s: %w",
Expand Down
34 changes: 30 additions & 4 deletions internal/db/postgres/context/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package context

import (
"context"
"slices"

"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"

"github.com/greenmaskio/greenmask/internal/db/postgres/pgdump"
"github.com/greenmaskio/greenmask/pkg/toolkit"
Expand All @@ -12,7 +14,7 @@ import (
func getDatabaseSchema(
ctx context.Context, tx pgx.Tx, options *pgdump.Options, version int,
) ([]*toolkit.Table, error) {
var res []*toolkit.Table
var tables []*toolkit.Table
query, err := buildSchemaIntrospectionQuery(
options.Table, options.ExcludeTable,
options.IncludeForeignData, options.Schema,
Expand All @@ -36,11 +38,11 @@ func getDatabaseSchema(
if err != nil {
return nil, err
}
res = append(res, table)
tables = append(tables, table)
}

// fill columns
for _, table := range res {
for _, table := range tables {
// We do not exclude generated columns here, because the schema must be compared with the original
columns, err := getColumnsConfig(ctx, tx, table.Oid, version, false)
if err != nil {
Expand All @@ -49,5 +51,29 @@ func getDatabaseSchema(
table.Columns = columns
}

return res, nil
// 1. Find partitioned tables
// 2. Find all children of partitioned tables
// 3. Find children in the tables
// 4. Set RootPtSchema, RootPtName, RootPtOid for children
for _, table := range tables {
if table.Kind != "p" || table.Parent != 0 {
continue
}
for _, ptOId := range table.Children {
idx := slices.IndexFunc(tables, func(table *toolkit.Table) bool {
return table.Oid == ptOId
})
if idx == -1 {
log.Debug().
Int("TableOid", int(ptOId)).
Msg("table might be excluded: unable to find partitioned table")
continue
}
t := tables[idx]
t.RootPtName = table.Name
t.RootPtSchema = table.Schema
t.RootPtOid = table.Oid
}
}
return tables, nil
}
12 changes: 10 additions & 2 deletions internal/db/postgres/restorers/table_insert_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,18 @@ func (td *TableRestorerInsertFormat) generateInsertStmt(onConflictDoNothing bool
overridingSystemValue = "OVERRIDING SYSTEM VALUE "
}

tableName := *td.Entry.Tag
tableSchema := *td.Entry.Namespace

if td.Table.RootPtOid != 0 {
tableName = td.Table.RootPtName
tableSchema = td.Table.RootPtSchema
}

res := fmt.Sprintf(
`INSERT INTO %s.%s (%s) %sVALUES(%s)%s`,
*td.Entry.Namespace,
*td.Entry.Tag,
tableSchema,
tableName,
strings.Join(columnNames, ", "),
overridingSystemValue,
strings.Join(placeholders, ", "),
Expand Down
24 changes: 14 additions & 10 deletions pkg/toolkit/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ type Reference struct {
}

type Table struct {
Schema string `json:"schema"`
Name string `json:"name"`
Oid Oid `json:"oid"`
Columns []*Column `json:"columns"`
Kind string `json:"kind"`
Parent Oid `json:"parent"`
Children []Oid `json:"children"`
Size int64 `json:"size"`
PrimaryKey []string `json:"primary_key"`
Constraints []Constraint `json:"-"`
Schema string `json:"schema"`
Name string `json:"name"`
Oid Oid `json:"oid"`
Columns []*Column `json:"columns"`
Kind string `json:"kind"`
Parent Oid `json:"parent"`
Children []Oid `json:"children"`
Size int64 `json:"size"`
PrimaryKey []string `json:"primary_key"`
// RootPtSchema, RootPtName, RootPtOid - the first parent of the partitioned table
RootPtSchema string `json:"root_pt_schema"`
RootPtName string `json:"root_pt_name"`
RootPtOid Oid `json:"root_pt_oid"`
Constraints []Constraint `json:"-"`
}

func (t *Table) Validate() error {
Expand Down