From 9e95c884b4106360efa768a5b8cf6e4fef5534f6 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 14:20:05 +0800 Subject: [PATCH] 'commit' --- .../ElasticsearchToGreenPlum.go | 71 ++++++++++++++----- 1 file changed, 52 insertions(+), 19 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 74476b8c..d0156664 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -36,6 +36,23 @@ func main() { //要同步的索引名称,也就是表名称 indexName := "org_school" + + sql := `SELECT + pg_constraint.conname AS pk_name, + pg_attribute.attname AS colname, + pg_type.typname AS typename + FROM + pg_constraint + INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid + INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid + AND pg_attribute.attnum = pg_constraint.conkey [ 1 ] + INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid + WHERE + pg_class.relname = ? + AND pg_constraint.contype = 'p'` + list, _ := db.SQL(sql, indexName).Query().List() + pk := list[0]["colname"].(string) + //取所有 CTX := context.Background() result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) @@ -47,7 +64,7 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, value.String()) + addRecord(indexName, pk, value.String()) } //继续用的 scoll_id scrollId := result.ScrollId @@ -69,7 +86,7 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, value.String()) + addRecord(indexName, pk, value.String()) } } else { //没有数据了 @@ -77,42 +94,58 @@ func main() { } } } -func addRecord(tableName string, jsonStr string) { - sql := `insert into ` + tableName + "(" +func addRecord(tableName string, pk string, jsonStr string) { var mymap map[string]interface{} + var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { - for key, _ := range mymap { - sql += key + "," + //先删除再插入 + sql = `delete from ` + tableName + ` where ` + pk + `=?` + switch (mymap[pk]).(type) { + case string: + _, err := db.SQL(sql, mymap[pk].(string)).Execute() + if err != nil { + panic(err.Error()) + } + break + case float64: + _, err := db.SQL(sql, int64(mymap[pk].(float64))).Execute() + if err != nil { + panic(err.Error()) + } + break + default: + break } - sql = sql[0 : len(sql)-1] - sql += ") values(" - - for _, value := range mymap { + //插入 + sql = `insert into ` + tableName + "(" + var lineSql = "" + for key, value := range mymap { + sql += key + "," switch value.(type) { case string: - sql += "'" + value.(string) + "'," + lineSql += "'" + value.(string) + "'," break case int64: - sql += CommonUtil.ConvertInt64ToString(value.(int64)) + "," + lineSql += CommonUtil.ConvertInt64ToString(value.(int64)) + "," break case float64: - sql += fmt.Sprintf("%d", int64(value.(float64)))+ "," + lineSql += fmt.Sprintf("%d", int64(value.(float64))) + "," break default: break } } + lineSql = lineSql[0 : len(lineSql)-1] sql = sql[0 : len(sql)-1] + sql += ") values(" + lineSql + ");" } else { fmt.Println(err.Error()) } - - sql += ");" fmt.Println(sql) - _,err:=db.SQL(sql).Execute() - if err!=nil{ - fmt.Println("成功插入一条数据!") - }else{ + _, err := db.SQL(sql).Execute() + if err != nil { panic(err.Error()) + } else { + fmt.Println("成功插入一条数据!") } }