Skip to content

Commit

Permalink
Fix issues of upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
Siim committed Sep 17, 2017
1 parent da8fa9c commit 7f8c286
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions adaptor/postgres/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/compose/mejson"
Expand Down Expand Up @@ -123,11 +124,17 @@ func insertMsg(m message.Msg, s *sql.DB) error {

query := fmt.Sprintf("INSERT INTO %v (%v) VALUES (%v)", m.Namespace(), strings.Join(st.Inames, ", "), strings.Join(st.Ikeys, ", "))

pkeys := reflect.ValueOf(st.Pkeys).MapKeys()
strkeys := make([]string, len(pkeys))
for i := 0; i < len(pkeys); i++ {
strkeys[i] = pkeys[i].String()
}
pkeysstr := strings.Join(strkeys, ",")
if len(st.Ukeys) == 0 {
query = fmt.Sprintf("%v ON CONFLICT (id) DO NOTHING;", query)
query = fmt.Sprintf("%v ON CONFLICT (%v) DO NOTHING;", query, pkeysstr)

} else {
query = fmt.Sprintf("%v ON CONFLICT (id) DO UPDATE SET %v;", query, strings.Join(st.Ukeys, ", "))
query = fmt.Sprintf("%v ON CONFLICT (%v) DO UPDATE SET %v;", query, pkeysstr, strings.Join(st.Ukeys, ", "))
}

fmt.Printf("Write INSERT to Postgres %v\n", query)
Expand Down Expand Up @@ -217,18 +224,28 @@ func updateMsg(m message.Msg, s *sql.DB) error {
func primaryKeys(namespace string, db *sql.DB) (primaryKeys map[string]bool, err error) {
primaryKeys = map[string]bool{}
namespaceArray := strings.SplitN(namespace, ".", 2)
log.Debugf("namespace is: %v", namespace)
log.Debugf("len namespacearray is: %v", len(namespaceArray))
var (
tableSchema string
tableName string
columnName string
)
if namespaceArray[1] == "" {

if len(namespaceArray) > 1 {
if namespaceArray[1] == "" {
tableSchema = "public"
tableName = namespaceArray[0]
} else {
tableSchema = namespaceArray[0]
tableName = namespaceArray[1]
}
} else {
tableSchema = "public"
tableName = namespaceArray[0]
} else {
tableSchema = namespaceArray[0]
tableName = namespaceArray[1]
}
log.Debugf("tableschema is: %v", tableSchema)
log.Debugf("tablename is: %v", tableName)

tablesResult, err := db.Query(fmt.Sprintf(`
SELECT
Expand Down Expand Up @@ -266,10 +283,10 @@ func GenerateUpsertStatement(m message.Msg, s *sql.DB) (UpsertValues, error) {
vals []interface{}
)

log.Infof("Prepare primary keys for %v", m.Namespace)
log.Debugf("Prepare primary keys for %v", m.Namespace)
pkeys, err := primaryKeys(m.Namespace(), s)

log.Infof("got primary keys")
log.Debugf("got primary keys %#v", pkeys)
if err != nil {
log.Errorln("Error generating UPSERT statement for Postgres %v : %v", m.Namespace(), err)
} else {
Expand Down

0 comments on commit 7f8c286

Please sign in to comment.