Skip to content

Commit

Permalink
feat: database subset
Browse files Browse the repository at this point in the history
* Fixed wrong join condition by different edges
* Introduced the Path object - it collects edges and vertices and isolates them within a scope if the edges are the same
* Introduced graph object
* Refactored the DFS algorithm for finding all subset vertices and paths
  • Loading branch information
wwoytenko committed Jul 11, 2024
1 parent a4c1f97 commit cd55b01
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 295 deletions.
6 changes: 3 additions & 3 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func NewRuntimeContext(
if hasSubset(tablesEntries) {
// If table has subset the restoration must be in the topological order
// The tables must be dumped one by one
err = subset.GenerateSubsetQueriesForTable(ctx, tx, tablesEntries)
if err != nil {
return nil, fmt.Errorf("error during subset tables: %w", err)
if err = subset.SetSubsetQueries(ctx, tx, tablesEntries); err != nil {
return nil, fmt.Errorf("cannot set subset queries: %w", err)
}

} else {
// if there are no subset tables, we can sort them by size and transformation costs
scoreTablesEntriesAndSort(tablesEntries, cfg)
Expand Down
104 changes: 52 additions & 52 deletions internal/db/postgres/context/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,58 +388,58 @@ func BuildTableSearchQuery(

// -- WHERE c.relkind IN ('r', 'p', '') (array['r', 'S', 'v', 'm', 'f', 'p'])
totalQuery := `
SELECT
c.oid::TEXT::INT,
n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner",
pg_catalog.pg_relation_size(c.oid) +
coalesce(
pg_catalog.pg_relation_size(
c.reltoastrelid
),
0
) as "Size",
c.relkind as "RelKind",
(coalesce(pn.nspname, '')) as "rootPtSchema",
(coalesce(pc.relname, '')) as "rootPtName",
(%s) as "ExcludeData", -- data exclusion
CASE
WHEN c.relkind = 'S' THEN
CASE
WHEN pg_sequence_last_value(c.oid::regclass) ISNULL THEN
FALSE
ELSE
TRUE
END
ELSE
FALSE
END AS "IsCalled",
CASE
WHEN c.relkind = 'S' THEN
coalesce(pg_sequence_last_value(c.oid::regclass), sq.seqstart)
ELSE
0
END AS "LastVal"
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_inherits i ON i.inhrelid = c.oid
LEFT JOIN pg_catalog.pg_class pc ON i.inhparent = pc.oid AND pc.relkind = 'p'
LEFT JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
LEFT JOIN pg_catalog.pg_foreign_table ft ON c.oid = ft.ftrelid
LEFT JOIN pg_catalog.pg_foreign_server s ON s.oid = ft.ftserver
LEFT JOIN pg_catalog.pg_sequence sq ON c.oid = sq.seqrelid
WHERE c.relkind IN ('r', 'f', 'S')
AND %s -- relname inclusion
AND NOT %s -- relname exclusion
AND %s -- schema inclusion
AND NOT %s -- schema exclusion
AND (s.srvname ISNULL OR %s) -- include foreign data
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
ORDER BY 5 DESC
`
SELECT
c.oid::TEXT::INT,
n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner",
pg_catalog.pg_relation_size(c.oid) +
coalesce(
pg_catalog.pg_relation_size(
c.reltoastrelid
),
0
) as "Size",
c.relkind as "RelKind",
(coalesce(pn.nspname, '')) as "rootPtSchema",
(coalesce(pc.relname, '')) as "rootPtName",
(%s) as "ExcludeData", -- data exclusion
CASE
WHEN c.relkind = 'S' THEN
CASE
WHEN pg_sequence_last_value(c.oid::regclass) ISNULL THEN
FALSE
ELSE
TRUE
END
ELSE
FALSE
END AS "IsCalled",
CASE
WHEN c.relkind = 'S' THEN
coalesce(pg_sequence_last_value(c.oid::regclass), sq.seqstart)
ELSE
0
END AS "LastVal"
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_inherits i ON i.inhrelid = c.oid
LEFT JOIN pg_catalog.pg_class pc ON i.inhparent = pc.oid AND pc.relkind = 'p'
LEFT JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
LEFT JOIN pg_catalog.pg_foreign_table ft ON c.oid = ft.ftrelid
LEFT JOIN pg_catalog.pg_foreign_server s ON s.oid = ft.ftserver
LEFT JOIN pg_catalog.pg_sequence sq ON c.oid = sq.seqrelid
WHERE c.relkind IN ('r', 'f', 'S')
AND %s -- relname inclusion
AND NOT %s -- relname exclusion
AND %s -- schema inclusion
AND NOT %s -- schema exclusion
AND (s.srvname ISNULL OR %s) -- include foreign data
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
ORDER BY 1
`

return fmt.Sprintf(totalQuery, tableDataExclusionCond, tableInclusionCond, tableExclusionCond,
schemaInclusionCond, schemaExclusionCond, foreignDataInclusionCond), nil
Expand Down
56 changes: 23 additions & 33 deletions internal/db/postgres/subset/dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,46 @@ const (
emptyFromValue = -1
)

// FindCycle returns the cycle in the graph provided
// The result contains the cycle vertices in the order they appear in the cycle
func FindCycle(graph [][]*Edge) []int {
// getCycle rebuilds a cycle from the predecessor slice filled in by the DFS.
//
// from[i] holds the index that precedes i on the traversal path. lastEdge is
// the index at which the cycle was closed; despite its name it is used as an
// index into from, exactly like the other entries of the chain.
//
// The walk starts at from[lastEdge] and follows the predecessors until it
// arrives back at lastEdge. The returned slice begins with lastEdge and lists
// the remaining members in the reverse of the order they were walked. It is
// never empty: a self-loop (from[lastEdge] == lastEdge) yields a
// single-element slice.
//
// Preconditions: the predecessor chain must eventually lead back to lastEdge,
// otherwise the loop does not terminate; a negative entry on the chain causes
// an index-out-of-range panic.
func getCycle(from []int, lastEdge int) []int {
	var path []int
	cur := from[lastEdge]
	for cur != lastEdge {
		path = append(path, cur)
		cur = from[cur]
	}
	// The closing index goes last so that it ends up first after the reversal.
	path = append(path, lastEdge)
	slices.Reverse(path)
	return path
}

// FindAllCycles returns all cycles in the graph provided
// The result contains a slice of cycles, where each cycle is a slice of vertices in the order they appear in the cycle
func FindAllCycles(graph [][]*Edge) [][]int {
var allCycles [][]int
visited := make([]int, len(graph))
from := make([]int, len(graph))
for idx := range from {
from[idx] = emptyFromValue
}
for v := range graph {
// Contains is required for same reason as in TopologicalSort function
if visited[v] == 0 {
cycle := findCycleDfs(graph, v, visited, from)
if len(cycle) > 0 {
return cycle
}
if visited[v] == vertexIsNotVisited {
findAllCyclesDfs(graph, v, visited, from, &allCycles)
}
}
return nil
}

// getCycle returns the cycle in the graph provided based on the from slice gathered in findCycleDfs function
func getCycle(from []int, lastVertex int) []int {
var cycle []int
for v := from[lastVertex]; v != lastVertex; v = from[v] {
cycle = append(cycle, v)
}
cycle = append(cycle, lastVertex)
slices.Reverse(cycle)
return cycle
return allCycles
}

// findCycleDfs - the basic DFS algorithm adapted to find the cycle in the graph and collect the cycle vertices
func findCycleDfs(graph [][]*Edge, v int, visited []int, from []int) []int {
// findAllCyclesDfs - the basic DFS algorithm adapted to find all cycles in the graph and collect the cycle vertices
func findAllCyclesDfs(graph [][]*Edge, v int, visited []int, from []int, allCycles *[][]int) {
visited[v] = vertexIsVisitedAndPrecessing
for _, to := range graph[v] {
if visited[to.Idx] == vertexIsNotVisited {
from[to.Idx] = v
cycle := findCycleDfs(graph, to.Idx, visited, from)
// Return upper in stack the cycle
if len(cycle) > 0 {
return cycle
}
findAllCyclesDfs(graph, to.Idx, visited, from, allCycles)
} else if visited[to.Idx] == vertexIsVisitedAndPrecessing {
// Graph has cycle
// If the cycle is found, then restore it
// Find the vertex where the cycle starts
from[to.Idx] = v
return getCycle(from, to.Idx)
cycle := getCycle(from, to.Idx)
*allCycles = append(*allCycles, cycle)
}
}
visited[v] = vertexIsVisitedAndCompleted
return nil
}
24 changes: 24 additions & 0 deletions internal/db/postgres/subset/edge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package subset

// Edge is one entry of the adjacency lists ([][]*Edge) walked by the subset
// graph traversal. It connects two table links, A and B.
type Edge struct {
// Id is the identifier of the edge.
// NOTE(review): presumably unique within the graph - confirm against the code that builds the edges.
Id int
// Idx is the index of the vertex this edge leads to; the DFS uses it to
// index its per-vertex slices (visited, from).
Idx int
// A is one endpoint of the edge. GetLeftAndRightTable compares A.Idx with
// the requested index to decide which endpoint is returned first.
A *TableLink
// B is the other endpoint of the edge.
B *TableLink
}

// NewEdge allocates an Edge and fills it with the given identifier, target
// vertex index and the two table links. The arguments are stored as passed;
// no validation or copying is performed.
func NewEdge(id, idx int, a *TableLink, b *TableLink) *Edge {
	edge := new(Edge)
	edge.Id = id
	edge.Idx = idx
	edge.A = a
	edge.B = b
	return edge
}

// GetLeftAndRightTable returns the two endpoints of the edge ordered relative
// to idx: when A.Idx equals idx the result is (A, B), in every other case it
// is (B, A). Note that B.Idx is never inspected, so an idx that matches
// neither endpoint also yields (B, A).
func (e *Edge) GetLeftAndRightTable(idx int) (*TableLink, *TableLink) {
	left, right := e.B, e.A
	if e.A.Idx == idx {
		left, right = e.A, e.B
	}
	return left, right
}
Loading

0 comments on commit cd55b01

Please sign in to comment.