master
huanghai 5 years ago
parent 7b4b7049f4
commit 036bd31a33

@ -21,6 +21,8 @@ import (
var client elastic.Client var client elastic.Client
var db = PgUtil.Engine var db = PgUtil.Engine
var sqls = make([]string, 0)
var keyMap = make(map[string]int, 0)
func main() { func main() {
var host = "http://10.10.14.188:9200/" var host = "http://10.10.14.188:9200/"
@ -58,7 +60,7 @@ func main() {
//取所有 //取所有
CTX := context.Background() CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -69,16 +71,22 @@ func main() {
value := gjson.Get(resStr, "data_content") value := gjson.Get(resStr, "data_content")
addRecord(indexName, pk, value.String()) addRecord(indexName, pk, value.String())
} }
//批量执行
commitTran(sqls)
//继续用的 scoll_id //继续用的 scoll_id
scrollId := result.ScrollId scrollId := result.ScrollId
//第一次命中的个数 //第一次命中的个数
var nowCount = int64(len(result.Hits.Hits)) var nowCount = int64(len(result.Hits.Hits))
//总的命中个数 //总的命中个数
allCount := result.TotalHits() allCount := result.TotalHits()
//开始循环 //开始循环
for { for {
//清空
sqls = sqls[0:0]
//如果还有数据没有获取 //如果还有数据没有获取
if allCount > nowCount { if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
@ -95,27 +103,24 @@ func main() {
//没有数据了 //没有数据了
break break
} }
//批量执行
commitTran(sqls)
} }
} }
//后续优化为到达一定的阀值再提交一次初步定为200一次 //后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(tableName string, pk string, jsonStr string) { func addRecord(tableName string, pk string, jsonStr string) {
var mymap map[string]interface{} var mymap map[string]interface{}
var sql = "" var sql = ""
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//先删除再插入 //先删除再插入
sql = `delete from ` + tableName + ` where ` + pk + `=?` sql = `delete from ` + tableName + ` where ` + pk + `=`
switch (mymap[pk]).(type) { switch (mymap[pk]).(type) {
case string: case string:
_, err := db.SQL(sql, mymap[pk].(string)).Execute() sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';")
if err != nil {
panic(err.Error())
}
break break
case float64: case float64:
_, err := db.SQL(sql, int64(mymap[pk].(float64))).Execute() sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";")
if err != nil {
panic(err.Error())
}
break break
default: default:
break break
@ -145,11 +150,24 @@ func addRecord(tableName string, pk string, jsonStr string) {
} else { } else {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
fmt.Println(sql) sqls = append(sqls, sql)
_, err := db.SQL(sql).Execute() }
//提交事务
func commitTran(sqls []string) {
session := db.NewSession()
defer session.Close()
err := session.Begin()
for i := range sqls {
_, err = session.Insert(sqls[i])
if err != nil {
fmt.Println(err.Error())
session.Rollback()
return
}
}
err = session.Commit()
if err != nil { if err != nil {
panic(err.Error()) return
} else {
fmt.Println("成功插入一条数据!")
} }
} }

Loading…
Cancel
Save