
Commit

Merge branch 'main' into feat/database_subset_with_circ_deps_pt4
wwoytenko committed Aug 27, 2024
2 parents 7505527 + 80cb1cc commit cce8981
Showing 37 changed files with 538 additions and 182 deletions.
6 changes: 5 additions & 1 deletion cmd/greenmask/cmd/restore/restore.go
@@ -171,6 +171,10 @@ func init() {
"pgzip", "", false,
"use pgzip decompression instead of gzip",
)
Cmd.Flags().Int64P(
"batch-size", "", 0,
"the number of rows to insert in a single batch during the COPY command (0 - all rows will be inserted in a single batch)",
)

// Connection options:
Cmd.Flags().StringP("host", "h", "/var/run/postgres", "database server host or socket directory")
@@ -185,7 +189,7 @@ func init() {
"disable-triggers", "enable-row-security", "if-exists", "no-comments", "no-data-for-failed-tables",
"no-security-labels", "no-subscriptions", "no-table-access-method", "no-tablespaces", "section",
"strict-names", "use-set-session-authorization", "inserts", "on-conflict-do-nothing", "restore-in-order",
"pgzip",
"pgzip", "batch-size",

"host", "port", "username",
} {
1 change: 1 addition & 0 deletions cmd/greenmask/cmd/root.go
@@ -140,6 +140,7 @@ func initConfig() {
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToSliceHookFunc(","),
)
cfg.ErrorUnused = true
}

if err := viper.Unmarshal(Config, decoderCfg); err != nil {
26 changes: 25 additions & 1 deletion docs/commands/restore.md
@@ -18,6 +18,7 @@ allowing you to configure the restoration process as needed.
It supports mostly the same flags as the `pg_restore` utility, with some extra flags for Greenmask-specific features.

```text title="Supported flags"
--batch-size int the number of rows to insert in a single batch during the COPY command (0 - all rows will be inserted in a single batch)
-c, --clean clean (drop) database objects before recreating
-C, --create create the target database
-a, --data-only restore only the data, no schema
@@ -112,5 +113,28 @@ If your database has cyclic dependencies you will be notified about it but the r
By default, Greenmask uses gzip decompression to restore data. In most cases it is quite slow, does not utilize all
available resources, and becomes a bottleneck for I/O operations. To speed up the restoration process, you can use
the `--pgzip` flag to use pgzip decompression instead of gzip. This method splits the data into blocks, which are
decompressed in parallel, making it ideal for handling large volumes of data. The output remains a standard gzip file.
decompressed in parallel, making it ideal for handling large volumes of data.

```shell title="example with pgzip decompression"
greenmask --config=config.yml restore latest --pgzip
```
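
Below is a minimal Go sketch of the idea behind the flag, assuming the widely used `github.com/klauspost/pgzip` package; the helper name `newDecompressor` and the block-size values are illustrative, not Greenmask's actual code. It shows how a parallel pgzip reader can stand in for the standard `compress/gzip` reader.

```go title="sketch: swapping gzip for pgzip decompression (illustrative)"
package example

import (
	"compress/gzip"
	"io"
	"runtime"

	"github.com/klauspost/pgzip"
)

// newDecompressor wraps r with either the standard library gzip reader or
// a pgzip reader, which decompresses blocks ahead of the consumer in
// parallel goroutines.
func newDecompressor(r io.Reader, usePgzip bool) (io.ReadCloser, error) {
	if usePgzip {
		// 1 MiB blocks, one decompression goroutine per CPU core (illustrative values).
		return pgzip.NewReaderN(r, 1<<20, runtime.GOMAXPROCS(0))
	}
	return gzip.NewReader(r)
}
```

In this commit the flag value is simply threaded into the dump and restore workers as the `Pgzip`/`usePgzip` option, as seen in the `NewTableDumper` and `NewTableRestorer` calls further down.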

### Restore data batching

The COPY command returns an error only on transaction commit. This means that if you have a large dump and an error
occurs, you will have to wait until the end of the transaction to see the error message. To avoid this, you can use the
`--batch-size` flag to specify the number of rows to insert in a single batch during the COPY command. If an error occurs
during a batch insertion, the error message is displayed immediately. The data will be committed **only if all
batches are inserted successfully**.

!!! warning

The batch size should be chosen carefully. If the batch size is too small, the restoration process will be slow. If
the batch size is too large, you may not be able to identify the error row.

In the example below, the batch size is set to 1000 rows. This means that 1000 rows will be inserted in a single batch,
so you will be notified of any errors immediately after each batch is inserted.

```shell title="example with batch size"
greenmask --config=config.yml restore latest --batch-size 1000
```
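
The sketch below illustrates the batching idea described above using `pgx`; the function `copyInBatches` and its details (for example, the lack of `\.` terminator handling) are hypothetical and simplified, not the actual restorer implementation. Each batch is sent as its own `COPY ... FROM STDIN` statement inside a single transaction, so a bad row surfaces right after its batch, while nothing becomes visible until the final commit.

```go title="sketch: batched COPY restore (illustrative)"
package example

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/jackc/pgx/v5"
)

// copyInBatches feeds COPY text-format rows to the server in batches of
// batchSize rows within one transaction. A batchSize of 0 sends everything
// as a single batch, mirroring the default behaviour of --batch-size.
func copyInBatches(ctx context.Context, tx pgx.Tx, copySQL string, dump io.Reader, batchSize int64) error {
	scanner := bufio.NewScanner(dump)
	var buf bytes.Buffer
	var rows int64

	flush := func() error {
		if buf.Len() == 0 {
			return nil
		}
		// Each batch is a separate COPY ... FROM STDIN, so the server reports
		// a bad row right after its batch instead of at the very end.
		if _, err := tx.Conn().PgConn().CopyFrom(ctx, &buf, copySQL); err != nil {
			return fmt.Errorf("batch failed after %d rows: %w", rows, err)
		}
		buf.Reset()
		return nil
	}

	for scanner.Scan() {
		buf.Write(scanner.Bytes())
		buf.WriteByte('\n')
		rows++
		if batchSize > 0 && rows%batchSize == 0 {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}
	// Flush the remainder; the caller commits only after every batch succeeds.
	return flush()
}
```

In the commit itself, the `--batch-size` value is carried through `pgrestore.Options.BatchSize` into `restorers.NewTableRestorer`, as shown in the diffs below.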
13 changes: 9 additions & 4 deletions internal/db/postgres/cmd/dump.go
@@ -67,7 +67,8 @@ type Dump struct {
// sortedTablesDumpIds - sorted tables dump ids in topological order
sortedTablesDumpIds []int32
// validate shows that dump worker must be in validation mode
validate bool
validate bool
validateRowsLimit uint64
}

func NewDump(cfg *domains.Config, st storages.Storager, registry *utils.TransformerRegistry) *Dump {
@@ -257,13 +258,17 @@ func (d *Dump) dumpWorkerRunner(
func (d *Dump) taskProducer(ctx context.Context, tasks chan<- dumpers.DumpTask) func() error {
return func() error {
defer close(tasks)
dataObjects := d.context.DataSectionObjects
if d.validate {
dataObjects = d.context.DataSectionObjectsToValidate
}

for _, dumpObj := range d.context.DataSectionObjects {
for _, dumpObj := range dataObjects {
dumpObj.SetDumpId(d.dumpIdSequence)
var task dumpers.DumpTask
switch v := dumpObj.(type) {
case *entries.Table:
task = dumpers.NewTableDumper(v, d.validate, d.pgDumpOptions.Pgzip)
task = dumpers.NewTableDumper(v, d.validate, d.validateRowsLimit, d.pgDumpOptions.Pgzip)
case *entries.Sequence:
task = dumpers.NewSequenceDumper(v)
case *entries.Blobs:
@@ -332,7 +337,7 @@ func (d *Dump) setDumpDependenciesGraph(tables []*entries.Table) {
return entry.Oid == oid
})
if idx == -1 {
panic("table not found")
panic(fmt.Sprintf("table not found: oid=%d", oid))
}
t := tables[idx]
// Create dependencies graph with DumpId sequence for easier restoration coordination
4 changes: 3 additions & 1 deletion internal/db/postgres/cmd/restore.go
@@ -646,7 +646,9 @@ func (r *Restore) taskPusher(ctx context.Context, tasks chan restorers.RestoreTa
r.cfg.ErrorExclusions, r.restoreOpt.Pgzip,
)
} else {
task = restorers.NewTableRestorer(entry, r.st, r.restoreOpt.ExitOnError, r.restoreOpt.Pgzip)
task = restorers.NewTableRestorer(
entry, r.st, r.restoreOpt.ExitOnError, r.restoreOpt.Pgzip, r.restoreOpt.BatchSize,
)
}

case toc.SequenceSetDesc:
38 changes: 15 additions & 23 deletions internal/db/postgres/cmd/validate.go
@@ -63,6 +63,7 @@ func NewValidate(cfg *domains.Config, registry *utils.TransformerRegistry, st st
d.dumpIdSequence = toc.NewDumpSequence(0)

d.validate = true
d.validateRowsLimit = cfg.Validate.RowsLimit
return &Validate{
Dump: d,
tmpDir: tmpDirName,
@@ -139,7 +140,7 @@ func (v *Validate) Run(ctx context.Context) (int, error) {
return v.exitCode, nil
}

if err = v.dumpTables(ctx); err != nil {
if err = v.dataDump(ctx); err != nil {
return nonZeroExitCode, err
}

@@ -152,12 +153,20 @@ func (v *Validate) Run(ctx context.Context) (int, error) {

func (v *Validate) print(ctx context.Context) error {
for _, e := range v.dataEntries {
idx := slices.IndexFunc(v.context.DataSectionObjects, func(entry entries.Entry) bool {
t := entry.(*entries.Table)
idx := slices.IndexFunc(v.context.DataSectionObjectsToValidate, func(entry entries.Entry) bool {
t, ok := entry.(*entries.Table)
if !ok {
return false
}
return t.DumpId == e.DumpId
})

t := v.context.DataSectionObjects[idx].(*entries.Table)
if idx == -1 {
// skip if not in DataSectionObjectsToValidate
continue
}

t := v.context.DataSectionObjectsToValidate[idx].(*entries.Table)
doc, err := v.createDocument(ctx, t)
if err != nil {
return fmt.Errorf("unable to create validation document: %w", err)
@@ -214,7 +223,7 @@ func (v *Validate) readRecords(r *bufio.Reader, t *entries.Table) (original, tra
originalRow = pgcopy.NewRow(len(t.Columns))
transformedRow = pgcopy.NewRow(len(t.Columns))

originalLine, err = reader.ReadLine(r)
originalLine, err = reader.ReadLine(r, nil)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, nil, err
@@ -226,7 +235,7 @@ func (v *Validate) readRecords(r *bufio.Reader, t *entries.Table) (original, tra
return nil, nil, io.EOF
}

transformedLine, err = reader.ReadLine(r)
transformedLine, err = reader.ReadLine(r, nil)
if err != nil {
return nil, nil, fmt.Errorf("unable to read line: %w", err)
}
@@ -269,23 +278,6 @@ func (v *Validate) createDocument(ctx context.Context, t *entries.Table) (valida
return doc, nil
}

func (v *Validate) dumpTables(ctx context.Context) error {
var tablesWithTransformers []entries.Entry
for _, item := range v.context.DataSectionObjects {

if t, ok := item.(*entries.Table); ok && len(t.TransformersContext) > 0 {
t.ValidateLimitedRecords = v.config.Validate.RowsLimit
tablesWithTransformers = append(tablesWithTransformers, t)
}
}
v.context.DataSectionObjects = tablesWithTransformers

if err := v.dataDump(ctx); err != nil {
return fmt.Errorf("data stage dumping error: %w", err)
}
return nil
}

func (v *Validate) printValidationWarnings() error {
// TODO: Implement warnings hook, such as logging and HTTP sender
for _, w := range v.context.Warnings {
26 changes: 19 additions & 7 deletions internal/db/postgres/context/context.go
@@ -42,6 +42,8 @@ type RuntimeContext struct {
Types []*toolkit.Type
// DataSectionObjects - list of objects to dump in data-section. There are sequences, tables and large objects
DataSectionObjects []entries.Entry
// DataSectionObjectsToValidate - list of objects to validate in data-section
DataSectionObjectsToValidate []entries.Entry
// Warnings - list of occurred ValidationWarning during validation and config building
Warnings toolkit.ValidationWarnings
// Registry - registry of all the registered transformers definition
@@ -125,14 +127,24 @@ func NewRuntimeContext(
dataSectionObjects = append(dataSectionObjects, blobEntries)
}

// Generate list of tables that might be validated during the validate command call
var dataSectionObjectsToValidate []entries.Entry
for _, item := range dataSectionObjects {

if t, ok := item.(*entries.Table); ok && len(t.TransformersContext) > 0 {
dataSectionObjectsToValidate = append(dataSectionObjectsToValidate, t)
}
}

return &RuntimeContext{
Tables: tables,
Types: types,
DataSectionObjects: dataSectionObjects,
Warnings: warnings,
Registry: r,
DatabaseSchema: schema,
Graph: graph,
Tables: tables,
Types: types,
DataSectionObjects: dataSectionObjects,
Warnings: warnings,
Registry: r,
DatabaseSchema: schema,
Graph: graph,
DataSectionObjectsToValidate: dataSectionObjectsToValidate,
}, nil
}

4 changes: 4 additions & 0 deletions internal/db/postgres/context/pg_catalog.go
@@ -154,6 +154,10 @@ func getTables(

// Assigning columns, pk and fk for each table
for _, t := range tables {
if len(t.Columns) > 0 {
// Columns were already initialized during the transformer initialization
continue
}
columns, err := getColumnsConfig(ctx, tx, t.Oid, version)
if err != nil {
return nil, nil, fmt.Errorf("unable to collect table columns: %w", err)
6 changes: 6 additions & 0 deletions internal/db/postgres/context/table.go
@@ -89,6 +89,12 @@ func validateAndBuildTablesConfig(
}
table.Columns = columns

pkColumns, err := getPrimaryKeyColumns(ctx, tx, table.Oid)
if err != nil {
return nil, nil, fmt.Errorf("unable to collect primary key columns: %w", err)
}
table.PrimaryKey = pkColumns

// Assigning overridden column types for driver initialization
if tableCfg.ColumnsTypeOverride != nil {
for _, c := range table.Columns {
22 changes: 12 additions & 10 deletions internal/db/postgres/dumpers/table.go
@@ -19,28 +19,30 @@ import (
"fmt"
"io"

"github.com/greenmaskio/greenmask/internal/utils/ioutils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/storages"
"github.com/greenmaskio/greenmask/internal/utils/ioutils"
)

type TableDumper struct {
table *entries.Table
recordNum uint64
validate bool
usePgzip bool
table *entries.Table
recordNum uint64
validate bool
validateRowsLimit uint64
usePgzip bool
}

func NewTableDumper(table *entries.Table, validate bool, usePgzip bool) *TableDumper {
func NewTableDumper(table *entries.Table, validate bool, rowsLimit uint64, usePgzip bool) *TableDumper {
return &TableDumper{
table: table,
validate: validate,
usePgzip: usePgzip,
table: table,
validate: validate,
usePgzip: usePgzip,
validateRowsLimit: rowsLimit,
}
}

@@ -162,7 +164,7 @@ func (td *TableDumper) process(ctx context.Context, tx pgx.Tx, w io.WriteCloser,
if td.validate {
// Logic for validation limiter - exit after recordNum rows
td.recordNum++
if td.recordNum == td.table.ValidateLimitedRecords {
if td.recordNum == td.validateRowsLimit {
return nil
}
}
8 changes: 3 additions & 5 deletions internal/db/postgres/entries/table.go
@@ -44,10 +44,8 @@ type Table struct {
CompressedSize int64
ExcludeData bool
Driver *toolkit.Driver
// ValidateLimitedRecords - perform dumping and transformation only for N records and exit
ValidateLimitedRecords uint64
Scores int64
SubsetConds []string
Scores int64
SubsetConds []string
}

func (t *Table) HasCustomTransformer() bool {
@@ -86,7 +84,7 @@ func (t *Table) Entry() (*toc.Entry, error) {
}
}

var query = `COPY "%s"."%s" (%s) FROM stdin`
var query = "COPY \"%s\".\"%s\" (%s) FROM stdin;\n"
var schemaName, tableName string
if t.LoadViaPartitionRoot && t.RootPtSchema != "" && t.RootPtName != "" {
schemaName = t.RootPtSchema
3 changes: 2 additions & 1 deletion internal/db/postgres/pgrestore/pgrestore.go
@@ -97,7 +97,8 @@ type Options struct {
Inserts bool `mapstructure:"inserts"`
RestoreInOrder bool `mapstructure:"restore-in-order"`
// Use pgzip decompression instead of gzip
Pgzip bool `mapstructure:"pgzip"`
Pgzip bool `mapstructure:"pgzip"`
BatchSize int64 `mapstructure:"batch-size"`

// Connection options:
Host string `mapstructure:"host"`
