diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 21648bc45..75bc9f435 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -154,7 +154,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s if err := ctx.Err(); err != nil { return numRows.Load(), err } else { - avroMap, err := avroConverter.Convert(ctx, env, qrecord) + avroMap, err := avroConverter.Convert(qrecord) if err != nil { logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err)) return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err) diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 1fe74efc7..ec7cfc6e3 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -46,7 +46,7 @@ func NewQRecordAvroConverter( }, nil } -func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) { +func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) { m := make(map[string]any, len(qrecord)) for idx, val := range qrecord { avroVal, err := qvalue.QValueToAvro( diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 2da4eb9ca..db5bf4e2a 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -481,7 +481,6 @@ func (c *QValueAvroConverter) processNullableUnion( } func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any { - slog.Warn("chchch", slog.String("name", c.Name), slog.Any("c", c), slog.Any("p", c.Precision), slog.Any("s", c.Scale)) if (c.UnboundedNumericAsString && c.Precision == 0 && c.Scale == 0) || (c.TargetDWH == protos.DBType_CLICKHOUSE && c.Precision > datatypes.PeerDBClickHouseMaxPrecision) { numStr, _ := c.processNullableUnion("string", num.String()) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 9b21b7b38..1db3b6d60 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -208,6 +208,7 @@ func (s *SnapshotFlowExecution) cloneTable( WriteMode: snapshotWriteMode, System: s.config.System, Script: s.config.Script, + Env: s.config.Env, ParentMirrorName: flowName, }