You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
5.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"context"
"dsSupport/Utils/CommonUtil"
"dsSupport/Utils/DbUtil"
"dsSupport/Utils/PgUtil"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/tidwall/gjson"
"log"
"os"
"sort"
"time"
)
// 深度分页
// TODO
// 后续需要记录每个INDEX的最后同步时间形成日志文件现在是一次性全量同步
// 因目前是以业务主键ID保持所以比ES的数据量肯定是要少的因为更新的日志记录都没有同步到数仓只是最终有效的业务ID同步过来。
// https://www.cnblogs.com/hello-shf/p/11543453.html
var client elastic.Client
var mysqlDb = DbUtil.Engine
var pgDb = PgUtil.Engine
var insertStrArray = make([]string, 0)
var pkStrArray = make([]string, 0)
var keyMap = make(map[string]int, 0)
func main() {
var host = "http://10.10.14.188:9200/"
esClient, err := elastic.NewClient(
elastic.SetURL(host),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10*time.Second),
elastic.SetGzip(true),
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
)
if err != nil {
panic(err)
}
//要同步的索引名称,也就是表名称
indexName := "org_school"
keys := make([]string, 0)
//从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持
sql := "select * from t_dw_table where table_name=?"
list, _ := mysqlDb.SQL(sql, indexName).Query().List()
tableId := list[0]["table_id"].(int64)
sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1`
list, _ = mysqlDb.SQL(sql, tableId).Query().List()
pk := list[0]["field_name"].(string)
//取所有
CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX)
if err != nil {
panic(err)
}
//第一次的结果集
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
keys = addRecord(pk, value.String())
}
//批量执行
batchSave(indexName, pk, keys)
//继续用的 scoll_id
scrollId := result.ScrollId
//第一次命中的个数
var nowCount = int64(len(result.Hits.Hits))
//总的命中个数
allCount := result.TotalHits()
//开始循环
for {
//清空
insertStrArray = insertStrArray[0:0]
pkStrArray = pkStrArray[0:0]
//如果还有数据没有获取
if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
scrollId = result.ScrollId
nowCount += int64(len(result.Hits.Hits))
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
keys = addRecord(pk, value.String())
}
} else {
//没有数据了
break
}
//批量执行
batchSave(indexName, pk, keys)
}
}
//后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(pk string, jsonStr string) []string {
var mymap map[string]interface{}
var keys []string
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//组装pkArray,用来update enable_flag
switch (mymap[pk]).(type) {
case string:
pkStrArray = append(pkStrArray, mymap[pk].(string))
break
case float64:
pkStrArray = append(pkStrArray, CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64))))
break
default:
break
}
//另一个任务是组装insertStrArray,准备批量提交
//拿到key
for k := range mymap {
keys = append(keys, k)
}
//对key排序
sort.Strings(keys)
var lineSql = ""
//根据key从m中拿元素就是按顺序拿了
for _, k := range keys {
switch mymap[k].(type) {
case string:
lineSql += "'" + mymap[k].(string) + "',"
break
case int64:
lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + ","
break
case float64:
lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + ","
break
default:
break
}
}
lineSql += "'" + CommonUtil.GetUUID() + "'"
insertStrArray = append(insertStrArray, lineSql)
} else {
fmt.Println(err.Error())
}
return keys
}
//提交
func batchSave(tableName string, pkName string, keys []string) {
var pkStr = ""
//数组去重
pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray)
for i := range pkStrArray {
pkStr += "'" + pkStrArray[i] + "',"
}
pkStr = pkStr[0 : len(pkStr)-1]
sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ") and enable_flag=1"
pgDb.SQL(sql).Execute()
//插入
sql = `insert into ` + tableName + "("
//根据key从m中拿元素就是按顺序拿了
for _, k := range keys {
sql += k + ","
}
sql += "uuid"
var lineSql = ""
for i := range insertStrArray {
lineSql += "(" + insertStrArray[i] + "),"
}
lineSql = lineSql[0 : len(lineSql)-1]
sql += ") values " + lineSql + ";"
_,err:=pgDb.SQL(sql).Execute()
if err!=nil{
fmt.Println(sql)
//panic(err)
}
fmt.Println("批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.")
}