master
huanghai 5 years ago
parent 9fd2adaeb3
commit 319590a92e

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"dsSupport/Utils/CommonUtil" "dsSupport/Utils/CommonUtil"
"dsSupport/Utils/DbUtil"
"dsSupport/Utils/PgUtil" "dsSupport/Utils/PgUtil"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -21,8 +22,10 @@ import (
// https://www.cnblogs.com/hello-shf/p/11543453.html // https://www.cnblogs.com/hello-shf/p/11543453.html
var client elastic.Client var client elastic.Client
var db = PgUtil.Engine var mysqlDb = DbUtil.Engine
var sqls = make([]string, 0) var pgDb = PgUtil.Engine
var insertStrArray = make([]string, 0)
var pkStrArray = make([]string, 0)
var keyMap = make(map[string]int, 0) var keyMap = make(map[string]int, 0)
func main() { func main() {
@ -42,26 +45,16 @@ func main() {
//要同步的索引名称,也就是表名称 //要同步的索引名称,也就是表名称
indexName := "org_school" indexName := "org_school"
//从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持
sql := `SELECT sql := "select * from t_dw_table where table_name=?"
pg_constraint.conname AS pk_name, list, _ := mysqlDb.SQL(sql, indexName).Query().List()
pg_attribute.attname AS colname, tableId := list[0]["table_id"].(int64)
pg_type.typname AS typename sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1`
FROM list, _ = mysqlDb.SQL(sql, tableId).Query().List()
pg_constraint pk := list[0]["field_name"].(string)
INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid
INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid
AND pg_attribute.attnum = pg_constraint.conkey [ 1 ]
INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid
WHERE
pg_class.relname = ?
AND pg_constraint.contype = 'p'`
list, _ := db.SQL(sql, indexName).Query().List()
pk := list[0]["colname"].(string)
//取所有 //取所有
CTX := context.Background() CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -70,10 +63,10 @@ func main() {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte) resStr := string(resByte)
value := gjson.Get(resStr, "data_content") value := gjson.Get(resStr, "data_content")
addRecord(indexName, pk, value.String()) addRecord(pk, value.String())
} }
//批量执行 //批量执行
batchSave(sqls) batchSave(indexName, pk)
//继续用的 scoll_id //继续用的 scoll_id
scrollId := result.ScrollId scrollId := result.ScrollId
@ -85,7 +78,8 @@ func main() {
//开始循环 //开始循环
for { for {
//清空 //清空
sqls = sqls[0:0] insertStrArray = insertStrArray[0:0]
pkStrArray = pkStrArray[0:0]
//如果还有数据没有获取 //如果还有数据没有获取
if allCount > nowCount { if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
@ -96,49 +90,43 @@ func main() {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte) resStr := string(resByte)
value := gjson.Get(resStr, "data_content") value := gjson.Get(resStr, "data_content")
addRecord(indexName, pk, value.String()) addRecord(pk, value.String())
} }
} else { } else {
//没有数据了 //没有数据了
break break
} }
//批量执行 //批量执行
batchSave(sqls) batchSave(indexName, pk)
} }
} }
//后续优化为到达一定的阀值再提交一次初步定为200一次 //后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(tableName string, pk string, jsonStr string) { func addRecord(pk string, jsonStr string) {
var mymap map[string]interface{} var mymap map[string]interface{}
var sql = ""
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//拿到key //组装pkArray,用来update enable_flag
var keys []string
for k := range mymap {
keys = append(keys, k)
}
//对key排序
sort.Strings(keys)
//先删除再插入
sql = `delete from ` + tableName + ` where ` + pk + `=`
switch (mymap[pk]).(type) { switch (mymap[pk]).(type) {
case string: case string:
sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';") pkStrArray = append(pkStrArray, mymap[pk].(string))
break break
case float64: case float64:
sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";") pkStrArray = append(pkStrArray, CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64))))
break break
default: default:
break break
} }
//插入 //另一个任务是组装insertStrArray,准备批量提交
sql = `insert into ` + tableName + "(" //拿到key
var keys []string
for k := range mymap {
keys = append(keys, k)
}
//对key排序
sort.Strings(keys)
var lineSql = "" var lineSql = ""
//根据key从m中拿元素就是按顺序拿了 //根据key从m中拿元素就是按顺序拿了
for _, k := range keys { for _, k := range keys {
sql += k + ","
switch mymap[k].(type) { switch mymap[k].(type) {
case string: case string:
lineSql += "'" + mymap[k].(string) + "'," lineSql += "'" + mymap[k].(string) + "',"
@ -154,26 +142,49 @@ func addRecord(tableName string, pk string, jsonStr string) {
} }
} }
lineSql = lineSql[0 : len(lineSql)-1] lineSql = lineSql[0 : len(lineSql)-1]
sql = sql[0 : len(sql)-1] insertStrArray = append(insertStrArray, lineSql)
sql += ") values(" + lineSql + ");"
} else { } else {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
sqls = append(sqls, sql)
} }
//提交 //提交
func batchSave(sqls []string) { func batchSave(tableName string, pkName string) {
fmt.Println(sqls) //fmt.Println(pkStrArray)
//_, err := db.SQL(sqls[0:2]).Execute() //fmt.Println(insertStrArray)
//if err != nil { var pkStr = ""
// panic(err) for i := range pkStrArray {
//} pkStr += "'" + pkStrArray[i] + "',"
}
//for i := range sqls { pkStr = pkStr[0 : len(pkStr)-1]
// _, err := db.SQL(sqls[i]).Execute() sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr+")"
// if err != nil { pgDb.SQL(sql).Execute()
// panic(err) /*
// } //插入
//} sql = `insert into ` + tableName + "("
var lineSql = ""
//根据key从m中拿元素就是按顺序拿了
for _, k := range keys {
sql += k + ","
switch mymap[k].(type) {
case string:
lineSql += "'" + mymap[k].(string) + "',"
break
case int64:
lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + ","
break
case float64:
lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + ","
break
default:
break
}
}
lineSql = lineSql[0 : len(lineSql)-1]
sql = sql[0 : len(sql)-1]
sql += ") values(" + lineSql + ");"
} else {
fmt.Println(err.Error())
*/
} }

Loading…
Cancel
Save