Skip to content

Commit

Permalink
Merge branch 'main' into no-tempo-peer-config
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jun 24, 2024
2 parents 6b4072d + 078cfee commit 66a5310
Show file tree
Hide file tree
Showing 19 changed files with 30 additions and 92 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
column.Name, bqTypeString, shortCol)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
case qvalue.QValueKindBytes:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
column.Name, shortCol)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription) bigque
bqField.Type = bigquery.TimeFieldType
// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - handle INTERVAL types again,
// bytes
case qvalue.QValueKindBit, qvalue.QValueKindBytes:
case qvalue.QValueKindBytes:
bqField.Type = bigquery.BytesFieldType
case qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64:
bqField.Type = bigquery.IntegerFieldType
Expand Down
33 changes: 20 additions & 13 deletions flow/connectors/postgres/normalize_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ func (n *normalizeStmtGenerator) columnTypeToPg(schema *protos.TableSchema, colu
}
}

// generateExpr builds the SQL expression that extracts one column out of the
// _peerdb_data JSON payload and casts it to the destination Postgres type.
// stringCol is the already-quoted literal column name and pgType the target
// type; for the Q type system, array kinds are unpacked via
// JSON_ARRAY_ELEMENTS_TEXT and bytes kinds are base64-decoded. Everything
// else falls through to a plain ->> extraction with a cast.
func (n *normalizeStmtGenerator) generateExpr(
	normalizedTableSchema *protos.TableSchema,
	genericColumnType string,
	stringCol string,
	pgType string,
) string {
	if normalizedTableSchema.System == protos.TypeSystem_Q {
		switch kind := qvalue.QValueKind(genericColumnType); {
		case kind.IsArray():
			return fmt.Sprintf("ARRAY(SELECT JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>%s)::JSON))::%s", stringCol, pgType)
		case kind == qvalue.QValueKindBytes:
			return fmt.Sprintf("decode(_peerdb_data->>%s, 'base64')::%s", stringCol, pgType)
		}
	}
	return fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
}

func (n *normalizeStmtGenerator) generateNormalizeStatements(dstTable string) []string {
normalizedTableSchema := n.tableSchemaMapping[dstTable]
if n.supportsMerge {
Expand Down Expand Up @@ -70,12 +87,7 @@ func (n *normalizeStmtGenerator) generateFallbackStatements(
stringCol := QuoteLiteral(column.Name)
columnNames = append(columnNames, quotedCol)
pgType := n.columnTypeToPg(normalizedTableSchema, genericColumnType)
var expr string
if normalizedTableSchema.System == protos.TypeSystem_Q && qvalue.QValueKind(genericColumnType).IsArray() {
expr = fmt.Sprintf("ARRAY(SELECT JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>%s)::JSON))::%s", stringCol, pgType)
} else {
expr = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
}
expr := n.generateExpr(normalizedTableSchema, genericColumnType, stringCol, pgType)

flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("%s AS %s", expr, quotedCol))
if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, column.Name) {
Expand Down Expand Up @@ -138,14 +150,9 @@ func (n *normalizeStmtGenerator) generateMergeStatement(
quotedCol := QuoteIdentifier(column.Name)
stringCol := QuoteLiteral(column.Name)
quotedColumnNames[i] = quotedCol

pgType := n.columnTypeToPg(normalizedTableSchema, genericColumnType)
var expr string
if normalizedTableSchema.System == protos.TypeSystem_Q && qvalue.QValueKind(genericColumnType).IsArray() {
expr = fmt.Sprintf("ARRAY(SELECT JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>%s)::JSON))::%s", stringCol, pgType)
} else {
expr = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
}
expr := n.generateExpr(normalizedTableSchema, genericColumnType, stringCol, pgType)

flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("%s AS %s", expr, quotedCol))
if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, column.Name) {
primaryKeyColumnCasts[column.Name] = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
Expand Down
14 changes: 2 additions & 12 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindTimestampTZ
case pgtype.NumericOID:
return qvalue.QValueKindNumeric
case pgtype.BitOID, pgtype.VarbitOID:
return qvalue.QValueKindBit
case pgtype.Int2ArrayOID:
return qvalue.QValueKindArrayInt16
case pgtype.Int4ArrayOID:
Expand Down Expand Up @@ -179,8 +177,6 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "TIMESTAMPTZ"
case qvalue.QValueKindNumeric:
return "NUMERIC"
case qvalue.QValueKindBit:
return "BIT"
case qvalue.QValueKindINET:
return "INET"
case qvalue.QValueKindCIDR:
Expand Down Expand Up @@ -379,11 +375,6 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindBytes:
rawBytes := value.([]byte)
return qvalue.QValueBytes{Val: rawBytes}, nil
case qvalue.QValueKindBit:
bitsVal := value.(pgtype.Bits)
if bitsVal.Valid {
return qvalue.QValueBit{Val: bitsVal.Bytes}, nil
}
case qvalue.QValueKindNumeric:
numVal := value.(pgtype.Numeric)
if numVal.Valid {
Expand Down Expand Up @@ -449,10 +440,9 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
}
return qvalue.QValueArrayString{Val: a}, nil
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
coord := value.(pgtype.Point).P
return qvalue.QValuePoint{
Val: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord),
Val: fmt.Sprintf("POINT(%f %f)", coord.X, coord.Y),
}, nil
default:
textVal, ok := value.(string)
Expand Down
6 changes: 0 additions & 6 deletions flow/connectors/postgres/schema_delta_test_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

var AddAllColumnTypes = []string{
string(qvalue.QValueKindInt32),
string(qvalue.QValueKindBit),
string(qvalue.QValueKindBoolean),
string(qvalue.QValueKindBytes),
string(qvalue.QValueKindDate),
Expand All @@ -32,11 +31,6 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{
Type: string(qvalue.QValueKindInt32),
TypeModifier: -1,
},
{
Name: "c1",
Type: string(qvalue.QValueKindBit),
TypeModifier: 1,
},
{
Name: "c2",
Type: string(qvalue.QValueKindBoolean),
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeholder int) qvalue.
// value = `{"key": "value"}` // placeholder JSON, replace with actual logic
case qvalue.QValueKindBytes:
return qvalue.QValueBytes{Val: []byte("sample bytes")} // placeholder bytes, replace with actual logic
case qvalue.QValueKindBit:
return qvalue.QValueBit{Val: []byte("sample bits")} // placeholder bytes, replace with actual logic
default:
require.Failf(t, "unsupported QValueKind", "unsupported QValueKind: %s", kind)
return qvalue.QValueNull(kind)
Expand Down Expand Up @@ -97,7 +95,6 @@ func generateRecords(
qvalue.QValueKindUUID,
qvalue.QValueKindQChar,
// qvalue.QValueKindJSON,
qvalue.QValueKindBit,
}

numKinds := len(allQValueKinds)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(dstTable string) (string, error)

targetColumnName := SnowflakeIdentifierNormalize(column.Name)
switch qvKind {
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
case qvalue.QValueKindBytes:
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("BASE64_DECODE_BINARY(%s:\"%s\") "+
"AS %s", toVariantColumnName, column.Name, targetColumnName))
case qvalue.QValueKindGeography:
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
case qvalue.QValueKindString, qvalue.QValueKindHStore:
var s sql.NullString
values[i] = &s
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
case qvalue.QValueKindBytes:
values[i] = new([]byte)
case qvalue.QValueKindNumeric:
var s sql.Null[decimal.Decimal]
Expand Down Expand Up @@ -435,10 +435,6 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
if v, ok := val.(*[]byte); ok && v != nil {
return qvalue.QValueBytes{Val: *v}, nil
}
case qvalue.QValueKindBit:
if v, ok := val.(*[]byte); ok && v != nil {
return qvalue.QValueBit{Val: *v}, nil
}

case qvalue.QValueKindUUID:
if v, ok := val.(*[]byte); ok && v != nil {
Expand Down
2 changes: 0 additions & 2 deletions flow/connectors/sqlserver/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{
qvalue.QValueKindTimestampTZ: "DATETIMEOFFSET",
qvalue.QValueKindTime: "TIME",
qvalue.QValueKindDate: "DATE",
qvalue.QValueKindBit: "BINARY",
qvalue.QValueKindBytes: "VARBINARY(MAX)",
qvalue.QValueKindStruct: "NTEXT", // SQL Server doesn't support struct type
qvalue.QValueKindUUID: "UNIQUEIDENTIFIER",
Expand Down Expand Up @@ -47,7 +46,6 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{
"TIME": qvalue.QValueKindTime,
"DATE": qvalue.QValueKindDate,
"VARBINARY(MAX)": qvalue.QValueKindBytes,
"BINARY": qvalue.QValueKindBit,
"DECIMAL": qvalue.QValueKindNumeric,
"UNIQUEIDENTIFIER": qvalue.QValueKindUUID,
"SMALLINT": qvalue.QValueKindInt32,
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/utils/cdc_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func init() {
gob.Register(qvalue.QValueBytes{})
gob.Register(qvalue.QValueUUID{})
gob.Register(qvalue.QValueJSON{})
gob.Register(qvalue.QValueBit{})
gob.Register(qvalue.QValueHStore{})
gob.Register(qvalue.QValueGeography{})
gob.Register(qvalue.QValueGeometry{})
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func qValueKindToBqColTypeString(val qvalue.QValueKind) (string, error) {
return "BOOL", nil
case qvalue.QValueKindTimestamp:
return "TIMESTAMP", nil
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
case qvalue.QValueKindBytes:
return "BYTES", nil
case qvalue.QValueKindNumeric:
return "NUMERIC", nil
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
dstTableName := s.attachSchemaSuffix("test_types_pg_dst")

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c4 BOOLEAN,
CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BYTEA,c4 BOOLEAN,
c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c21 MACADDR,
c29 SMALLINT,c32 TEXT,
Expand All @@ -122,7 +122,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 2,2,b'1',
INSERT INTO %s SELECT 2,2,'\xdeadbeef',
true,'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1,
'08:00:2b:01:02:03'::macaddr,
Expand Down
2 changes: 0 additions & 2 deletions flow/model/qrecord_copy_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ func (src *QRecordCopyFromSource) Values() ([]interface{}, error) {
values[i] = uuid.UUID(v.Val)
case qvalue.QValueNumeric:
values[i] = v.Val
case qvalue.QValueBit:
values[i] = v.Val
case qvalue.QValueBytes:
values[i] = v.Val
case qvalue.QValueDate:
Expand Down
4 changes: 1 addition & 3 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci
return "double", nil
case QValueKindBoolean:
return "boolean", nil
case QValueKindBytes, QValueKindBit:
case QValueKindBytes:
return "bytes", nil
case QValueKindNumeric:
avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH)
Expand Down Expand Up @@ -328,8 +328,6 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l
return c.processNumeric(v.Val), nil
case QValueBytes:
return c.processBytes(v.Val), nil
case QValueBit:
return c.processBytes(v.Val), nil
case QValueJSON:
return c.processJSON(v.Val), nil
case QValueHStore:
Expand Down
2 changes: 0 additions & 2 deletions flow/model/qvalue/equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func Equals(qv QValue, other QValue) bool {
case QValueJSON:
// TODO (kaushik): fix for tests
return true
case QValueBit:
return compareBytes(qvValue, otherValue)
case QValueGeometry:
return compareGeometry(q.Val, otherValue)
case QValueGeography:
Expand Down
3 changes: 0 additions & 3 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
QValueKindBytes QValueKind = "bytes"
QValueKindUUID QValueKind = "uuid"
QValueKindJSON QValueKind = "json"
QValueKindBit QValueKind = "bit"
QValueKindHStore QValueKind = "hstore"
QValueKindGeography QValueKind = "geography"
QValueKindGeometry QValueKind = "geometry"
Expand Down Expand Up @@ -75,7 +74,6 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindTime: "TIME",
QValueKindTimeTZ: "TIME",
QValueKindDate: "DATE",
QValueKindBit: "BINARY",
QValueKindBytes: "BINARY",
QValueKindStruct: "STRING",
QValueKindUUID: "STRING",
Expand Down Expand Up @@ -113,7 +111,6 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindTimestampTZ: "DateTime64(6)",
QValueKindTime: "String",
QValueKindDate: "Date",
QValueKindBit: "Boolean",
QValueKindBytes: "String",
QValueKindStruct: "String",
QValueKindUUID: "UUID",
Expand Down
16 changes: 0 additions & 16 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,22 +358,6 @@ func (v QValueJSON) LValue(ls *lua.LState) lua.LValue {
return lua.LString(v.Val)
}

type QValueBit struct {
Val []byte
}

func (QValueBit) Kind() QValueKind {
return QValueKindBit
}

func (v QValueBit) Value() any {
return v.Val
}

func (v QValueBit) LValue(ls *lua.LState) lua.LValue {
return lua.LString(shared.UnsafeFastReadOnlyBytesToString(v.Val))
}

type QValueHStore struct {
Val string
}
Expand Down
16 changes: 0 additions & 16 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
}

switch v := qv.(type) {
case qvalue.QValueBit:
// convert to binary string since json.Marshal stores byte arrays as base64
var binStr strings.Builder
binStr.Grow(len(v.Val) * 8)
for _, b := range v.Val {
binStr.WriteString(fmt.Sprintf("%08b", b))
}
jsonStruct[col] = binStr.String()
case qvalue.QValueBytes:
// convert to binary string since json.Marshal stores byte arrays as base64
var binStr strings.Builder
binStr.Grow(len(v.Val) * 8)
for _, b := range v.Val {
binStr.WriteString(fmt.Sprintf("%08b", b))
}
jsonStruct[col] = binStr.String()
case qvalue.QValueUUID:
jsonStruct[col] = uuid.UUID(v.Val)
case qvalue.QValueQChar:
Expand Down
2 changes: 0 additions & 2 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ func LuaRowNewIndex(ls *lua.LState) int {
}
case qvalue.QValueKindJSON:
newqv = qvalue.QValueJSON{Val: lua.LVAsString(val)}
case qvalue.QValueKindBit:
newqv = qvalue.QValueBit{Val: []byte(lua.LVAsString(val))}
case qvalue.QValueKindArrayFloat32:
if tbl, ok := val.(*lua.LTable); ok {
newqv = qvalue.QValueArrayFloat32{
Expand Down

0 comments on commit 66a5310

Please sign in to comment.