From 078cfeef306a0ab2b587fccc50d5661c3029c9b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 24 Jun 2024 15:46:13 +0000 Subject: [PATCH] Send bytes as base64, remove bits (#1866) Code assumed bit was a single bit in places, sent bytes as binary which was then base64 decoded, or converted to ASCII bytes of binary digits --- .../bigquery/merge_stmt_generator.go | 2 +- flow/connectors/bigquery/qvalue_convert.go | 2 +- .../postgres/normalize_stmt_generator.go | 33 +++++++++++-------- flow/connectors/postgres/qvalue_convert.go | 14 ++------ .../postgres/schema_delta_test_constants.go | 6 ---- .../snowflake/avro_file_writer_test.go | 3 -- .../snowflake/merge_stmt_generator.go | 2 +- flow/connectors/sql/query_executor.go | 6 +--- flow/connectors/sqlserver/qvalue_convert.go | 2 -- flow/connectors/utils/cdc_store.go | 1 - flow/e2e/bigquery/bigquery_helper.go | 2 +- flow/e2e/postgres/peer_flow_pg_test.go | 4 +-- flow/model/qrecord_copy_from_source.go | 2 -- flow/model/qvalue/avro_converter.go | 4 +-- flow/model/qvalue/equals.go | 2 -- flow/model/qvalue/kind.go | 3 -- flow/model/qvalue/qvalue.go | 16 --------- flow/model/record_items.go | 17 ---------- flow/pua/peerdb.go | 2 -- 19 files changed, 30 insertions(+), 93 deletions(-) diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index b2419cbae0..163d53ae6b 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -39,7 +39,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", column.Name, bqTypeString, shortCol) // expecting data in BASE64 format - case qvalue.QValueKindBytes, qvalue.QValueKindBit: + case qvalue.QValueKindBytes: castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`", column.Name, shortCol) case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16, diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go index 40595588db..ce2347a6c5 100644 --- a/flow/connectors/bigquery/qvalue_convert.go +++ b/flow/connectors/bigquery/qvalue_convert.go @@ -46,7 +46,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription) bigque bqField.Type = bigquery.TimeFieldType // TODO: https://github.com/PeerDB-io/peerdb/issues/189 - handle INTERVAL types again, // bytes - case qvalue.QValueKindBit, qvalue.QValueKindBytes: + case qvalue.QValueKindBytes: bqField.Type = bigquery.BytesFieldType case qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64: bqField.Type = bigquery.IntegerFieldType diff --git a/flow/connectors/postgres/normalize_stmt_generator.go b/flow/connectors/postgres/normalize_stmt_generator.go index 49013b924d..ff33af6bef 100644 --- a/flow/connectors/postgres/normalize_stmt_generator.go +++ b/flow/connectors/postgres/normalize_stmt_generator.go @@ -42,6 +42,23 @@ func (n *normalizeStmtGenerator) columnTypeToPg(schema *protos.TableSchema, colu } } +func (n *normalizeStmtGenerator) generateExpr( + normalizedTableSchema *protos.TableSchema, + genericColumnType string, + stringCol string, + pgType string, +) string { + if normalizedTableSchema.System == protos.TypeSystem_Q { + qkind := qvalue.QValueKind(genericColumnType) + if qkind.IsArray() { + return fmt.Sprintf("ARRAY(SELECT JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>%s)::JSON))::%s", stringCol, pgType) + } else if qkind == qvalue.QValueKindBytes { + return fmt.Sprintf("decode(_peerdb_data->>%s, 'base64')::%s", stringCol, pgType) + } + } + return fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType) +} + func (n *normalizeStmtGenerator) generateNormalizeStatements(dstTable string) []string { normalizedTableSchema := n.tableSchemaMapping[dstTable] if n.supportsMerge { @@ -70,12 +87,7 @@ func (n *normalizeStmtGenerator) generateFallbackStatements( stringCol := QuoteLiteral(column.Name) columnNames = append(columnNames, quotedCol) pgType := n.columnTypeToPg(normalizedTableSchema, genericColumnType) - var expr string - if normalizedTableSchema.System == protos.TypeSystem_Q && qvalue.QValueKind(genericColumnType).IsArray() { - expr = fmt.Sprintf("ARRAY(SELECT JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>%s)::JSON))::%s", stringCol, pgType) - } else { - expr = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType) - } + expr := n.generateExpr(normalizedTableSchema, genericColumnType, stringCol, pgType) flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("%s AS %s", expr, quotedCol)) if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, column.Name) { @@ -138,14 +150,9 @@ func (n *normalizeStmtGenerator) generateMergeStatement( quotedCol := QuoteIdentifier(column.Name) stringCol := QuoteLiteral(column.Name) quotedColumnNames[i] = quotedCol - pgType := n.columnTypeToPg(normalizedTableSchema, genericColumnType) - var expr string - if normalizedTableSchema.System == protos.TypeSystem_Q && qvalue.QValueKind(genericColumnType).IsArray() { - expr = fmt.Sprintf("ARRAY(SELECT JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>%s)::JSON))::%s", stringCol, pgType) - } else { - expr = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType) - } + expr := n.generateExpr(normalizedTableSchema, genericColumnType, stringCol, pgType) + flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("%s AS %s", expr, quotedCol)) if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, column.Name) { primaryKeyColumnCasts[column.Name] = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 246da6b49a..d359212bdb 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -82,8 +82,6 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu return qvalue.QValueKindTimestampTZ case pgtype.NumericOID: return qvalue.QValueKindNumeric - case pgtype.BitOID, pgtype.VarbitOID: - return qvalue.QValueKindBit case pgtype.Int2ArrayOID: return qvalue.QValueKindArrayInt16 case pgtype.Int4ArrayOID: @@ -179,8 +177,6 @@ func qValueKindToPostgresType(colTypeStr string) string { return "TIMESTAMPTZ" case qvalue.QValueKindNumeric: return "NUMERIC" - case qvalue.QValueKindBit: - return "BIT" case qvalue.QValueKindINET: return "INET" case qvalue.QValueKindCIDR: @@ -379,11 +375,6 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindBytes: rawBytes := value.([]byte) return qvalue.QValueBytes{Val: rawBytes}, nil - case qvalue.QValueKindBit: - bitsVal := value.(pgtype.Bits) - if bitsVal.Valid { - return qvalue.QValueBit{Val: bitsVal.Bytes}, nil - } case qvalue.QValueKindNumeric: numVal := value.(pgtype.Numeric) if numVal.Valid { @@ -449,10 +440,9 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( } return qvalue.QValueArrayString{Val: a}, nil case qvalue.QValueKindPoint: - xCoord := value.(pgtype.Point).P.X - yCoord := value.(pgtype.Point).P.Y + coord := value.(pgtype.Point).P return qvalue.QValuePoint{ - Val: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord), + Val: fmt.Sprintf("POINT(%f %f)", coord.X, coord.Y), }, nil default: textVal, ok := value.(string) diff --git a/flow/connectors/postgres/schema_delta_test_constants.go b/flow/connectors/postgres/schema_delta_test_constants.go index 6ded70625a..52c50db453 100644 --- a/flow/connectors/postgres/schema_delta_test_constants.go +++ b/flow/connectors/postgres/schema_delta_test_constants.go @@ -7,7 +7,6 @@ import ( var AddAllColumnTypes = []string{ string(qvalue.QValueKindInt32), - string(qvalue.QValueKindBit), string(qvalue.QValueKindBoolean), string(qvalue.QValueKindBytes), string(qvalue.QValueKindDate), @@ -32,11 +31,6 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{ Type: string(qvalue.QValueKindInt32), TypeModifier: -1, }, - { - Name: "c1", - Type: string(qvalue.QValueKindBit), - TypeModifier: 1, - }, { Name: "c2", Type: string(qvalue.QValueKindBoolean), diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index a4d89bd773..ac6f253517 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -60,8 +60,6 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeholder int) qvalue. // value = `{"key": "value"}` // placeholder JSON, replace with actual logic case qvalue.QValueKindBytes: return qvalue.QValueBytes{Val: []byte("sample bytes")} // placeholder bytes, replace with actual logic - case qvalue.QValueKindBit: - return qvalue.QValueBit{Val: []byte("sample bits")} // placeholder bytes, replace with actual logic default: require.Failf(t, "unsupported QValueKind", "unsupported QValueKind: %s", kind) return qvalue.QValueNull(kind) @@ -97,7 +95,6 @@ func generateRecords( qvalue.QValueKindUUID, qvalue.QValueKindQChar, // qvalue.QValueKindJSON, - qvalue.QValueKindBit, } numKinds := len(allQValueKinds) diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 41ae4c31e9..3f0cfbc63a 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -41,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(dstTable string) (string, error) targetColumnName := SnowflakeIdentifierNormalize(column.Name) switch qvKind { - case qvalue.QValueKindBytes, qvalue.QValueKindBit: + case qvalue.QValueKindBytes: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("BASE64_DECODE_BINARY(%s:\"%s\") "+ "AS %s", toVariantColumnName, column.Name, targetColumnName)) case qvalue.QValueKindGeography: diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 48ab5ce454..fb94280c38 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -221,7 +221,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa case qvalue.QValueKindString, qvalue.QValueKindHStore: var s sql.NullString values[i] = &s - case qvalue.QValueKindBytes, qvalue.QValueKindBit: + case qvalue.QValueKindBytes: values[i] = new([]byte) case qvalue.QValueKindNumeric: var s sql.Null[decimal.Decimal] @@ -435,10 +435,6 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { if v, ok := val.(*[]byte); ok && v != nil { return qvalue.QValueBytes{Val: *v}, nil } - case qvalue.QValueKindBit: - if v, ok := val.(*[]byte); ok && v != nil { - return qvalue.QValueBit{Val: *v}, nil - } case qvalue.QValueKindUUID: if v, ok := val.(*[]byte); ok && v != nil { diff --git a/flow/connectors/sqlserver/qvalue_convert.go b/flow/connectors/sqlserver/qvalue_convert.go index b4f73420e1..57f2d90fb9 100644 --- a/flow/connectors/sqlserver/qvalue_convert.go +++ b/flow/connectors/sqlserver/qvalue_convert.go @@ -17,7 +17,6 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{ qvalue.QValueKindTimestampTZ: "DATETIMEOFFSET", qvalue.QValueKindTime: "TIME", qvalue.QValueKindDate: "DATE", - qvalue.QValueKindBit: "BINARY", qvalue.QValueKindBytes: "VARBINARY(MAX)", qvalue.QValueKindStruct: "NTEXT", // SQL Server doesn't support struct type qvalue.QValueKindUUID: "UNIQUEIDENTIFIER", @@ -47,7 +46,6 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{ "TIME": qvalue.QValueKindTime, "DATE": qvalue.QValueKindDate, "VARBINARY(MAX)": qvalue.QValueKindBytes, - "BINARY": qvalue.QValueKindBit, "DECIMAL": qvalue.QValueKindNumeric, "UNIQUEIDENTIFIER": qvalue.QValueKindUUID, "SMALLINT": qvalue.QValueKindInt32, diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go index be42064d77..6b36f73258 100644 --- a/flow/connectors/utils/cdc_store.go +++ b/flow/connectors/utils/cdc_store.go @@ -98,7 +98,6 @@ func init() { gob.Register(qvalue.QValueBytes{}) gob.Register(qvalue.QValueUUID{}) gob.Register(qvalue.QValueJSON{}) - gob.Register(qvalue.QValueBit{}) gob.Register(qvalue.QValueHStore{}) gob.Register(qvalue.QValueGeography{}) gob.Register(qvalue.QValueGeometry{}) diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 9fd5691114..6138c036e0 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -432,7 +432,7 @@ func qValueKindToBqColTypeString(val qvalue.QValueKind) (string, error) { return "BOOL", nil case qvalue.QValueKindTimestamp: return "TIMESTAMP", nil - case qvalue.QValueKindBytes, qvalue.QValueKindBit: + case qvalue.QValueKindBytes: return "BYTES", nil case qvalue.QValueKindNumeric: return "NUMERIC", nil diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 93d2f4de24..ec96420480 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -99,7 +99,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { dstTableName := s.attachSchemaSuffix("test_types_pg_dst") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c4 BOOLEAN, + CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BYTEA,c4 BOOLEAN, c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c21 MACADDR, c29 SMALLINT,c32 TEXT, @@ -124,7 +124,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s SELECT 2,2,b'1', + INSERT INTO %s SELECT 2,2,'\xdeadbeef', true,'s','test','1.1.10.2'::cidr, CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, '08:00:2b:01:02:03'::macaddr, diff --git a/flow/model/qrecord_copy_from_source.go b/flow/model/qrecord_copy_from_source.go index 745827309c..308676c5f5 100644 --- a/flow/model/qrecord_copy_from_source.go +++ b/flow/model/qrecord_copy_from_source.go @@ -90,8 +90,6 @@ func (src *QRecordCopyFromSource) Values() ([]interface{}, error) { values[i] = uuid.UUID(v.Val) case qvalue.QValueNumeric: values[i] = v.Val - case qvalue.QValueBit: - values[i] = v.Val case qvalue.QValueBytes: values[i] = v.Val case qvalue.QValueDate: diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 764fd86759..648a6aa7ac 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -95,7 +95,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci return "double", nil case QValueKindBoolean: return "boolean", nil - case QValueKindBytes, QValueKindBit: + case QValueKindBytes: return "bytes", nil case QValueKindNumeric: avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) @@ -328,8 +328,6 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l return c.processNumeric(v.Val), nil case QValueBytes: return c.processBytes(v.Val), nil - case QValueBit: - return c.processBytes(v.Val), nil case QValueJSON: return c.processJSON(v.Val), nil case QValueHStore: diff --git a/flow/model/qvalue/equals.go b/flow/model/qvalue/equals.go index 97ba7ca73f..a609c6df6a 100644 --- a/flow/model/qvalue/equals.go +++ b/flow/model/qvalue/equals.go @@ -76,8 +76,6 @@ func Equals(qv QValue, other QValue) bool { case QValueJSON: // TODO (kaushik): fix for tests return true - case QValueBit: - return compareBytes(qvValue, otherValue) case QValueGeometry: return compareGeometry(q.Val, otherValue) case QValueGeography: diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index fa0a3c2235..43e2495429 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -30,7 +30,6 @@ const ( QValueKindBytes QValueKind = "bytes" QValueKindUUID QValueKind = "uuid" QValueKindJSON QValueKind = "json" - QValueKindBit QValueKind = "bit" QValueKindHStore QValueKind = "hstore" QValueKindGeography QValueKind = "geography" QValueKindGeometry QValueKind = "geometry" @@ -75,7 +74,6 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindTime: "TIME", QValueKindTimeTZ: "TIME", QValueKindDate: "DATE", - QValueKindBit: "BINARY", QValueKindBytes: "BINARY", QValueKindStruct: "STRING", QValueKindUUID: "STRING", @@ -113,7 +111,6 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindTimestampTZ: "DateTime64(6)", QValueKindTime: "String", QValueKindDate: "Date", - QValueKindBit: "Boolean", QValueKindBytes: "String", QValueKindStruct: "String", QValueKindUUID: "UUID", diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 91b9e3fe31..9b1c13f755 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -358,22 +358,6 @@ func (v QValueJSON) LValue(ls *lua.LState) lua.LValue { return lua.LString(v.Val) } -type QValueBit struct { - Val []byte -} - -func (QValueBit) Kind() QValueKind { - return QValueKindBit -} - -func (v QValueBit) Value() any { - return v.Val -} - -func (v QValueBit) LValue(ls *lua.LState) lua.LValue { - return lua.LString(shared.UnsafeFastReadOnlyBytesToString(v.Val)) -} - type QValueHStore struct { Val string } diff --git a/flow/model/record_items.go b/flow/model/record_items.go index 55614183ab..daa8ab1aea 100644 --- a/flow/model/record_items.go +++ b/flow/model/record_items.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "math" - "strings" "github.com/google/uuid" @@ -88,22 +87,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) { } switch v := qv.(type) { - case qvalue.QValueBit: - // convert to binary string since json.Marshal stores byte arrays as base64 - var binStr strings.Builder - binStr.Grow(len(v.Val) * 8) - for _, b := range v.Val { - binStr.WriteString(fmt.Sprintf("%08b", b)) - } - jsonStruct[col] = binStr.String() - case qvalue.QValueBytes: - // convert to binary string since json.Marshal stores byte arrays as base64 - var binStr strings.Builder - binStr.Grow(len(v.Val) * 8) - for _, b := range v.Val { - binStr.WriteString(fmt.Sprintf("%08b", b)) - } - jsonStruct[col] = binStr.String() case qvalue.QValueUUID: jsonStruct[col] = uuid.UUID(v.Val) case qvalue.QValueQChar: diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 9a96e3576b..341d878186 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -246,8 +246,6 @@ func LuaRowNewIndex(ls *lua.LState) int { } case qvalue.QValueKindJSON: newqv = qvalue.QValueJSON{Val: lua.LVAsString(val)} - case qvalue.QValueKindBit: - newqv = qvalue.QValueBit{Val: []byte(lua.LVAsString(val))} case qvalue.QValueKindArrayFloat32: if tbl, ok := val.(*lua.LTable); ok { newqv = qvalue.QValueArrayFloat32{