Skip to content

Commit

Permalink
New rails
Browse files Browse the repository at this point in the history
* Fixed parameter validation
* Introduced TransformerContext that united parameters that were passed to transformer during initialization procedure and initialized transformer
* Updated RandomDate transformer implementation, adapted to the dynamic parametrization
* Fixed static and dynamic parameters initialization
* Deprecated value-parameters methods in ParameterDefinition
  • Loading branch information
wwoytenko committed Jan 15, 2024
1 parent 8a69fa5 commit 5159645
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 132 deletions.
2 changes: 1 addition & 1 deletion internal/db/postgres/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func validateAndBuildTablesConfig(
return nil, warnings, err
}
warnings = append(warnings, initWarnings...)
table.Transformers = append(table.Transformers, transformer)
table.TransformersContext = append(table.TransformersContext, transformer)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/db/postgres/context/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func initTransformer(
c *domains.TransformerConfig,
r *transformersUtils.TransformerRegistry,
types []*toolkit.Type,
) (transformersUtils.Transformer, toolkit.ValidationWarnings, error) {
) (*transformersUtils.TransformerContext, toolkit.ValidationWarnings, error) {
var totalWarnings toolkit.ValidationWarnings
td, ok := r.Get(c.Name)
if !ok {
Expand Down
6 changes: 3 additions & 3 deletions internal/db/postgres/dump/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Table struct {
RootPtName string
LoadViaPartitionRoot bool
RootOid toolkit.Oid
Transformers []utils.Transformer
TransformersContext []*utils.TransformerContext
Dependencies []int32
DumpId int32
OriginalSize int64
Expand All @@ -47,8 +47,8 @@ type Table struct {
}

func (t *Table) HasCustomTransformer() bool {
return slices.ContainsFunc(t.Transformers, func(transformer utils.Transformer) bool {
_, ok := transformer.(*custom.CmdTransformer)
return slices.ContainsFunc(t.TransformersContext, func(transformer *utils.TransformerContext) bool {
_, ok := transformer.Transformer.(*custom.CmdTransformer)
return ok
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/db/postgres/dumpers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (td *TableDumper) Execute(ctx context.Context, tx pgx.Tx, st storages.Stora
func() error {
var pipeline Pipeliner
var err error
if len(td.table.Transformers) > 0 {
if len(td.table.TransformersContext) > 0 {
if td.validate {
pipeline, err = NewValidationPipeline(gtx, eg, td.table, w, td.validateWithOriginal)
if err != nil {
Expand Down
43 changes: 25 additions & 18 deletions internal/db/postgres/dumpers/transformation_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,39 @@ func NewTransformationPipeline(ctx context.Context, eg *errgroup.Group, table *d

// TODO: Fix this hint. Async execution cannot be performed with template record because it is unsafe.
// For overcoming it - implement sequence transformer wrapper - that wraps internal (non CMD) transformers
hasTemplateRecordTransformer := slices.ContainsFunc(table.Transformers, func(transformer utils.Transformer) bool {
_, ok := transformer.(*transformers.TemplateRecordTransformer)
hasTemplateRecordTransformer := slices.ContainsFunc(table.TransformersContext, func(transformer *utils.TransformerContext) bool {
_, ok := transformer.Transformer.(*transformers.TemplateRecordTransformer)
return ok
})

if !hasTemplateRecordTransformer && table.HasCustomTransformer() && len(table.Transformers) > 1 {
if !hasTemplateRecordTransformer && table.HasCustomTransformer() && len(table.TransformersContext) > 1 {
isAsync = true
tw := NewTransformationWindow(ctx, eg)
tws = append(tws, tw)
for _, t := range table.Transformers {
if !tw.TryAdd(table, t) {
for _, t := range table.TransformersContext {
if !tw.TryAdd(table, t.Transformer) {
tw = NewTransformationWindow(ctx, eg)
tws = append(tws, tw)
tw.TryAdd(table, t)
tw.TryAdd(table, t.Transformer)
}
}
}

record := toolkit.NewRecord(table.Driver)

for _, tc := range table.TransformersContext {
for _, dp := range tc.DynamicParameters {
dp.SetRecord(record)
}
}

tp := &TransformationPipeline{
table: table,
//buf: bytes.NewBuffer(nil),
table: table,
w: w,
row: pgcopy.NewRow(len(table.Columns)),
transformationWindows: tws,
isAsync: true,
record: toolkit.NewRecord(table.Driver),
record: record,
}

var tf TransformationFunc = tp.TransformSync
Expand All @@ -93,18 +100,18 @@ func NewTransformationPipeline(ctx context.Context, eg *errgroup.Group, table *d
func (tp *TransformationPipeline) Init(ctx context.Context) error {
var lastInitErr error
var idx int
var t utils.Transformer
for idx, t = range tp.table.Transformers {
if err := t.Init(ctx); err != nil {
var t *utils.TransformerContext
for idx, t = range tp.table.TransformersContext {
if err := t.Transformer.Init(ctx); err != nil {
lastInitErr = err
log.Warn().Err(err).Msg("error initializing transformer")
}
}

if lastInitErr != nil {
lastInitialized := idx
for _, t = range tp.table.Transformers[:lastInitialized] {
if err := t.Done(ctx); err != nil {
for _, t = range tp.table.TransformersContext[:lastInitialized] {
if err := t.Transformer.Done(ctx); err != nil {
log.Warn().Err(err).Msg("error terminating previously initialized transformer")
}
}
Expand All @@ -123,8 +130,8 @@ func (tp *TransformationPipeline) Init(ctx context.Context) error {

func (tp *TransformationPipeline) TransformSync(ctx context.Context, r *toolkit.Record) (*toolkit.Record, error) {
var err error
for _, t := range tp.table.Transformers {
_, err = t.Transform(ctx, r)
for _, t := range tp.table.TransformersContext {
_, err = t.Transformer.Transform(ctx, r)
if err != nil {
return nil, NewDumpError(tp.table.Schema, tp.table.Name, tp.line, err)
}
Expand Down Expand Up @@ -187,8 +194,8 @@ func (tp *TransformationPipeline) CompleteDump() (err error) {

func (tp *TransformationPipeline) Done(ctx context.Context) error {
var lastErr error
for _, t := range tp.table.Transformers {
if err := t.Done(ctx); err != nil {
for _, t := range tp.table.TransformersContext {
if err := t.Transformer.Done(ctx); err != nil {
lastErr = err
log.Warn().Err(err).Msg("error terminating initialized transformer")
}
Expand Down
120 changes: 73 additions & 47 deletions internal/db/postgres/transformers/random_date.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package transformers

import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -48,13 +47,15 @@ var RandomDateTransformerDefinition = utils.NewTransformerDefinition(
"min",
"min threshold date (and/or time) of random value",
).SetRequired(true).
SetLinkParameter("column"),
SetLinkParameter("column").
SetDynamicModeSupport(true),

toolkit.MustNewParameterDefinition(
"max",
"max threshold date (and/or time) of random value",
).SetRequired(true).
SetLinkParameter("column"),
SetLinkParameter("column").
SetDynamicModeSupport(true),

toolkit.MustNewParameterDefinition(
"truncate",
Expand Down Expand Up @@ -82,22 +83,30 @@ type RandomDateTransformer struct {
columnIdx int
rand *rand.Rand
generate dateGeneratorFunc
min *time.Time
max *time.Time
truncate string
keepNull bool
delta *int64
affectedColumns map[int]string

columnParam toolkit.Parameterizer
maxParam toolkit.Parameterizer
minParam toolkit.Parameterizer
truncateParam toolkit.Parameterizer
keepNullParam toolkit.Parameterizer
}

func NewRandomDateTransformer(ctx context.Context, driver *toolkit.Driver, parameters map[string]toolkit.Parameterizer) (utils.Transformer, toolkit.ValidationWarnings, error) {

columnParam := parameters["column"]
maxParam := parameters["max"]
minParam := parameters["min"]
truncateParam := parameters["truncate"]
keepNullParam := parameters["keep_null"]

var columnName, truncate string
var minTime, maxTime time.Time
var generator dateGeneratorFunc = generateRandomTime
var keepNull bool

p := parameters["column"]
if _, err := p.Scan(&columnName); err != nil {
if _, err := columnParam.Scan(&columnName); err != nil {
return nil, nil, fmt.Errorf(`unable to scan "column" param: %w`, err)
}

Expand All @@ -108,61 +117,53 @@ func NewRandomDateTransformer(ctx context.Context, driver *toolkit.Driver, param
affectedColumns := make(map[int]string)
affectedColumns[idx] = columnName

p = parameters["min"]
v, err := p.Value()
if err != nil {
return nil, nil, fmt.Errorf(`error parsing "min" parameter: %w`, err)
}
minTime, ok = v.(time.Time)
if !ok {
return nil, nil, errors.New(`unexpected type for "min" parameter`)
}

p = parameters["max"]
v, err = p.Value()
if err != nil {
return nil, nil, fmt.Errorf(`error parsing "max" parameter: %w`, err)
}

maxTime, ok = v.(time.Time)
if !ok {
return nil, nil, errors.New(`unexpected type for "max" parameter`)
}

p = parameters["keep_null"]
if _, err := p.Scan(&keepNull); err != nil {
//p = parameters["min"]
//v, err := p.Value()
//if err != nil {
// return nil, nil, fmt.Errorf(`error parsing "min" parameter: %w`, err)
//}
//minTime, ok = v.(time.Time)
//if !ok {
// return nil, nil, errors.New(`unexpected type for "min" parameter`)
//}
//
//p = parameters["max"]
//v, err = p.Value()
//if err != nil {
// return nil, nil, fmt.Errorf(`error parsing "max" parameter: %w`, err)
//}
//
//maxTime, ok = v.(time.Time)
//if !ok {
// return nil, nil, errors.New(`unexpected type for "max" parameter`)
//}

if _, err := keepNullParam.Scan(&keepNull); err != nil {
return nil, nil, fmt.Errorf(`unable to scan "keep_null" param: %w`, err)
}

p = parameters["truncate"]
if _, err := p.Scan(&truncate); err != nil {
if _, err := truncateParam.Scan(&truncate); err != nil {
return nil, nil, fmt.Errorf(`unable to scan "truncate" param: %w`, err)
}

if truncate != "" {
generator = generateRandomTimeTruncate
}

if minTime.After(maxTime) {
return nil, toolkit.ValidationWarnings{
toolkit.NewValidationWarning().
AddMeta("max", maxTime).
AddMeta("min", minTime).
SetMsg("max value must be greater than min"),
}, nil
}
delta := int64(maxTime.Sub(minTime))
return &RandomDateTransformer{
keepNull: keepNull,
truncate: truncate,
columnName: columnName,
columnIdx: idx,
min: &minTime,
max: &maxTime,
generate: generator,
rand: rand.New(rand.NewSource(time.Now().UnixMicro())),
affectedColumns: affectedColumns,
delta: &delta,

columnParam: columnParam,
minParam: minParam,
maxParam: maxParam,
truncateParam: truncateParam,
keepNullParam: keepNullParam,
}, nil, nil

}
Expand All @@ -180,6 +181,29 @@ func (rdt *RandomDateTransformer) Done(ctx context.Context) error {
}

func (rdt *RandomDateTransformer) Transform(ctx context.Context, r *toolkit.Record) (*toolkit.Record, error) {

minTime := &time.Time{}
empty, err := rdt.minParam.Scan(minTime)
if err != nil {
return nil, fmt.Errorf(`error getting "min" parameter value: %w`, err)
}
if empty {
return nil, fmt.Errorf("parameter \"min\" cannot be empty")
}

maxTime := &time.Time{}
empty, err = rdt.maxParam.Scan(maxTime)
if err != nil {
return nil, fmt.Errorf(`error getting "max" parameter value: %w`, err)
}
if empty {
return nil, fmt.Errorf("parameter \"max\" cannot be empty")
}

if minTime.After(*maxTime) {
return nil, fmt.Errorf("max value must be greater than min: got min = %s max = %s", minTime.String(), maxTime.String())
}

valAny, err := r.GetRawColumnValueByIdx(rdt.columnIdx)
if err != nil {
return nil, fmt.Errorf("unable to scan value: %w", err)
Expand All @@ -188,7 +212,9 @@ func (rdt *RandomDateTransformer) Transform(ctx context.Context, r *toolkit.Reco
return r, nil
}

res := rdt.generate(rdt.rand, rdt.min, rdt.delta, &rdt.truncate)
delta := int64(maxTime.Sub(*minTime))

res := rdt.generate(rdt.rand, minTime, &delta, &rdt.truncate)
if err := r.SetColumnValueByIdx(rdt.columnIdx, res); err != nil {
return nil, fmt.Errorf("unable to set new value: %w", err)
}
Expand Down
27 changes: 24 additions & 3 deletions internal/db/postgres/transformers/utils/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,15 @@ func (d *TransformerDefinition) SetSchemaValidator(v SchemaValidationFunc) *Tran
// return totalWarnings, params, nil
//}

type TransformerContext struct {
Transformer Transformer
StaticParameters map[string]*toolkit.StaticParameter
DynamicParameters map[string]*toolkit.DynamicParameter
}

func (d *TransformerDefinition) Instance(
ctx context.Context, driver *toolkit.Driver, rawParams map[string]toolkit.ParamsValue, dynamicParameters map[string]*toolkit.DynamicParamValue,
) (Transformer, toolkit.ValidationWarnings, error) {
) (*TransformerContext, toolkit.ValidationWarnings, error) {
// Decode parameters and get the pgcopy of parsed
params, parametersWarnings, err := toolkit.InitParametersV2(driver, d.Parameters, rawParams, dynamicParameters)
if err != nil {
Expand All @@ -123,12 +129,23 @@ func (d *TransformerDefinition) Instance(
return nil, parametersWarnings, nil
}

dynamicParams := make(map[string]*toolkit.DynamicParameter)
staticParams := make(map[string]*toolkit.StaticParameter)
for name, p := range params {
switch v := p.(type) {
case *toolkit.StaticParameter:
staticParams[name] = v
case *toolkit.DynamicParameter:
dynamicParams[name] = v
}
}

paramDefs := make(map[string]*toolkit.ParameterDefinition, len(d.Parameters))
for _, pd := range d.Parameters {
paramDefs[pd.Name] = pd
}
// Validate schema
schemaWarnings, err := d.SchemaValidator(ctx, driver, d.Properties, paramDefs)
schemaWarnings, err := d.SchemaValidator(ctx, driver, d.Properties, staticParams)
if err != nil {
return nil, nil, fmt.Errorf("schema validation error: %w", err)
}
Expand All @@ -144,5 +161,9 @@ func (d *TransformerDefinition) Instance(
res = append(res, schemaWarnings...)
res = append(res, transformerWarnings...)

return t, res, nil
return &TransformerContext{
Transformer: t,
StaticParameters: staticParams,
DynamicParameters: dynamicParams,
}, res, nil
}
Loading

0 comments on commit 5159645

Please sign in to comment.