From 036bd31a331a675e34fe5a5705effba31254b7ea Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 16:58:05 +0800 Subject: [PATCH] 'commit' --- .../ElasticsearchToGreenPlum.go | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index cc71ac80..d24999f5 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -21,6 +21,8 @@ import ( var client elastic.Client var db = PgUtil.Engine +var sqls = make([]string, 0) +var keyMap = make(map[string]int, 0) func main() { var host = "http://10.10.14.188:9200/" @@ -58,7 +60,7 @@ func main() { //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) if err != nil { panic(err) } @@ -69,16 +71,22 @@ func main() { value := gjson.Get(resStr, "data_content") addRecord(indexName, pk, value.String()) } + //批量执行 + commitTran(sqls) + //继续用的 scoll_id scrollId := result.ScrollId //第一次命中的个数 var nowCount = int64(len(result.Hits.Hits)) + //总的命中个数 allCount := result.TotalHits() //开始循环 for { + //清空 + sqls = sqls[0:0] //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) @@ -95,27 +103,24 @@ func main() { //没有数据了 break } + //批量执行 + commitTran(sqls) } } + //后续优化为到达一定的阀值再提交一次,初步定为200一次 func addRecord(tableName string, pk string, jsonStr string) { var mymap map[string]interface{} var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //先删除再插入 - sql = `delete from ` + tableName + ` where ` + pk + `=?` + sql = `delete from ` + tableName + ` where ` + pk + `=` switch (mymap[pk]).(type) { case string: - _, err := db.SQL(sql, mymap[pk].(string)).Execute() - if err != nil { - panic(err.Error()) - } + sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';") break case float64: - _, err := db.SQL(sql, int64(mymap[pk].(float64))).Execute() - if err != nil { - panic(err.Error()) - } + sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";") break default: break @@ -145,11 +150,24 @@ func addRecord(tableName string, pk string, jsonStr string) { } else { fmt.Println(err.Error()) } - fmt.Println(sql) - _, err := db.SQL(sql).Execute() + sqls = append(sqls, sql) +} + +//提交事务 +func commitTran(sqls []string) { + session := db.NewSession() + defer session.Close() + err := session.Begin() + for i := range sqls { + _, err = session.Insert(sqls[i]) + if err != nil { + fmt.Println(err.Error()) + session.Rollback() + return + } + } + err = session.Commit() if err != nil { - panic(err.Error()) - } else { - fmt.Println("成功插入一条数据!") + return } }