Skip to content

Commit

Permalink
merge: Pulled from main
Browse files Browse the repository at this point in the history
  • Loading branch information
wwoytenko committed May 2, 2024
2 parents 883f786 + 082d02a commit 31840ad
Show file tree
Hide file tree
Showing 21 changed files with 287 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:

env:
go-version: '1.21.5'
python-version: '3.12'
python-version: 'pypy3.10'
cmd-name: 'greenmask'
docker-registry: greenmask/greenmask

Expand Down
8 changes: 8 additions & 0 deletions cmd/greenmask/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,12 @@ func initConfig() {
if err := viper.Unmarshal(Config, decoderCfg); err != nil {
log.Fatal().Err(err).Msg("")
}

if cfgFile != "" {
// This solves problem with map structure described -> https://github.com/spf13/viper/issues/373
// that caused issue in Greenmask https://github.com/GreenmaskIO/greenmask/issues/76
if err := configUtils.ParseTransformerParamsManually(cfgFile, Config); err != nil {
log.Fatal().Err(err).Msg("error parsing transformer parameters")
}
}
}
2 changes: 2 additions & 0 deletions docker/integration/tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ for pg_version in ${PG_VERSIONS_CHECK[@]}; do \n\
echo "### SUCCESSFUL CHECK COMPATIBILITY WITH POSTGRESQL ${pg_version} ###" \n\
else \n\
echo "### FAIL CHECK COMPATIBILITY WITH POSTGRESQL ${pg_version} ###" \n\
echo "### EXIT SCRIPT ###" \n\
exit 2 \n\
fi \n\
done \n' > /docker-entrypoint.sh \
&& chmod +x /docker-entrypoint.sh
Expand Down
2 changes: 2 additions & 0 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ nav:
- Core custom functions: built_in_transformers/advanced_transformers/custom_functions/core_functions.md
- Faker function: built_in_transformers/advanced_transformers/custom_functions/faker_function.md
- Release notes:
- Greenmask 0.1.12: release_notes/greenmask_0_1_12.md
- Greenmask 0.1.11: release_notes/greenmask_0_1_11.md
- Greenmask 0.1.10: release_notes/greenmask_0_1_10.md
- Greenmask 0.1.9: release_notes/greenmask_0_1_9.md
- Greenmask 0.1.8: release_notes/greenmask_0_1_8.md
Expand Down
2 changes: 1 addition & 1 deletion docs/overrides/main.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% extends "base.html" %}

{% block announce %}
Version 0.1.10 is <a href="https://github.com/GreenmaskIO/greenmask/releases/tag/v0.1.10">released</a>
Version 0.1.12 is <a href="https://github.com/GreenmaskIO/greenmask/releases/tag/v0.1.12">released</a>
{% endblock %}
15 changes: 15 additions & 0 deletions docs/release_notes/greenmask_0_1_11.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Greenmask 0.1.11

This release introduces improvements and bug fixes

## Changes

* Added support for generated columns in the table
* Fixed transformer parameters encoding issue caused by spf13/viper
* Fixed table scoring for transformed table
* Refactored connection management logic in restore command - fixes connection idle timeout

## Assets

To download the Greenmask binary compatible with your system, see
the [release's assets list](https://github.com/GreenmaskIO/greenmask/releases/tag/v0.1.11).
14 changes: 14 additions & 0 deletions docs/release_notes/greenmask_0_1_12.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Greenmask 0.1.12

This release introduces improvements and bug fixes

## Changes

* Fixed config decoding issue caused by spf13/viper map-key lowercasing
* Fixed TOC entries merge behavior when data section is empty
* Fixed integration tests for S3 storage

## Assets

To download the Greenmask binary compatible with your system, see
the [release's assets list](https://github.com/GreenmaskIO/greenmask/releases/tag/v0.1.12).
5 changes: 5 additions & 0 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ func (d *Dump) Run(ctx context.Context) (err error) {
func (d *Dump) MergeTocEntries(schemaEntries []*toc.Entry, dataEntries []*toc.Entry) (
[]*toc.Entry, error,
) {
if len(dataEntries) == 0 {
// No data entries, just return schema entries
return schemaEntries, nil
}

// TODO: Assign dependencies and sort entries in the same order
res := make([]*toc.Entry, 0, len(schemaEntries)+len(dataEntries))

Expand Down
41 changes: 26 additions & 15 deletions internal/db/postgres/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r *Restore) prepare() error {
return nil
}

func (r *Restore) preFlightRestore(ctx context.Context, conn *pgx.Conn) error {
func (r *Restore) preFlightRestore(ctx context.Context) error {

tocFile, err := r.st.GetObject(ctx, "toc.dat")
if err != nil {
Expand Down Expand Up @@ -225,7 +225,7 @@ func (r *Restore) sortAndFilterEntriesByRestoreList() error {
return nil
}

func (r *Restore) preDataRestore(ctx context.Context, conn *pgx.Conn) error {
func (r *Restore) preDataRestore(ctx context.Context) error {
// pg_dump has a limitation:
// If we want to use --cleanup command then this command must be performed for whole schema (--schema-only)
// without --section parameter. For avoiding cascade dropping we need to run pg_restore with --schema-only --clean
Expand All @@ -242,6 +242,12 @@ func (r *Restore) preDataRestore(ctx context.Context, conn *pgx.Conn) error {
return nil
}

conn, err := pgx.Connect(ctx, r.dsn)
if err != nil {
return fmt.Errorf("cannot establish connection to db: %w", err)
}
defer conn.Close(ctx)

// Execute PreData Before scripts
if err := r.RunScripts(ctx, conn, scriptPreDataSection, scriptExecuteBefore); err != nil {
return err
Expand Down Expand Up @@ -338,7 +344,7 @@ func (r *Restore) prepareCleanupToc() (string, string, error) {
return preDatadirName, postDatadirName, nil
}

func (r *Restore) dataRestore(ctx context.Context, conn *pgx.Conn) error {
func (r *Restore) dataRestore(ctx context.Context) error {
// Execute Data Before scripts

// Do not restore this section if implicitly provided
Expand All @@ -347,6 +353,12 @@ func (r *Restore) dataRestore(ctx context.Context, conn *pgx.Conn) error {
return nil
}

conn, err := pgx.Connect(ctx, r.dsn)
if err != nil {
return fmt.Errorf("cannot establish connection to db: %w", err)
}
defer conn.Close(ctx)

if err := r.RunScripts(ctx, conn, scriptDataSection, scriptExecuteBefore); err != nil {
return err
}
Expand Down Expand Up @@ -439,7 +451,7 @@ func (r *Restore) isNeedRestore(e *toc.Entry) bool {
return true
}

func (r *Restore) postDataRestore(ctx context.Context, conn *pgx.Conn) error {
func (r *Restore) postDataRestore(ctx context.Context) error {
// Execute Post Data Before scripts

// Do not restore this section if implicitly provided
Expand All @@ -448,6 +460,12 @@ func (r *Restore) postDataRestore(ctx context.Context, conn *pgx.Conn) error {
return nil
}

conn, err := pgx.Connect(ctx, r.dsn)
if err != nil {
return fmt.Errorf("cannot establish connection to db: %w", err)
}
defer conn.Close(ctx)

if err := r.RunScripts(ctx, conn, scriptPostDataSection, scriptExecuteBefore); err != nil {
return err
}
Expand Down Expand Up @@ -485,26 +503,19 @@ func (r *Restore) Run(ctx context.Context) error {
return fmt.Errorf("preparation error: %w", err)
}

// Establish connection for scripts
conn, err := pgx.Connect(ctx, r.dsn)
if err != nil {
return fmt.Errorf("cannot establish connection to db: %w", err)
}
defer conn.Close(ctx)

if err = r.preFlightRestore(ctx, conn); err != nil {
if err := r.preFlightRestore(ctx); err != nil {
return fmt.Errorf("pre-flight stage restoration error: %w", err)
}

if err = r.preDataRestore(ctx, conn); err != nil {
if err := r.preDataRestore(ctx); err != nil {
return fmt.Errorf("pre-data stage restoration error: %w", err)
}

if err = r.dataRestore(ctx, conn); err != nil {
if err := r.dataRestore(ctx); err != nil {
return fmt.Errorf("data stage restoration error: %w", err)
}

if err = r.postDataRestore(ctx, conn); err != nil {
if err := r.postDataRestore(ctx); err != nil {
return fmt.Errorf("post-data stage restoration error: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ func NewRuntimeContext(
return nil, fmt.Errorf("cannot validate and build table config: %w", err)
}

dataSectionObjects, err := getDumpObjects(ctx, tx, opt, tables)
dataSectionObjects, err := getDumpObjects(ctx, version, tx, opt, tables)
if err != nil {
return nil, fmt.Errorf("cannot build dump object list: %w", err)
}

scoreTablesEntriesAndSort(dataSectionObjects, cfg)

schema, err := getDatabaseSchema(ctx, tx, opt)
schema, err := getDatabaseSchema(ctx, tx, opt, version)
if err != nil {
return nil, fmt.Errorf("cannot get database schema: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/db/postgres/context/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
// TODO: Rewrite it using gotemplate

func getDumpObjects(
ctx context.Context, tx pgx.Tx, options *pgdump.Options, config map[toolkit.Oid]*entries.Table,
ctx context.Context, version int, tx pgx.Tx, options *pgdump.Options, config map[toolkit.Oid]*entries.Table,
) ([]entries.Entry, error) {

// Building relation search query using regexp adaptation rules and pre-defined query templates
Expand Down Expand Up @@ -99,6 +99,7 @@ func getDumpObjects(
// If table was discovered during Transformer validation - use that object instead of a new
table.ExcludeData = excludeData
table.LoadViaPartitionRoot = options.LoadViaPartitionRoot
table.Size = relSize
} else {
// If table is not found - create new table object and collect all the columns

Expand Down Expand Up @@ -137,7 +138,7 @@ func getDumpObjects(
for _, obj := range dataObjects {
switch v := obj.(type) {
case *entries.Table:
columns, err := getColumnsConfig(ctx, tx, v.Oid)
columns, err := getColumnsConfig(ctx, tx, v.Oid, version)
if err != nil {
return nil, fmt.Errorf("unable to collect table columns: %w", err)
}
Expand Down
11 changes: 7 additions & 4 deletions internal/db/postgres/context/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,23 @@ var (
`

// TableColumnsQuery - SQL query for getting all columns of table
TableColumnsQuery = `
TableColumnsQuery = template.Must(template.New("TableColumnsQuery").Parse(`
SELECT
a.attname as name,
a.atttypid::TEXT::INT as typeoid,
pg_catalog.format_type(a.atttypid, a.atttypmod) as typename,
a.attnotnull as notnull,
a.atttypmod as mod,
a.atttypmod as att_len,
a.attnum as num,
t.typlen as len
t.typlen as type_len
{{ if ge .Version 120000 }}
,a.attgenerated != '' as attgenerated
{{ end }}
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
WHERE a.attrelid = $1 AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
`
`))

CustomTypesWithTypeChainQuery = `
with RECURSIVE
Expand Down
4 changes: 2 additions & 2 deletions internal/db/postgres/context/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func getDatabaseSchema(
ctx context.Context, tx pgx.Tx, options *pgdump.Options,
ctx context.Context, tx pgx.Tx, options *pgdump.Options, version int,
) ([]*toolkit.Table, error) {
var res []*toolkit.Table
query, err := BuildSchemaIntrospectionQuery(
Expand Down Expand Up @@ -41,7 +41,7 @@ func getDatabaseSchema(

// fill columns
for _, table := range res {
columns, err := getColumnsConfig(ctx, tx, table.Oid)
columns, err := getColumnsConfig(ctx, tx, table.Oid, version)
if err != nil {
return nil, err
}
Expand Down
30 changes: 20 additions & 10 deletions internal/db/postgres/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func validateAndBuildTablesConfig(
}

// Assign columns and transformersMap if were found
columns, err := getColumnsConfig(ctx, tx, table.Oid)
columns, err := getColumnsConfig(ctx, tx, table.Oid, version)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -197,10 +197,18 @@ func getTable(ctx context.Context, tx pgx.Tx, t *domains.Table) ([]*entries.Tabl
return tables, warnings, nil
}

func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid) ([]*toolkit.Column, error) {
func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid, version int) ([]*toolkit.Column, error) {
defaultTypeMap := pgtype.NewMap()
var res []*toolkit.Column
rows, err := tx.Query(ctx, TableColumnsQuery, oid)
buf := bytes.NewBuffer(nil)
err := TableColumnsQuery.Execute(
buf,
map[string]int{"Version": version},
)
if err != nil {
return nil, fmt.Errorf("error templating TableColumnsQuery: %w", err)
}
rows, err := tx.Query(ctx, buf.String(), oid)
if err != nil {
return nil, fmt.Errorf("unable execute tableColumnQuery: %w", err)
}
Expand All @@ -209,8 +217,14 @@ func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid) ([]*toolk
idx := 0
for rows.Next() {
column := toolkit.Column{Idx: idx}
if err = rows.Scan(&column.Name, &column.TypeOid, &column.TypeName,
&column.NotNull, &column.Length, &column.Num, &column.Length); err != nil {
if version >= 120000 {
err = rows.Scan(&column.Name, &column.TypeOid, &column.TypeName,
&column.NotNull, &column.Length, &column.Num, &column.TypeLength, &column.IsGenerated)
} else {
err = rows.Scan(&column.Name, &column.TypeOid, &column.TypeName,
&column.NotNull, &column.Length, &column.Num)
}
if err != nil {
return nil, fmt.Errorf("cannot scan tableColumnQuery: %w", err)
}
column.CanonicalTypeName = column.TypeName
Expand Down Expand Up @@ -317,11 +331,7 @@ func getTableConstraints(ctx context.Context, tx pgx.Tx, tableOid toolkit.Oid, v
buf := bytes.NewBuffer(nil)
err = TablePrimaryKeyReferencesConstraintsQuery.Execute(
buf,
struct {
Version int
}{
Version: version,
},
map[string]int{"Version": version},
)
if err != nil {
return nil, fmt.Errorf("error templating TablePrimaryKeyReferencesConstraintsQuery: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions internal/db/postgres/entries/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ func (t *Table) Entry() (*toc.Entry, error) {
columns := make([]string, 0, len(t.Columns))

for _, column := range t.Columns {
columns = append(columns, fmt.Sprintf(`"%s"`, column.Name))
if !column.IsGenerated {
columns = append(columns, fmt.Sprintf(`"%s"`, column.Name))
}
}

//var query = `COPY "%s"."%s" (%s) FROM stdin WITH (FORMAT CSV, NULL '\N');`
var query = `COPY "%s"."%s" (%s) FROM stdin`
var schemaName, tableName string
if t.LoadViaPartitionRoot && t.RootPtSchema != "" && t.RootPtName != "" {
Expand Down
30 changes: 26 additions & 4 deletions internal/domains/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,19 @@ type TransformerSettings struct {
}

type TransformerConfig struct {
Name string `mapstructure:"name" yaml:"name" json:"name,omitempty"`
Settings *TransformerSettings `mapstructure:"settings,omitempty" yaml:"settings,omitempty" json:"settings,omitempty"`
Params toolkit.StaticParameters `mapstructure:"params" yaml:"params" json:"params,omitempty"`
DynamicParams toolkit.DynamicParameters `mapstructure:"dynamic_params" yaml:"dynamic_params" json:"dynamic_params,omitempty"`
Name string `mapstructure:"name" yaml:"name" json:"name,omitempty"`
Settings *TransformerSettings `mapstructure:"settings,omitempty" yaml:"settings,omitempty" json:"settings,omitempty"`
// Params - transformation parameters. It might be any type. If structure should be stored as raw json
// This cannot be parsed with mapstructure due to uncontrollable lowercasing
// https://github.com/spf13/viper/issues/373
// Instead we have to use workaround and parse it manually
Params toolkit.StaticParameters `mapstructure:"-" yaml:"-" json:"-"` // yaml:"params" json:"params,omitempty"`
// MetadataParams - encoded transformer parameters - uses only for storing into storage
// TODO: You need to get rid of it by creating a separate structure for storing metadata in
// internal/db/postgres/storage/metadata_json.go
// this is used only due to https://github.com/spf13/viper/issues/373
MetadataParams map[string]any `mapstructure:"-" yaml:"params,omitempty" json:"params,omitempty"`
DynamicParams toolkit.DynamicParameters `mapstructure:"dynamic_params" yaml:"dynamic_params" json:"dynamic_params,omitempty"`
}

type Table struct {
Expand All @@ -122,3 +131,16 @@ type Table struct {
Transformers []*TransformerConfig `mapstructure:"transformers" yaml:"transformers" json:"transformers,omitempty"`
ColumnsTypeOverride map[string]string `mapstructure:"columns_type_override" yaml:"columns_type_override" json:"columns_type_override,omitempty"`
}

// DummyConfig - a minimal mirror of the real config structure, used only for the
// viper workaround: it lets us re-parse the raw config file and extract transformer
// parameters manually, while ignoring all other parts of the config.
// Background for why this is needed: https://github.com/GreenmaskIO/greenmask/discussions/85
// (viper lowercases map keys during Unmarshal, corrupting transformer params).
type DummyConfig struct {
	Dump struct {
		Transformation []struct {
			Transformers []struct {
				// Params is kept as a raw map so original key casing is preserved.
				Params map[string]interface{} `yaml:"params" json:"params"`
			} `yaml:"transformers" json:"transformers"`
		} `yaml:"transformation" json:"transformation"`
	} `yaml:"dump" json:"dump"`
}
Loading

0 comments on commit 31840ad

Please sign in to comment.