Skip to content

Commit

Permalink
propagate env to initial snapshot qrep config
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 2, 2024
1 parent b92c6c5 commit 86e9145
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s
if err := ctx.Err(); err != nil {
return numRows.Load(), err
} else {
avroMap, err := avroConverter.Convert(ctx, env, qrecord)
avroMap, err := avroConverter.Convert(qrecord)
if err != nil {
logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err))
return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewQRecordAvroConverter(
}, nil
}

func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) {
func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) {
m := make(map[string]any, len(qrecord))
for idx, val := range qrecord {
avroVal, err := qvalue.QValueToAvro(
Expand Down
1 change: 0 additions & 1 deletion flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ func (c *QValueAvroConverter) processNullableUnion(
}

func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any {
slog.Warn("chchch", slog.String("name", c.Name), slog.Any("c", c), slog.Any("p", c.Precision), slog.Any("s", c.Scale))
if (c.UnboundedNumericAsString && c.Precision == 0 && c.Scale == 0) ||
(c.TargetDWH == protos.DBType_CLICKHOUSE && c.Precision > datatypes.PeerDBClickHouseMaxPrecision) {
numStr, _ := c.processNullableUnion("string", num.String())
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (s *SnapshotFlowExecution) cloneTable(
WriteMode: snapshotWriteMode,
System: s.config.System,
Script: s.config.Script,
Env: s.config.Env,
ParentMirrorName: flowName,
}

Expand Down

0 comments on commit 86e9145

Please sign in to comment.