diff --git a/adaptor/postgres/writer.go b/adaptor/postgres/writer.go index 7f6b6999e..3abc0ec97 100644 --- a/adaptor/postgres/writer.go +++ b/adaptor/postgres/writer.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "reflect" "strings" "github.com/compose/mejson" @@ -123,11 +124,17 @@ func insertMsg(m message.Msg, s *sql.DB) error { query := fmt.Sprintf("INSERT INTO %v (%v) VALUES (%v)", m.Namespace(), strings.Join(st.Inames, ", "), strings.Join(st.Ikeys, ", ")) + pkeys := reflect.ValueOf(st.Pkeys).MapKeys() + strkeys := make([]string, len(pkeys)) + for i := 0; i < len(pkeys); i++ { + strkeys[i] = pkeys[i].String() + } + pkeysstr := strings.Join(strkeys, ",") if len(st.Ukeys) == 0 { - query = fmt.Sprintf("%v ON CONFLICT (id) DO NOTHING;", query) + query = fmt.Sprintf("%v ON CONFLICT (%v) DO NOTHING;", query, pkeysstr) } else { - query = fmt.Sprintf("%v ON CONFLICT (id) DO UPDATE SET %v;", query, strings.Join(st.Ukeys, ", ")) + query = fmt.Sprintf("%v ON CONFLICT (%v) DO UPDATE SET %v;", query, pkeysstr, strings.Join(st.Ukeys, ", ")) } fmt.Printf("Write INSERT to Postgres %v\n", query) @@ -217,18 +224,28 @@ func updateMsg(m message.Msg, s *sql.DB) error { func primaryKeys(namespace string, db *sql.DB) (primaryKeys map[string]bool, err error) { primaryKeys = map[string]bool{} namespaceArray := strings.SplitN(namespace, ".", 2) + log.Debugf("namespace is: %v", namespace) + log.Debugf("len namespacearray is: %v", len(namespaceArray)) var ( tableSchema string tableName string columnName string ) - if namespaceArray[1] == "" { + + if len(namespaceArray) > 1 { + if namespaceArray[1] == "" { + tableSchema = "public" + tableName = namespaceArray[0] + } else { + tableSchema = namespaceArray[0] + tableName = namespaceArray[1] + } + } else { tableSchema = "public" tableName = namespaceArray[0] - } else { - tableSchema = namespaceArray[0] - tableName = namespaceArray[1] } + log.Debugf("tableschema is: %v", tableSchema) + log.Debugf("tablename is: %v", tableName) tablesResult, err := db.Query(fmt.Sprintf(` SELECT @@ -266,10 +283,10 @@ func GenerateUpsertStatement(m message.Msg, s *sql.DB) (UpsertValues, error) { vals []interface{} ) - log.Infof("Prepare primary keys for %v", m.Namespace) + log.Debugf("Prepare primary keys for %v", m.Namespace) pkeys, err := primaryKeys(m.Namespace(), s) - log.Infof("got primary keys") + log.Debugf("got primary keys %#v", pkeys) if err != nil { log.Errorln("Error generating UPSERT statement for Postgres %v : %v", m.Namespace(), err) } else {