master
huanghai 5 years ago
parent 14ba2fcfb9
commit 52fa055c35

@ -11,24 +11,35 @@ import (
"github.com/tidwall/gjson"
"log"
"os"
"sort"
"time"
)
// 深度分页
// TODO
// 后续需要记录每个INDEX的最后同步时间形成日志文件现在是一次性全量同步
// 因目前是以业务主键ID保持所以比ES的数据量肯定是要少的因为更新的日志记录都没有同步到数仓只是最终有效的业务ID同步过来。
// https://www.cnblogs.com/hello-shf/p/11543453.html
var client elastic.Client
//MYSQL数据连接器用于读取元数据信息
var mysqlDb = DbUtil.Engine
//GreenPlum数据连接器用于写入、修改GPDB的数据
var pgDb = PgUtil.Engine
//用于批量一次性插入的数据集合
var insertStrArray = make([]string, 0)
//用于批量更新的数据集合一般是业务主键ID集合修改enable_falg=0
var pkStrArray = make([]string, 0)
var keyMap = make(map[string]int, 0)
//缓存表的结构
type CacheFieldStruct struct {
cacheFieldKeys []string
tableId int
}
var CacheFieldBean CacheFieldStruct
func main() {
//TODO
//需要将ES的连接信息修改到配置文件中,目前想来应该是数据交换平台的概念需要在dsDataex项目中实现待迁移
var host = "http://10.10.14.188:9200/"
esClient, err := elastic.NewClient(
elastic.SetURL(host),
@ -38,74 +49,94 @@ func main() {
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
)
if err != nil {
//如果是集成到dsDataex项目中以协程方式启动这里就不有使用painc(err)会异常会导致程序退出
panic(err)
}
//要同步的索引名称,也就是表名称
indexName := "org_school"
keys := make([]string, 0)
//从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持
sql := "select * from t_dw_table where table_name=?"
list, _ := mysqlDb.SQL(sql, indexName).Query().List()
tableId := list[0]["table_id"].(int64)
sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1`
list, _ = mysqlDb.SQL(sql, tableId).Query().List()
pk := list[0]["field_name"].(string)
//取所有
CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX)
if err != nil {
panic(err)
}
//第一次的结果集
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
keys = addRecord(pk, value.String())
}
//批量执行
batchSave(indexName, pk, keys)
//继续用的 scoll_id
scrollId := result.ScrollId
//第一次命中的个数
var nowCount = int64(len(result.Hits.Hits))
//总的命中个数
allCount := result.TotalHits()
//开始循环
for {
//清空
insertStrArray = insertStrArray[0:0]
pkStrArray = pkStrArray[0:0]
//如果还有数据没有获取
if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
scrollId = result.ScrollId
nowCount += int64(len(result.Hits.Hits))
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
keys = addRecord(pk, value.String())
}
} else {
//没有数据了
break
sql := "select * from t_dw_table"
list, _ := mysqlDb.SQL(sql).Query().List()
for i := range list {
tableId := list[i]["table_id"].(int64)
indexName := list[i]["table_name"].(string)
//预热数据表的列名
getFields(int(tableId))
// 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持
sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1`
list, _ = mysqlDb.SQL(sql, tableId).Query().List()
pk := list[0]["field_name"].(string)
//取所有
CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX)
if err != nil {
panic(err)
}
//第一次的结果集
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
addRecord(pk, value.String())
}
//批量执行
batchSave(indexName, pk, keys)
batchSave(indexName, pk)
//继续用的 scoll_id
scrollId := result.ScrollId
//第一次命中的个数
var nowCount = int64(len(result.Hits.Hits))
//总的命中个数
allCount := result.TotalHits()
//开始循环
for {
//清空
insertStrArray = insertStrArray[0:0]
pkStrArray = pkStrArray[0:0]
//如果还有数据没有获取
if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
scrollId = result.ScrollId
nowCount += int64(len(result.Hits.Hits))
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
addRecord(pk, value.String())
}
} else {
//没有数据了
break
}
//批量执行
batchSave(indexName, pk)
}
fmt.Println("索引"+indexName+"同步完成!")
}
}
/**
2020-09-07
*/
func getFields(tableId int) {
//清空
CacheFieldBean.cacheFieldKeys = CacheFieldBean.cacheFieldKeys[:]
//填充
sql := `select field_name from t_dw_table_field where table_id=? order by sort_id,field_name`
list, _ := mysqlDb.SQL(sql, tableId).Query().List()
for i := range list {
fieldName := list[i]["field_name"].(string)
CacheFieldBean.cacheFieldKeys = append(CacheFieldBean.cacheFieldKeys, fieldName)
}
CacheFieldBean.tableId = tableId
}
//后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(pk string, jsonStr string) []string {
func addRecord(pk string, jsonStr string) {
var mymap map[string]interface{}
var keys []string
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//组装pkArray,用来update enable_flag
switch (mymap[pk]).(type) {
@ -119,15 +150,9 @@ func addRecord(pk string, jsonStr string) []string {
break
}
//另一个任务是组装insertStrArray,准备批量提交
//拿到key
for k := range mymap {
keys = append(keys, k)
}
//对key排序
sort.Strings(keys)
var lineSql = ""
//根据key从m中拿元素就是按顺序拿了
for _, k := range keys {
for _, k := range CacheFieldBean.cacheFieldKeys {
switch mymap[k].(type) {
case string:
lineSql += "'" + mymap[k].(string) + "',"
@ -139,6 +164,7 @@ func addRecord(pk string, jsonStr string) []string {
lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + ","
break
default:
lineSql += "'-1'," //进入这里一般是有这个字段,但数据源中没有这列数据,按-1填充吧~
break
}
}
@ -147,11 +173,10 @@ func addRecord(pk string, jsonStr string) []string {
} else {
fmt.Println(err.Error())
}
return keys
}
//提交
func batchSave(tableName string, pkName string, keys []string) {
func batchSave(tableName string, pkName string) {
var pkStr = ""
//数组去重
pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray)
@ -165,7 +190,7 @@ func batchSave(tableName string, pkName string, keys []string) {
//插入
sql = `insert into ` + tableName + "("
//根据key从m中拿元素就是按顺序拿了
for _, k := range keys {
for _, k := range CacheFieldBean.cacheFieldKeys {
sql += k + ","
}
sql += "uuid"
@ -176,8 +201,8 @@ func batchSave(tableName string, pkName string, keys []string) {
}
lineSql = lineSql[0 : len(lineSql)-1]
sql += ") values " + lineSql + ";"
_,err:=pgDb.SQL(sql).Execute()
if err!=nil{
_, err := pgDb.SQL(sql).Execute()
if err != nil {
fmt.Println(sql)
//panic(err)
}

Loading…
Cancel
Save