diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index daf3e5f0a6..a9f58a5f3f 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn }) errGroup.Go(func() error { + var err error rowsSynced, err = syncRecords(dstConn, errCtx, config, partition, outstream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 56c1b17839..14f2ef1420 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -102,6 +102,14 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC } precision, scale := ParseNumericTypmod(typmod) + return GetNumericTypeForWarehousePrecisionScale(precision, scale, warehouseNumeric) +} + +func GetNumericTypeForWarehousePrecisionScale(precision int16, scale int16, warehouseNumeric WarehouseNumericCompatibility) (int16, int16) { + if precision == 0 && scale == 0 { + return warehouseNumeric.DefaultPrecisionAndScale() + } + if !IsValidPrecision(precision, warehouseNumeric) { precision = warehouseNumeric.MaxPrecision() } diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index 49c359b885..b2d085acb4 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -5,24 +5,24 @@ import ( "go.temporal.io/sdk/log" - numeric "github.com/PeerDB-io/peer-flow/datatypes" + "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" ) func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBType) (int16, int16) { - var warehouseNumeric numeric.WarehouseNumericCompatibility + var warehouseNumeric datatypes.WarehouseNumericCompatibility switch dwh { case protos.DBType_CLICKHOUSE: - warehouseNumeric = numeric.ClickHouseNumericCompatibility{} + warehouseNumeric = datatypes.ClickHouseNumericCompatibility{} case protos.DBType_SNOWFLAKE: - warehouseNumeric = numeric.SnowflakeNumericCompatibility{} + warehouseNumeric = datatypes.SnowflakeNumericCompatibility{} case protos.DBType_BIGQUERY: - warehouseNumeric = numeric.BigQueryNumericCompatibility{} + warehouseNumeric = datatypes.BigQueryNumericCompatibility{} default: - warehouseNumeric = numeric.DefaultNumericCompatibility{} + warehouseNumeric = datatypes.DefaultNumericCompatibility{} } - return numeric.GetNumericTypeForWarehouse(numeric.MakeNumericTypmod(int32(precision), int32(scale)), warehouseNumeric) + return datatypes.GetNumericTypeForWarehousePrecisionScale(precision, scale, warehouseNumeric) } // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD