Implemented a database subset with circular dependencies: #154

Merged 2 commits on Aug 9, 2024

Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -54,7 +54,7 @@ jobs:

- name: Run integration tests
run: |
- docker-compose -f docker-compose-integration.yml -p greenmask up \
+ docker compose -f docker-compose-integration.yml -p greenmask up \
--renew-anon-volumes --force-recreate --build --exit-code-from greenmask \
--abort-on-container-exit greenmask

10 changes: 9 additions & 1 deletion internal/db/postgres/context/table.go
@@ -173,7 +173,7 @@ func getTable(ctx context.Context, tx pgx.Tx, t *domains.Table) ([]*entries.Tabl
return nil, nil, fmt.Errorf("cannot aply custom query on partitioned table \"%s\".\"%s\": is not supported", table.Schema, table.Name)
}
table.Query = t.Query
- table.SubsetConds = t.SubsetConds
+ table.SubsetConds = escapeSubsetConds(t.SubsetConds)

if table.RelKind == 'p' {
if !t.ApplyForInherited {
@@ -406,3 +406,11 @@ func getTableConstraints(ctx context.Context, tx pgx.Tx, tableOid toolkit.Oid, v

return constraints, nil
}

+ func escapeSubsetConds(conds []string) []string {
+ var res []string
+ for _, c := range conds {
+ res = append(res, fmt.Sprintf(`( %s )`, c))
+ }
+ return res
+ }
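
Note: escapeSubsetConds wraps every user-supplied subset condition in parentheses before it is stored on the table entry, so the conditions can later be combined without operator-precedence surprises. A minimal sketch of the effect (the condition strings are hypothetical; real ones come from the subset configuration, which is not part of this diff):

```go
conds := []string{
	"users.id > 100 OR users.vip",
	"users.deleted_at IS NULL",
}
escaped := escapeSubsetConds(conds)
// escaped == []string{"( users.id > 100 OR users.vip )", "( users.deleted_at IS NULL )"}
// Joining with AND is now safe: the OR stays scoped to its own condition.
```
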
2 changes: 1 addition & 1 deletion internal/db/postgres/dumpers/table.go
@@ -93,7 +93,7 @@ func (td *TableDumper) Execute(ctx context.Context, tx pgx.Tx, st storages.Stora
if doneErr != nil {
log.Warn().Err(err).Msg("error terminating transformation pipeline")
}
return fmt.Errorf("error processing table dump: %w", err)
return fmt.Errorf("error processing table dump %s.%s: %w", td.table.Schema, td.table.Name, err)
}
log.Debug().Msg("transformation pipeline executed successfully")
return pipeline.Done(gtx)
19 changes: 0 additions & 19 deletions internal/db/postgres/dumpers/transformation_pipeline.go
@@ -18,8 +18,6 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"slices"

"github.com/rs/zerolog/log"
@@ -48,7 +46,6 @@ type TransformationPipeline struct {
Transform TransformationFunc
isAsync bool
record *toolkit.Record
- cycleResolutionFiles []io.ReadWriteCloser
}

func NewTransformationPipeline(ctx context.Context, eg *errgroup.Group, table *entries.Table, w io.Writer) (*TransformationPipeline, error) {
@@ -130,17 +127,6 @@ func (tp *TransformationPipeline) Init(ctx context.Context) error {
}
}

- // Initialize cycle resolution store files
- tp.cycleResolutionFiles = make([]io.ReadWriteCloser, len(tp.table.CycleResolutionOps))
- for cycleResOpIdx, op := range tp.table.CycleResolutionOps {
- file, err := os.Create(path.Join(tmpFilePath, op.FileName))
- if err != nil {
- closeAllOpenFiles(tp.cycleResolutionFiles, tp.table.CycleResolutionOps[:cycleResOpIdx], true)
- return fmt.Errorf("error creating cycle resolution store file: %w", err)
- }
- tp.cycleResolutionFiles[cycleResOpIdx] = file
- }

return nil
}

@@ -172,9 +158,6 @@ func (tp *TransformationPipeline) Dump(ctx context.Context, data []byte) (err er
return fmt.Errorf("error decoding copy line: %w", err)
}
tp.record.SetRow(tp.row)
- if err = storeCycleResolutionOps(tp.record, tp.table.CycleResolutionOps, tp.cycleResolutionFiles); err != nil {
- return NewDumpError(tp.table.Schema, tp.table.Name, tp.line, fmt.Errorf("error storing cycle resolution ops: %w", err))
- }

_, err = tp.Transform(ctx, tp.record)
if err != nil {
@@ -225,8 +208,6 @@ func (tp *TransformationPipeline) Done(ctx context.Context) error {
}
}

- closeAllOpenFiles(tp.cycleResolutionFiles, tp.table.CycleResolutionOps, false)

if lastErr != nil {
return fmt.Errorf("error terminating initialized transformer: %w", lastErr)
}
71 changes: 0 additions & 71 deletions internal/db/postgres/dumpers/utils.go

This file was deleted.

17 changes: 0 additions & 17 deletions internal/db/postgres/entries/cycle_resolution_op.go

This file was deleted.

4 changes: 1 addition & 3 deletions internal/db/postgres/entries/table.go
@@ -30,9 +30,7 @@ import (
// TODO: Deduplicate SubsetQueries and SubsetInQueries by path
type Table struct {
*toolkit.Table
- Query string
- // CycleResolutionOps - list of columns and file to store that must be dumped for future cycles resolution
- CycleResolutionOps []*CycleResolutionOp
+ Query string
Owner string
RelKind rune
RootPtSchema string
155 changes: 155 additions & 0 deletions internal/db/postgres/subset/component.go
@@ -0,0 +1,155 @@
package subset

import (
"fmt"
"slices"
"sort"
"strings"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
)

type Component struct {
id int
// componentGraph - contains the mapping of the vertexes in the component to the edges in the original graph
// if the component contains one vertex and no edges, then there is only one vertex with no cycles
componentGraph map[int][]*Edge
// tables - the vertexes in the component
tables map[int]*entries.Table
// Cycles
cycles [][]*Edge
cyclesIdents map[string]struct{}
keys []string
}

func NewComponent(id int, componentGraph map[int][]*Edge, tables map[int]*entries.Table) *Component {
c := &Component{
id: id,
componentGraph: componentGraph,
tables: tables,
cyclesIdents: make(map[string]struct{}),
}
c.findCycles()
if c.hasCycle() {
c.keys = c.getComponentKeys()
} else {
c.keys = c.getOneTable().PrimaryKey
}

return c
}

func (c *Component) getSubsetConds() []string {
var subsetConds []string
for _, table := range c.tables {
if len(table.SubsetConds) > 0 {
subsetConds = append(subsetConds, table.SubsetConds...)
}
}
return subsetConds
}
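
Note: getSubsetConds only collects the (already parenthesized) conditions from every table in the component; how they end up in the generated query is outside this excerpt. A hedged sketch of the obvious combination, assuming an AND-join in the eventual WHERE clause:

```go
// Sketch only: the actual query builder is not shown in this pull request.
conds := c.getSubsetConds()
where := ""
if len(conds) > 0 {
	where = "WHERE " + strings.Join(conds, " AND ")
}
_ = where
```
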

func (c *Component) getOneTable() *entries.Table {
if !c.hasCycle() {
for _, table := range c.tables {
return table
}
}
panic("cannot call get one table method for cycled scc")
}

func (c *Component) hasCycle() bool {
return len(c.cycles) > 0
}

// findCycles - finds all cycles in the component
func (c *Component) findCycles() {
visited := make(map[int]bool)
var path []*Edge
recStack := make(map[int]bool)

// Collect and sort all vertices
var vertices []int
for v := range c.componentGraph {
vertices = append(vertices, v)
}
sort.Ints(vertices) // Ensure deterministic order

for _, v := range vertices {
if !visited[v] {
c.findAllCyclesDfs(v, visited, recStack, path)
}
}
}

// findAllCyclesDfs - the basic DFS algorithm adapted to find all cycles in the graph and collect the cycle vertices
func (c *Component) findAllCyclesDfs(v int, visited map[int]bool, recStack map[int]bool, path []*Edge) {
visited[v] = true
recStack[v] = true

// Sort edges to ensure deterministic order
var edges []*Edge
edges = append(edges, c.componentGraph[v]...)
sort.Slice(edges, func(i, j int) bool {
return edges[i].to.idx < edges[j].to.idx
})

for _, to := range edges {

path = append(path, to)
if !visited[to.idx] {
c.findAllCyclesDfs(to.idx, visited, recStack, path)
} else if recStack[to.idx] {
// Cycle detected
var cycle []*Edge
for idx := len(path) - 1; idx >= 0; idx-- {
cycle = append(cycle, path[idx])
if path[idx].from.idx == to.to.idx {
break
}
}
cycleId := getCycleIdent(cycle)
if _, ok := c.cyclesIdents[cycleId]; !ok {
res := slices.Clone(cycle)
slices.Reverse(res)
c.cycles = append(c.cycles, res)
c.cyclesIdents[cycleId] = struct{}{}
}
}
path = path[:len(path)-1]
}

recStack[v] = false
}
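
Note: the DFS above tracks edges rather than bare vertices so that each cycle is reported as the list of edges forming it; recStack marks the vertices on the current path, and an edge pointing back into that stack closes a cycle, whose path suffix is then copied out. A simplified, self-contained illustration of the same visited/recStack technique on a plain adjacency map (vertex-based, so not the package's exact code):

```go
package main

import "fmt"

// findCycles reports cycles reachable in g as vertex slices, using the same
// visited/recStack bookkeeping as the component DFS above.
func findCycles(g map[int][]int) [][]int {
	visited := map[int]bool{}
	recStack := map[int]bool{}
	var path []int
	var cycles [][]int

	var dfs func(v int)
	dfs = func(v int) {
		visited[v] = true
		recStack[v] = true
		path = append(path, v)
		for _, to := range g[v] {
			if !visited[to] {
				dfs(to)
			} else if recStack[to] {
				// Copy the suffix of the current path that closes the cycle.
				for i := len(path) - 1; i >= 0; i-- {
					if path[i] == to {
						cycles = append(cycles, append([]int(nil), path[i:]...))
						break
					}
				}
			}
		}
		path = path[:len(path)-1]
		recStack[v] = false
	}

	for v := range g {
		if !visited[v] {
			dfs(v)
		}
	}
	return cycles
}

func main() {
	// 1 -> 2 -> 3 -> 1 forms a cycle; the edge 3 -> 4 does not.
	g := map[int][]int{1: {2}, 2: {3}, 3: {1, 4}, 4: nil}
	fmt.Println(findCycles(g)) // e.g. [[1 2 3]]
}
```
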

func getCycleIdent(cycle []*Edge) string {
ids := make([]string, 0, len(cycle))
for _, edge := range cycle {
ids = append(ids, fmt.Sprintf("%d", edge.id))
}
slices.Sort(ids)
return strings.Join(ids, "_")
}
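
Note: getCycleIdent gives the same identifier to every traversal of one cycle, because the edge IDs are sorted before being joined; cyclesIdents then drops repeat discoveries. For example, a cycle found once as edges 3, 1, 2 and again as 2, 3, 1 (hypothetical IDs) canonicalizes identically:

```go
a := []string{"3", "1", "2"}
b := []string{"2", "3", "1"}
slices.Sort(a)
slices.Sort(b)
// strings.Join(a, "_") == strings.Join(b, "_") == "1_2_3",
// so the second discovery is skipped.
```
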

func (c *Component) getComponentKeys() []string {
if len(c.cycles) > 1 {
panic("IMPLEMENT ME: multiple cycles in the component")
}
if !c.hasCycle() {
return c.getOneTable().PrimaryKey
}

var vertexes []int
for _, edge := range c.cycles[0] {
vertexes = append(vertexes, edge.to.idx)
}

var keys []string
for _, v := range vertexes {
table := c.tables[v]
for _, key := range table.PrimaryKey {
keys = append(keys, fmt.Sprintf(`%s__%s__%s`, table.Schema, table.Name, key))
}
}
return keys
}
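
Note: for a cyclic component each key is qualified as schema__table__column, which keeps the combined key set unambiguous when several tables in the cycle use the same primary-key column name. A small illustration with made-up table names:

```go
// Hypothetical cycle over two tables, both keyed by "id".
tables := []struct{ Schema, Name, PK string }{
	{"public", "orders", "id"},
	{"public", "customers", "id"},
}
var keys []string
for _, t := range tables {
	keys = append(keys, fmt.Sprintf("%s__%s__%s", t.Schema, t.Name, t.PK))
}
// keys == ["public__orders__id", "public__customers__id"]
```
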
13 changes: 13 additions & 0 deletions internal/db/postgres/subset/component_link.go
@@ -0,0 +1,13 @@
package subset

type ComponentLink struct {
idx int
component *Component
}

func NewComponentLink(idx int, c *Component, keys, overriddenKeys []string) *ComponentLink {
return &ComponentLink{
idx: idx,
component: c,
}
}